1"""Network Authentication Helpers
2
3Contains interface (MultiDomainBasicAuth) and associated glue code for
4providing credentials in the context of network requests.
5"""
6
7from __future__ import annotations
8
9import logging
10import os
11import shutil
12import subprocess
13import sysconfig
14import typing
15import urllib.parse
16from abc import ABC, abstractmethod
17from functools import cache
18from os.path import commonpath
19from pathlib import Path
20from typing import Any, NamedTuple
21
22from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
23from pip._vendor.requests.utils import get_netrc_auth
24
25from pip._internal.utils.logging import getLogger
26from pip._internal.utils.misc import (
27 ask,
28 ask_input,
29 ask_password,
30 remove_auth_from_url,
31 split_auth_netloc_from_url,
32)
33from pip._internal.vcs.versioncontrol import AuthInfo
34
35if typing.TYPE_CHECKING:
36 from pip._vendor.requests import PreparedRequest
37 from pip._vendor.requests.models import Response
38
logger = getLogger(__name__)

# Process-wide kill switch for keyring support. It is set to True by
# MultiDomainBasicAuth._get_keyring_auth when a keyring lookup raises, and
# get_keyring_provider treats the requested provider as "disabled" once set.
KEYRING_DISABLED = False
42
43
class Credentials(NamedTuple):
    """A username/password pair together with the key it is stored under.

    ``url`` is the service name passed to the keyring provider when the
    credentials are saved.
    """

    url: str
    username: str
    password: str
48
49
class KeyRingBaseProvider(ABC):
    """Abstract interface implemented by every keyring provider.

    Concrete providers declare through ``has_keyring`` whether a keyring is
    actually available, and implement credential lookup and storage.
    """

    # True when the provider is backed by a keyring, False otherwise.
    has_keyring: bool

    @abstractmethod
    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Return the auth info for *url*, or ``None`` when nothing is found."""

    @abstractmethod
    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store *password* for *username* under *url*."""
60
61
class KeyRingNullProvider(KeyRingBaseProvider):
    """Provider that performs no keyring access at all.

    Lookups never yield credentials and requests to save are ignored.
    """

    has_keyring = False

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Report that no credentials exist, whatever the arguments."""

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Discard the credentials without storing them anywhere."""
72
73
class KeyRingPythonProvider(KeyRingBaseProvider):
    """Provider that talks to the ``keyring`` module importable in-process."""

    has_keyring = True

    def __init__(self) -> None:
        # Imported here rather than at module level so that a missing or
        # broken ``keyring`` only raises when this provider is constructed.
        import keyring as keyring_module

        self.keyring = keyring_module

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Look up credentials for *url* in the imported keyring module.

        Returns a ``(username, password)`` pair, or ``None`` when the keyring
        has nothing suitable.
        """
        backend = self.keyring

        # keyring>=15.2.0 exposes get_credential, which can find credentials
        # even when no username is known. Prefer it whenever it exists.
        if hasattr(backend, "get_credential"):
            logger.debug("Getting credentials from keyring for %s", url)
            credential = backend.get_credential(url, username)
            if credential is None:
                return None
            return credential.username, credential.password

        # Older keyring: a password can only be fetched for a known username.
        if username is None:
            return None

        logger.debug("Getting password from keyring for %s", url)
        stored = backend.get_password(url, username)
        return (username, stored) if stored else None

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store *password* for *username* under the service name *url*."""
        self.keyring.set_password(url, username, password)
104
105
class KeyRingCliProvider(KeyRingBaseProvider):
    """Provider that shells out to a ``keyring`` executable.

    Rather than importing the keyring package installed alongside pip, the
    ``keyring`` command line tool is invoked, which lets pip use whichever
    keyring installation the given executable belongs to.
    """

    has_keyring = True

    def __init__(self, cmd: str) -> None:
        # Path of (or command name for) the keyring executable to run.
        self.keyring = cmd

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Return ``(username, password)`` for *url*, or ``None``.

        This follows the default implementation of keyring.get_credential
        (https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139):
        without a username there is nothing to look up.
        """
        if username is None:
            return None
        found = self._get_password(url, username)
        return None if found is None else (username, found)

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store *password* for *username* under *url* through the CLI."""
        return self._set_password(url, username, password)

    @staticmethod
    def _utf8_env() -> dict[str, str]:
        """Copy the current environment with PYTHONIOENCODING forced to utf-8."""
        return {**os.environ, "PYTHONIOENCODING": "utf-8"}

    def _get_password(self, service_name: str, username: str) -> str | None:
        """Mirror the implementation of keyring.get_password using cli.

        Returns ``None`` when the command exits with a non-zero status.
        """
        if self.keyring is None:
            return None

        completed = subprocess.run(
            [self.keyring, "get", service_name, username],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            env=self._utf8_env(),
        )
        if completed.returncode != 0:
            return None
        # Drop the line terminator characters surrounding the printed password.
        return completed.stdout.decode("utf-8").strip(os.linesep)

    def _set_password(self, service_name: str, username: str, password: str) -> None:
        """Mirror the implementation of keyring.set_password using cli.

        The password is fed on stdin; a failing command raises
        ``subprocess.CalledProcessError`` because of ``check=True``.
        """
        if self.keyring is None:
            return None

        subprocess.run(
            [self.keyring, "set", service_name, username],
            input=f"{password}{os.linesep}".encode(),
            env=self._utf8_env(),
            check=True,
        )
        return None
