Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/network/auth.py: 26%

278 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1"""Network Authentication Helpers 

2 

3Contains interface (MultiDomainBasicAuth) and associated glue code for 

4providing credentials in the context of network requests. 

5""" 

6import logging 

7import os 

8import shutil 

9import subprocess 

10import sysconfig 

11import typing 

12import urllib.parse 

13from abc import ABC, abstractmethod 

14from functools import lru_cache 

15from os.path import commonprefix 

16from pathlib import Path 

17from typing import Any, Dict, List, NamedTuple, Optional, Tuple 

18 

19from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth 

20from pip._vendor.requests.models import Request, Response 

21from pip._vendor.requests.utils import get_netrc_auth 

22 

23from pip._internal.utils.logging import getLogger 

24from pip._internal.utils.misc import ( 

25 ask, 

26 ask_input, 

27 ask_password, 

28 remove_auth_from_url, 

29 split_auth_netloc_from_url, 

30) 

31from pip._internal.vcs.versioncontrol import AuthInfo 

32 

33logger = getLogger(__name__) 

34 

35KEYRING_DISABLED = False 

36 

37 

38class Credentials(NamedTuple): 

39 url: str 

40 username: str 

41 password: str 

42 

43 

44class KeyRingBaseProvider(ABC): 

45 """Keyring base provider interface""" 

46 

47 has_keyring: bool 

48 

49 @abstractmethod 

50 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: 

51 ... 

52 

53 @abstractmethod 

54 def save_auth_info(self, url: str, username: str, password: str) -> None: 

55 ... 

56 

57 

58class KeyRingNullProvider(KeyRingBaseProvider): 

59 """Keyring null provider""" 

60 

61 has_keyring = False 

62 

63 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: 

64 return None 

65 

66 def save_auth_info(self, url: str, username: str, password: str) -> None: 

67 return None 

68 

69 

70class KeyRingPythonProvider(KeyRingBaseProvider): 

71 """Keyring interface which uses locally imported `keyring`""" 

72 

73 has_keyring = True 

74 

75 def __init__(self) -> None: 

76 import keyring 

77 

78 self.keyring = keyring 

79 

80 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: 

81 # Support keyring's get_credential interface which supports getting 

82 # credentials without a username. This is only available for 

83 # keyring>=15.2.0. 

84 if hasattr(self.keyring, "get_credential"): 

85 logger.debug("Getting credentials from keyring for %s", url) 

86 cred = self.keyring.get_credential(url, username) 

87 if cred is not None: 

88 return cred.username, cred.password 

89 return None 

90 

91 if username is not None: 

92 logger.debug("Getting password from keyring for %s", url) 

93 password = self.keyring.get_password(url, username) 

94 if password: 

95 return username, password 

96 return None 

97 

98 def save_auth_info(self, url: str, username: str, password: str) -> None: 

99 self.keyring.set_password(url, username, password) 

100 

101 

102class KeyRingCliProvider(KeyRingBaseProvider): 

103 """Provider which uses `keyring` cli 

104 

105 Instead of calling the keyring package installed alongside pip 

106 we call keyring on the command line which will enable pip to 

107 use which ever installation of keyring is available first in 

108 PATH. 

109 """ 

110 

111 has_keyring = True 

112 

113 def __init__(self, cmd: str) -> None: 

114 self.keyring = cmd 

115 

116 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: 

117 # This is the default implementation of keyring.get_credential 

118 # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139 

119 if username is not None: 

120 password = self._get_password(url, username) 

121 if password is not None: 

122 return username, password 

123 return None 

124 

125 def save_auth_info(self, url: str, username: str, password: str) -> None: 

126 return self._set_password(url, username, password) 

127 

128 def _get_password(self, service_name: str, username: str) -> Optional[str]: 

129 """Mirror the implementation of keyring.get_password using cli""" 

130 if self.keyring is None: 

131 return None 

132 

133 cmd = [self.keyring, "get", service_name, username] 

134 env = os.environ.copy() 

135 env["PYTHONIOENCODING"] = "utf-8" 

136 res = subprocess.run( 

137 cmd, 

138 stdin=subprocess.DEVNULL, 

139 stdout=subprocess.PIPE, 

140 env=env, 

141 ) 

142 if res.returncode: 

143 return None 

144 return res.stdout.decode("utf-8").strip(os.linesep) 

145 

146 def _set_password(self, service_name: str, username: str, password: str) -> None: 

147 """Mirror the implementation of keyring.set_password using cli""" 

148 if self.keyring is None: 

149 return None 

150 env = os.environ.copy() 

151 env["PYTHONIOENCODING"] = "utf-8" 

152 subprocess.run( 

153 [self.keyring, "set", service_name, username], 

154 input=f"{password}{os.linesep}".encode("utf-8"), 

155 env=env, 

156 check=True, 

157 ) 