163
164
@cache
def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
    """Build (and cache) the keyring provider for the requested mode.

    ``"import"`` tries the in-process keyring module, ``"subprocess"`` looks
    for a ``keyring`` executable, and ``"auto"`` tries both in that order.
    Any other value, or any mode once ``KEYRING_DISABLED`` is set, yields a
    ``KeyRingNullProvider``.
    """
    logger.verbose("Keyring provider requested: %s", provider)

    # keyring has previously failed and been disabled
    if KEYRING_DISABLED:
        provider = "disabled"

    if provider in ("import", "auto"):
        try:
            python_provider = KeyRingPythonProvider()
            logger.verbose("Keyring provider set: import")
            return python_provider
        except ImportError:
            # keyring is simply not installed; fall through quietly.
            pass
        except Exception as exc:
            # An unexpected failure deserves a warning to the user.
            msg = "Installed copy of keyring fails with exception %s"
            if provider == "auto":
                msg = msg + ", trying to find a keyring executable as a fallback"
            logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))

    if provider in ("subprocess", "auto"):
        executable = shutil.which("keyring")
        if executable and executable.startswith(sysconfig.get_path("scripts")):
            # The executable found lives in this environment's scripts
            # directory; search again with that directory removed from PATH.

            # This mirrors how shutil.which determines its search path.
            @typing.no_type_check
            def search_path() -> str:
                env_path = os.environ.get("PATH", None)
                # bpo-35755: an empty PATH is honoured as is; os.defpath is
                # only a fallback for an unset PATH.
                if env_path is not None:
                    return env_path
                try:
                    return os.confstr("CS_PATH")
                except (AttributeError, ValueError):
                    # os.confstr() or CS_PATH is not available
                    return os.defpath

            scripts_dir = Path(sysconfig.get_path("scripts"))

            def keep_entry(entry: str) -> bool:
                # Entries that do not exist, or that are the scripts
                # directory itself, are dropped.
                try:
                    return not Path(entry).samefile(scripts_dir)
                except FileNotFoundError:
                    return False

            filtered = [
                entry for entry in search_path().split(os.pathsep) if keep_entry(entry)
            ]
            executable = shutil.which("keyring", path=os.pathsep.join(filtered))

        if executable:
            logger.verbose(
                "Keyring provider set: subprocess with executable %s", executable
            )
            return KeyRingCliProvider(executable)

    logger.verbose("Keyring provider set: disabled")
    return KeyRingNullProvider()