158 return None 

159 

160 

161@lru_cache(maxsize=None) 

162def get_keyring_provider(provider: str) -> KeyRingBaseProvider: 

163 logger.verbose("Keyring provider requested: %s", provider) 

164 

165 # keyring has previously failed and been disabled 

166 if KEYRING_DISABLED: 

167 provider = "disabled" 

168 if provider in ["import", "auto"]: 

169 try: 

170 impl = KeyRingPythonProvider() 

171 logger.verbose("Keyring provider set: import") 

172 return impl 

173 except ImportError: 

174 pass 

175 except Exception as exc: 

176 # In the event of an unexpected exception 

177 # we should warn the user 

178 msg = "Installed copy of keyring fails with exception %s" 

179 if provider == "auto": 

180 msg = msg + ", trying to find a keyring executable as a fallback" 

181 logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG)) 

182 if provider in ["subprocess", "auto"]: 

183 cli = shutil.which("keyring") 

184 if cli and cli.startswith(sysconfig.get_path("scripts")): 

185 # all code within this function is stolen from shutil.which implementation 

186 @typing.no_type_check 

187 def PATH_as_shutil_which_determines_it() -> str: 

188 path = os.environ.get("PATH", None) 

189 if path is None: 

190 try: 

191 path = os.confstr("CS_PATH") 

192 except (AttributeError, ValueError): 

193 # os.confstr() or CS_PATH is not available 

194 path = os.defpath 

195 # bpo-35755: Don't use os.defpath if the PATH environment variable is 

196 # set to an empty string 

197 

198 return path 

199 

200 scripts = Path(sysconfig.get_path("scripts")) 

201 

202 paths = [] 

203 for path in PATH_as_shutil_which_determines_it().split(os.pathsep): 

204 p = Path(path) 

205 try: 

206 if not p.samefile(scripts): 

207 paths.append(path) 

208 except FileNotFoundError: 

209 pass 

210 

211 path = os.pathsep.join(paths) 

212 

213 cli = shutil.which("keyring", path=path) 

214 

215 if cli: 

216 logger.verbose("Keyring provider set: subprocess with executable %s", cli) 

217 return KeyRingCliProvider(cli) 

218 

219 logger.verbose("Keyring provider set: disabled") 

220 return KeyRingNullProvider() 

221 

222 

223class MultiDomainBasicAuth(AuthBase): 

224 def __init__( 

225 self, 

226 prompting: bool = True, 

227 index_urls: Optional[List[str]] = None, 

228 keyring_provider: str = "auto", 

229 ) -> None: 

230 self.prompting = prompting 

231 self.index_urls = index_urls 

232 self.keyring_provider = keyring_provider # type: ignore[assignment] 

233 self.passwords: Dict[str, AuthInfo] = {} 

234 # When the user is prompted to enter credentials and keyring is 

235 # available, we will offer to save them. If the user accepts, 

236 # this value is set to the credentials they entered. After the 

237 # request authenticates, the caller should call 

238 # ``save_credentials`` to save these. 

239 self._credentials_to_save: Optional[Credentials] = None 

240 

241 @property 

242 def keyring_provider(self) -> KeyRingBaseProvider: 

243 return get_keyring_provider(self._keyring_provider) 

244 

245 @keyring_provider.setter 

246 def keyring_provider(self, provider: str) -> None: 

247 # The free function get_keyring_provider has been decorated with 

248 # functools.cache. If an exception occurs in get_keyring_auth that 

249 # cache will be cleared and keyring disabled, take that into account 

250 # if you want to remove this indirection. 

251 self._keyring_provider = provider 

252 

253 @property 

254 def use_keyring(self) -> bool: 

255 # We won't use keyring when --no-input is passed unless 

256 # a specific provider is requested because it might require 

257 # user interaction 

258 return self.prompting or self._keyring_provider not in ["auto", "disabled"] 

259 

260 def _get_keyring_auth( 

261 self, 

262 url: Optional[str], 

263 username: Optional[str], 

264 ) -> Optional[AuthInfo]: 

265 """Return the tuple auth for a given url from keyring.""" 

266 # Do nothing if no url was provided 

267 if not url: 

268 return None 

269 

270 try: 

271 return self.keyring_provider.get_auth_info(url, username) 

272 except Exception as exc: 

273 logger.warning( 

274 "Keyring is skipped due to an exception: %s", 

275 str(exc), 

276 ) 

277 global KEYRING_DISABLED 

278 KEYRING_DISABLED = True 

279 get_keyring_provider.cache_clear() 

280 return None 

281 

282 def _get_index_url(self, url: str) -> Optional[str]: 

283 """Return the original index URL matching the requested URL. 

284 

285 Cached or dynamically generated credentials may work against 

286 the original index URL rather than just the netloc. 

287 

288 The provided url should have had its username and password 

289 removed already. If the original index url had credentials then 

290 they will be included in the return value. 

291 

292 Returns None if no matching index was found, or if --no-index 

293 was specified by the user. 

294 """ 

295 if not url or not self.index_urls: 

296 return None 

297 

298 url = remove_auth_from_url(url).rstrip("/") + "/" 

299 parsed_url = urllib.parse.urlsplit(url) 

300 

301 candidates = [] 

302 

303 for index in self.index_urls: 

304 index = index.rstrip("/") + "/" 

305 parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index)) 

306 if parsed_url == parsed_index: 

307 return index 

308 

309 if parsed_url.netloc != parsed_index.netloc: 

310 continue 

311 

312 candidate = urllib.parse.urlsplit(index) 

313 candidates.append(candidate) 

314 

315 if not candidates: 

316 return None 

317 

318 candidates.sort( 

319 reverse=True, 

320 key=lambda candidate: commonprefix( 

321 [ 

322 parsed_url.path, 

323 candidate.path, 

324 ] 

325 ).rfind("/"), 

326 ) 

327 

328 return urllib.parse.urlunsplit(candidates[0]) 

329 

330 def _get_new_credentials( 

331 self, 

332 original_url: str, 

333 *, 

334 allow_netrc: bool = True, 

335 allow_keyring: bool = False, 

336 ) -> AuthInfo: 

337 """Find and return credentials for the specified URL.""" 

338 # Split the credentials and netloc from the url. 

339 url, netloc, url_user_password = split_auth_netloc_from_url( 

340 original_url, 

341 ) 

342 

343 # Start with the credentials embedded in the url 

344 username, password = url_user_password 

345 if username is not None and password is not None: 

346 logger.debug("Found credentials in url for %s", netloc) 

347 return url_user_password 

348 

349 # Find a matching index url for this request 

350 index_url = self._get_index_url(url) 

351 if index_url: 

352 # Split the credentials from the url. 

353 index_info = split_auth_netloc_from_url(index_url) 

354 if index_info: 

355 index_url, _, index_url_user_password = index_info 

356 logger.debug("Found index url %s", index_url) 

357 

358 # If an index URL was found, try its embedded credentials 

359 if index_url and index_url_user_password[0] is not None: 

360 username, password = index_url_user_password 

361 if username is not None and password is not None: 

362 logger.debug("Found credentials in index url for %s", netloc) 

363 return index_url_user_password 

364 

365 # Get creds from netrc if we still don't have them 

366 if allow_netrc: 

367 netrc_auth = get_netrc_auth(original_url) 

368 if netrc_auth: 

369 logger.debug("Found credentials in netrc for %s", netloc) 

370 return netrc_auth 

371 

372 # If we don't have a password and keyring is available, use it. 

373 if allow_keyring: 

374 # The index url is more specific than the netloc, so try it first 

375 # fmt: off 

376 kr_auth = ( 

377 self._get_keyring_auth(index_url, username) or 

378 self._get_keyring_auth(netloc, username) 

379 ) 

380 # fmt: on 

381 if kr_auth: 

382 logger.debug("Found credentials in keyring for %s", netloc) 

383 return kr_auth 

384 

385 return username, password 

386 

387 def _get_url_and_credentials( 

388 self, original_url: str 

389 ) -> Tuple[str, Optional[str], Optional[str]]: 

390 """Return the credentials to use for the provided URL. 

391 

392 If allowed, netrc and keyring may be used to obtain the 

393 correct credentials. 

394 

395 Returns (url_without_credentials, username, password). Note 

396 that even if the original URL contains credentials, this 

397 function may return a different username and password. 

398 """ 

399 url, netloc, _ = split_auth_netloc_from_url(original_url) 

400 

401 # Try to get credentials from original url 

402 username, password = self._get_new_credentials(original_url) 

403 

404 # If credentials not found, use any stored credentials for this netloc. 

405 # Do this if either the username or the password is missing. 

406 # This accounts for the situation in which the user has specified 

407 # the username in the index url, but the password comes from keyring. 

408 if (username is None or password is None) and netloc in self.passwords: 

409 un, pw = self.passwords[netloc] 

410 # It is possible that the cached credentials are for a different username, 

411 # in which case the cache should be ignored. 

412 if username is None or username == un: 

413 username, password = un, pw 

414 

415 if username is not None or password is not None: 

416 # Convert the username and password if they're None, so that 

417 # this netloc will show up as "cached" in the conditional above. 

418 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to 

419 # cache the value that is going to be used. 

420 username = username or "" 

421 password = password or "" 

422 

423 # Store any acquired credentials. 