225
226
class MultiDomainBasicAuth(AuthBase):
    """Basic-auth handler that resolves credentials per request URL.

    For each outgoing request, credentials are taken from the request URL,
    the matching configured index URL, netrc, or the per-netloc cache in
    ``self.passwords``. When a request comes back with a 401, the keyring
    and, if allowed, an interactive prompt are consulted and the request is
    retried once.
    """

    def __init__(
        self,
        prompting: bool = True,
        index_urls: list[str] | None = None,
        keyring_provider: str = "auto",
    ) -> None:
        self.prompting = prompting
        self.index_urls = index_urls
        self.keyring_provider = keyring_provider
        # Credentials already resolved, keyed by netloc.
        self.passwords: dict[str, AuthInfo] = {}
        # Set when the user entered credentials at the prompt and agreed to
        # have them stored in the keyring. ``save_credentials`` persists them
        # once the retried request has come back and clears the value again.
        self._credentials_to_save: Credentials | None = None

    @property
    def keyring_provider(self) -> KeyRingBaseProvider:
        """The provider object for the configured provider name."""
        return get_keyring_provider(self._keyring_provider)

    @keyring_provider.setter
    def keyring_provider(self, provider: str) -> None:
        # Only the name is stored; the provider object is resolved on every
        # read. get_keyring_provider is wrapped in functools.cache, and that
        # cache is cleared (and keyring disabled) when _get_keyring_auth hits
        # an exception, so keep this indirection in mind before removing it.
        self._keyring_provider = provider

    @property
    def use_keyring(self) -> bool:
        """Whether the keyring may be consulted.

        With --no-input the keyring is skipped, since it might require user
        interaction, unless a specific provider was requested.
        """
        return self.prompting or self._keyring_provider not in ("auto", "disabled")

    def _get_keyring_auth(
        self,
        url: str | None,
        username: str | None,
    ) -> AuthInfo | None:
        """Return the tuple auth for a given url from keyring.

        Any exception raised by the provider disables keyring for the rest
        of the process and results in ``None``.
        """
        global KEYRING_DISABLED

        # Nothing to look up without a url.
        if not url:
            return None

        try:
            return self.keyring_provider.get_auth_info(url, username)
        except Exception as exc:
            # Full traceback at debug level, so it only shows in verbose mode.
            logger.debug("Keyring is skipped due to an exception", exc_info=True)
            # A shortened version is always shown.
            logger.warning(
                "Keyring is skipped due to an exception: %s",
                str(exc),
            )
            KEYRING_DISABLED = True
            get_keyring_provider.cache_clear()
            return None

    def _get_index_url(self, url: str) -> str | None:
        """Return the original index URL matching the requested URL.

        Cached or dynamically generated credentials may work against the
        original index URL rather than just the netloc.

        The provided url should already have had its username and password
        removed. If the original index url carried credentials, they are
        included in the return value.

        Returns None if no matching index was found, or if no index urls
        are configured (e.g. --no-index was specified by the user).
        """
        if not url or not self.index_urls:
            return None

        target = urllib.parse.urlsplit(remove_auth_from_url(url).rstrip("/") + "/")

        same_host = []
        for configured in self.index_urls:
            index = configured.rstrip("/") + "/"
            stripped = urllib.parse.urlsplit(remove_auth_from_url(index))

            # An exact match (ignoring credentials) wins immediately.
            if stripped == target:
                return index

            if stripped.netloc == target.netloc:
                same_host.append(urllib.parse.urlsplit(index))

        if not same_host:
            return None

        def shared_path_length(candidate: urllib.parse.SplitResult) -> int:
            return len(commonpath([target.path, candidate.path]))

        # Pick the index sharing the longest path prefix with the request;
        # on ties the one configured first is kept.
        best = max(same_host, key=shared_path_length)
        return urllib.parse.urlunsplit(best)

    def _get_new_credentials(
        self,
        original_url: str,
        *,
        allow_netrc: bool = True,
        allow_keyring: bool = False,
    ) -> AuthInfo:
        """Find and return credentials for the specified URL.

        Sources are tried in order: the URL itself, the matching index URL,
        netrc (if ``allow_netrc``) and the keyring (if ``allow_keyring``).
        Falls back to whatever partial username/password was gathered.
        """
        url, netloc, url_user_password = split_auth_netloc_from_url(
            original_url,
        )

        # Credentials embedded in the url take precedence.
        username, password = url_user_password
        if username is not None and password is not None:
            logger.debug("Found credentials in url for %s", netloc)
            return url_user_password

        # Next, look at the index url matching this request, if any.
        index_url = self._get_index_url(url)
        if index_url:
            index_info = split_auth_netloc_from_url(index_url)
            if index_info:
                index_url, _, index_url_user_password = index_info
                logger.debug("Found index url %s", index_url)

            # Try the credentials embedded in the index url.
            if index_url and index_url_user_password[0] is not None:
                username, password = index_url_user_password
                if username is not None and password is not None:
                    logger.debug("Found credentials in index url for %s", netloc)
                    return index_url_user_password

        # Get creds from netrc if we still don't have them.
        if allow_netrc:
            netrc_auth = get_netrc_auth(original_url)
            if netrc_auth:
                logger.debug("Found credentials in netrc for %s", netloc)
                return netrc_auth

        # If we don't have a password and keyring is available, use it.
        if allow_keyring:
            # The index url is more specific than the netloc, so try it first.
            kr_auth = self._get_keyring_auth(index_url, username)
            if not kr_auth:
                kr_auth = self._get_keyring_auth(netloc, username)
            if kr_auth:
                logger.debug("Found credentials in keyring for %s", netloc)
                return kr_auth

        return username, password

    def _get_url_and_credentials(
        self, original_url: str
    ) -> tuple[str, str | None, str | None]:
        """Return the credentials to use for the provided URL.

        If allowed, netrc and keyring may be used to obtain the correct
        credentials.

        Returns (url_without_credentials, username, password). Note that
        even if the original URL contains credentials, this function may
        return a different username and password.
        """
        url, netloc, _ = split_auth_netloc_from_url(original_url)

        username, password = self._get_new_credentials(original_url)

        # When either half is missing, fall back to credentials cached for
        # this netloc. This covers a username given in the index url whose
        # password came from keyring.
        incomplete = username is None or password is None
        if incomplete and netloc in self.passwords:
            cached_username, cached_password = self.passwords[netloc]
            # Cached credentials for a different username must be ignored.
            if username is None or username == cached_username:
                username, password = cached_username, cached_password

        if username is not None or password is not None:
            # Replace a missing half with "" so this netloc counts as cached
            # in the check above. HTTPBasicAuth doesn't accept None either,
            # so cache exactly the values that will be sent.
            username = username or ""
            password = password or ""

            # Store any acquired credentials.
            self.passwords[netloc] = (username, password)

        # Either both halves were found or neither was.
        assert (username is None) == (
            password is None
        ), f"Could not load credentials from url: {original_url}"

        return url, username, password

    def __call__(self, req: PreparedRequest) -> PreparedRequest:
        """Attach credentials, if any, and the 401 handler to *req*."""
        assert req.url is not None
        clean_url, username, password = self._get_url_and_credentials(req.url)

        # The request goes out with the credential-free url.
        req.url = clean_url

        if username is not None and password is not None:
            # Send the basic auth with this request.
            req = HTTPBasicAuth(username, password)(req)

        # Attach a hook to handle 401 responses.
        req.register_hook("response", self.handle_401)

        return req

    # Factored out to allow for easy patching in tests
    def _prompt_for_password(self, netloc: str) -> tuple[str | None, str | None, bool]:
        """Ask the user for credentials for *netloc*.

        Returns ``(username, password, save)``. ``save`` is True only when
        the password was typed in by the user rather than found in keyring.
        """
        if not self.prompting:
            return None, None, False

        username = ask_input(f"User for {netloc}: ")
        if not username:
            return None, None, False

        # The keyring may already hold a password for the entered username.
        if self.use_keyring:
            auth = self._get_keyring_auth(netloc, username)
            if auth and auth[0] is not None and auth[1] is not None:
                return auth[0], auth[1], False

        return username, ask_password("Password: "), True

    # Factored out to allow for easy patching in tests
    def _should_save_password_to_keyring(self) -> bool:
        """Ask whether to save the entered credentials, if saving is possible."""
        can_save = (
            self.prompting and self.use_keyring and self.keyring_provider.has_keyring
        )
        if not can_save:
            return False
        return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"

    def handle_401(self, resp: Response, **kwargs: Any) -> Response:
        """Response hook: retry a 401 once with keyring or prompted credentials.

        Any other status is passed through untouched.
        """
        if resp.status_code != 401:
            return resp

        username = password = None

        # Query the keyring for credentials.
        if self.use_keyring:
            username, password = self._get_new_credentials(
                resp.url,
                allow_netrc=False,
                allow_keyring=True,
            )
        nothing_found = not username and not password

        # Without credentials and without a way to prompt, give up.
        if nothing_found and not self.prompting:
            return resp

        netloc = urllib.parse.urlparse(resp.url).netloc

        # Prompt the user for a new username and password.
        save = False
        if nothing_found:
            username, password, save = self._prompt_for_password(netloc)

        # Store the new username and password to use for future requests.
        self._credentials_to_save = None
        if username is not None and password is not None:
            self.passwords[netloc] = (username, password)

            # Offer to save credentials that came from the prompt.
            if save and self._should_save_password_to_keyring():
                self._credentials_to_save = Credentials(
                    url=netloc,
                    username=username,
                    password=password,
                )

        # Consume content and release the original connection to allow our
        # new request to reuse the same one. The assignment target is unused;
        # reading the attribute is what consumes the content.
        _ = resp.content
        resp.raw.release_conn()

        # Add our new username and password to the request.
        retry = HTTPBasicAuth(username or "", password or "")(resp.request)
        retry.register_hook("response", self.warn_on_401)

        # Save the credentials once the retry has come back. If the user
        # declined above, nothing is pending and no hook is added.
        if self._credentials_to_save:
            retry.register_hook("response", self.save_credentials)

        # Send our new request.
        new_resp = resp.connection.send(retry, **kwargs)
        new_resp.history.append(resp)

        return new_resp

    def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
        """Response callback to warn about incorrect credentials."""
        if resp.status_code != 401:
            return
        logger.warning(
            "401 Error, Credentials not correct for %s",
            resp.request.url,
        )

    def save_credentials(self, resp: Response, **kwargs: Any) -> None:
        """Response callback to save credentials on success.

        The pending credentials are cleared in every case and only written
        to the keyring when the response status is below 400.
        """
        assert (
            self.keyring_provider.has_keyring
        ), "should never reach here without keyring"

        pending = self._credentials_to_save
        self._credentials_to_save = None
        if not pending or resp.status_code >= 400:
            return

        try:
            logger.info("Saving credentials to keyring")
            self.keyring_provider.save_auth_info(
                pending.url, pending.username, pending.password
            )
        except Exception:
            logger.exception("Failed to save credentials")