424 self.passwords[netloc] = (username, password) 

425 

426 assert ( 

427 # Credentials were found 

428 (username is not None and password is not None) 

429 # Credentials were not found 

430 or (username is None and password is None) 

431 ), f"Could not load credentials from url: {original_url}" 

432 

433 return url, username, password 

434 

435 def __call__(self, req: Request) -> Request: 

436 # Get credentials for this request 

437 url, username, password = self._get_url_and_credentials(req.url) 

438 

439 # Set the url of the request to the url without any credentials 

440 req.url = url 

441 

442 if username is not None and password is not None: 

443 # Send the basic auth with this request 

444 req = HTTPBasicAuth(username, password)(req) 

445 

446 # Attach a hook to handle 401 responses 

447 req.register_hook("response", self.handle_401) 

448 

449 return req 

450 

451 # Factored out to allow for easy patching in tests 

452 def _prompt_for_password( 

453 self, netloc: str 

454 ) -> Tuple[Optional[str], Optional[str], bool]: 

455 username = ask_input(f"User for {netloc}: ") if self.prompting else None 

456 if not username: 

457 return None, None, False 

458 if self.use_keyring: 

459 auth = self._get_keyring_auth(netloc, username) 

460 if auth and auth[0] is not None and auth[1] is not None: 

461 return auth[0], auth[1], False 

462 password = ask_password("Password: ") 

463 return username, password, True 

464 

465 # Factored out to allow for easy patching in tests 

466 def _should_save_password_to_keyring(self) -> bool: 

467 if ( 

468 not self.prompting 

469 or not self.use_keyring 

470 or not self.keyring_provider.has_keyring 

471 ): 

472 return False 

473 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" 

474 

475 def handle_401(self, resp: Response, **kwargs: Any) -> Response: 

476 # We only care about 401 responses, anything else we want to just 

477 # pass through the actual response 

478 if resp.status_code != 401: 

479 return resp 

480 

481 username, password = None, None 

482 

483 # Query the keyring for credentials: 

484 if self.use_keyring: 

485 username, password = self._get_new_credentials( 

486 resp.url, 

487 allow_netrc=False, 

488 allow_keyring=True, 

489 ) 

490 

491 # We are not able to prompt the user so simply return the response 

492 if not self.prompting and not username and not password: 

493 return resp 

494 

495 parsed = urllib.parse.urlparse(resp.url) 

496 

497 # Prompt the user for a new username and password 

498 save = False 

499 if not username and not password: 

500 username, password, save = self._prompt_for_password(parsed.netloc) 

501 

502 # Store the new username and password to use for future requests 

503 self._credentials_to_save = None 

504 if username is not None and password is not None: 

505 self.passwords[parsed.netloc] = (username, password) 

506 

507 # Prompt to save the password to keyring 

508 if save and self._should_save_password_to_keyring(): 

509 self._credentials_to_save = Credentials( 

510 url=parsed.netloc, 

511 username=username, 

512 password=password, 

513 ) 

514 

515 # Consume content and release the original connection to allow our new 

516 # request to reuse the same one. 

517 # The result of the assignment isn't used, it's just needed to consume 

518 # the content. 

519 _ = resp.content 

520 resp.raw.release_conn() 

521 

522 # Add our new username and password to the request 

523 req = HTTPBasicAuth(username or "", password or "")(resp.request) 

524 req.register_hook("response", self.warn_on_401) 

525 

526 # On successful request, save the credentials that were used to 

527 # keyring. (Note that if the user responded "no" above, this member 

528 # is not set and nothing will be saved.) 

529 if self._credentials_to_save: 

530 req.register_hook("response", self.save_credentials) 

531 

532 # Send our new request 

533 new_resp = resp.connection.send(req, **kwargs) 

534 new_resp.history.append(resp) 

535 

536 return new_resp 

537 

538 def warn_on_401(self, resp: Response, **kwargs: Any) -> None: 

539 """Response callback to warn about incorrect credentials.""" 

540 if resp.status_code == 401: 

541 logger.warning( 

542 "401 Error, Credentials not correct for %s", 

543 resp.request.url, 

544 ) 

545 

546 def save_credentials(self, resp: Response, **kwargs: Any) -> None: 

547 """Response callback to save credentials on success.""" 

548 assert ( 

549 self.keyring_provider.has_keyring 

550 ), "should never reach here without keyring" 

551 

552 creds = self._credentials_to_save 

553 self._credentials_to_save = None 

554 if creds and resp.status_code < 400: 

555 try: 

556 logger.info("Saving credentials to keyring") 

557 self.keyring_provider.save_auth_info( 

558 creds.url, creds.username, creds.password 

559 ) 

560 except Exception: 

561 logger.exception("Failed to save credentials")