1"""Network Authentication Helpers
2
3Contains interface (MultiDomainBasicAuth) and associated glue code for
4providing credentials in the context of network requests.
5"""
6
7from __future__ import annotations
8
9import logging
10import os
11import shutil
12import subprocess
13import sysconfig
14import typing
15import urllib.parse
16from abc import ABC, abstractmethod
17from functools import cache
18from os.path import commonprefix
19from pathlib import Path
20from typing import Any, NamedTuple
21
22from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
23from pip._vendor.requests.utils import get_netrc_auth
24
25from pip._internal.utils.logging import getLogger
26from pip._internal.utils.misc import (
27 ask,
28 ask_input,
29 ask_password,
30 remove_auth_from_url,
31 split_auth_netloc_from_url,
32)
33from pip._internal.vcs.versioncontrol import AuthInfo
34
35if typing.TYPE_CHECKING:
36 from pip._vendor.requests import PreparedRequest
37 from pip._vendor.requests.models import Response
38
# Module-level logger, created through pip's own getLogger helper.
logger = getLogger(__name__)

# Module-wide switch. MultiDomainBasicAuth._get_keyring_auth sets it to True
# after a keyring lookup raises; get_keyring_provider checks it and, once it
# is set, treats every request as "disabled".
KEYRING_DISABLED = False
42
43
class Credentials(NamedTuple):
    """Credentials queued for saving to keyring.

    ``MultiDomainBasicAuth.handle_401`` builds one of these from prompted
    input, and ``MultiDomainBasicAuth.save_credentials`` passes its fields
    to the keyring provider's ``save_auth_info``.
    """

    # Service identifier handed to the keyring provider; handle_401 stores
    # the netloc of the response URL here.
    url: str
    username: str
    password: str
48
49
class KeyRingBaseProvider(ABC):
    """Keyring base provider interface.

    Concrete providers implement credential lookup and storage. Instances
    are obtained through ``get_keyring_provider``.
    """

    # True on providers backed by a keyring, False on the null provider.
    has_keyring: bool

    @abstractmethod
    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Return the stored credentials for ``url``, or None if there are none.

        ``username`` may be None when the caller does not know it.
        """
        ...

    @abstractmethod
    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store ``password`` for ``username`` under the service name ``url``."""
        ...
60
61
class KeyRingNullProvider(KeyRingBaseProvider):
    """Provider used when keyring is unavailable or disabled.

    Lookups never find anything and saves are silently dropped.
    """

    has_keyring = False

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Report that no credentials are stored, whatever the arguments."""

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Accept the credentials and persist nothing."""
72
73
class KeyRingPythonProvider(KeyRingBaseProvider):
    """Provider backed by the ``keyring`` package importable from this process."""

    has_keyring = True

    def __init__(self) -> None:
        # Imported here rather than at module level, so a missing package
        # raises ImportError at construction time.
        import keyring as keyring_module

        self.keyring = keyring_module

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Look up credentials for ``url`` in the imported keyring module.

        Returns a ``(username, password)`` pair, or None when nothing
        usable is stored.
        """
        backend = self.keyring

        # keyring>=15.2.0 offers get_credential, which can resolve an entry
        # even when no username is known. Prefer it whenever it exists.
        if hasattr(backend, "get_credential"):
            logger.debug("Getting credentials from keyring for %s", url)
            found = backend.get_credential(url, username)
            return None if found is None else (found.username, found.password)

        # Older keyring: get_password needs a username to query with.
        if username is None:
            return None

        logger.debug("Getting password from keyring for %s", url)
        secret = backend.get_password(url, username)
        return (username, secret) if secret else None

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store ``password`` for ``username`` under the service name ``url``."""
        self.keyring.set_password(url, username, password)
104
105
class KeyRingCliProvider(KeyRingBaseProvider):
    """Provider that shells out to a ``keyring`` executable.

    Rather than importing the keyring package installed next to pip, this
    provider runs the given ``keyring`` command line program, so pip can use
    whichever keyring installation the caller located (normally the first
    one on PATH).
    """

    has_keyring = True

    def __init__(self, cmd: str) -> None:
        # Path of the keyring executable to invoke.
        self.keyring = cmd

    def get_auth_info(self, url: str, username: str | None) -> AuthInfo | None:
        """Return ``(username, password)`` for ``url``, or None.

        A lookup is only attempted when a username is supplied.
        """
        # This is the default implementation of keyring.get_credential
        # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
        if username is None:
            return None
        secret = self._get_password(url, username)
        return None if secret is None else (username, secret)

    def save_auth_info(self, url: str, username: str, password: str) -> None:
        """Store ``password`` for ``username`` under the service name ``url``."""
        self._set_password(url, username, password)

    def _get_password(self, service_name: str, username: str) -> str | None:
        """Mirror the implementation of keyring.get_password using cli"""
        if self.keyring is None:
            return None

        completed = subprocess.run(
            [self.keyring, "get", service_name, username],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            # Make the child emit UTF-8 so the decode below matches.
            env={**os.environ, "PYTHONIOENCODING": "utf-8"},
        )
        # A non-zero exit status is treated as "no password available".
        if completed.returncode != 0:
            return None
        return completed.stdout.decode("utf-8").strip(os.linesep)

    def _set_password(self, service_name: str, username: str, password: str) -> None:
        """Mirror the implementation of keyring.set_password using cli"""
        if self.keyring is None:
            return

        # The password is fed on stdin, terminated by the platform line
        # separator; check=True raises if the command fails.
        subprocess.run(
            [self.keyring, "set", service_name, username],
            input=f"{password}{os.linesep}".encode(),
            env={**os.environ, "PYTHONIOENCODING": "utf-8"},
            check=True,
        )
163
164
@cache
def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
    """Select the keyring provider implementation for ``provider``.

    ``"import"`` uses the ``keyring`` package importable from this process,
    ``"subprocess"`` uses a ``keyring`` executable found on PATH, and
    ``"auto"`` tries the import first and the executable second. Whenever
    the requested mechanism is unavailable, for any other value, and for
    every value once ``KEYRING_DISABLED`` is set, a ``KeyRingNullProvider``
    is returned.

    The result is memoized per ``provider`` string by ``functools.cache``.
    """
    logger.verbose("Keyring provider requested: %s", provider)

    # keyring has previously failed and been disabled
    if KEYRING_DISABLED:
        provider = "disabled"
    if provider in ["import", "auto"]:
        try:
            impl = KeyRingPythonProvider()
            logger.verbose("Keyring provider set: import")
            return impl
        except ImportError:
            # No importable keyring package; fall through to the next option.
            pass
        except Exception as exc:
            # In the event of an unexpected exception
            # we should warn the user
            msg = "Installed copy of keyring fails with exception %s"
            if provider == "auto":
                msg = msg + ", trying to find a keyring executable as a fallback"
            logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))
    if provider in ["subprocess", "auto"]:
        cli = shutil.which("keyring")
        if cli and cli.startswith(sysconfig.get_path("scripts")):
            # The executable found lives under this installation's scripts
            # directory. Search again with that directory removed from PATH;
            # if nothing else is found, no executable is used.

            # all code within this function is stolen from shutil.which implementation
            @typing.no_type_check
            def PATH_as_shutil_which_determines_it() -> str:
                path = os.environ.get("PATH", None)
                if path is None:
                    try:
                        path = os.confstr("CS_PATH")
                    except (AttributeError, ValueError):
                        # os.confstr() or CS_PATH is not available
                        path = os.defpath
                # bpo-35755: Don't use os.defpath if the PATH environment variable is
                # set to an empty string

                return path

            scripts = Path(sysconfig.get_path("scripts"))

            paths = []
            for path in PATH_as_shutil_which_determines_it().split(os.pathsep):
                p = Path(path)
                try:
                    if not p.samefile(scripts):
                        paths.append(path)
                except OSError:
                    # samefile() stats both paths and raises OSError when
                    # either cannot be accessed: missing directory
                    # (FileNotFoundError), a file used as a directory
                    # component (NotADirectoryError), unreadable parent
                    # (PermissionError), ... Such an entry is dropped from
                    # the search path instead of aborting provider selection.
                    pass

            path = os.pathsep.join(paths)

            cli = shutil.which("keyring", path=path)

        if cli:
            logger.verbose("Keyring provider set: subprocess with executable %s", cli)
            return KeyRingCliProvider(cli)

    logger.verbose("Keyring provider set: disabled")
    return KeyRingNullProvider()
225
226
class MultiDomainBasicAuth(AuthBase):
    """Auth handler that resolves HTTP basic-auth credentials per request.

    Credentials come from the request URL, the configured index URLs, netrc
    and a per-netloc cache. After a 401 response, keyring and an interactive
    prompt are consulted as well and the request is re-sent.
    """

    def __init__(
        self,
        prompting: bool = True,
        index_urls: list[str] | None = None,
        keyring_provider: str = "auto",
    ) -> None:
        self.prompting = prompting
        self.index_urls = index_urls
        self.keyring_provider = keyring_provider
        # Credentials already used or acquired, keyed by netloc.
        self.passwords: dict[str, AuthInfo] = {}
        # Set when the user typed credentials at the prompt and agreed to
        # store them in keyring. ``save_credentials`` consumes this value
        # once the re-sent request has come back.
        self._credentials_to_save: Credentials | None = None

    @property
    def keyring_provider(self) -> KeyRingBaseProvider:
        """Provider instance for the configured provider name."""
        return get_keyring_provider(self._keyring_provider)

    @keyring_provider.setter
    def keyring_provider(self, provider: str) -> None:
        # Only the name is stored. get_keyring_provider is wrapped in
        # functools.cache, and _get_keyring_auth clears that cache and
        # disables keyring when a lookup raises, so the provider has to be
        # resolved again on every access. Keep that in mind before removing
        # this indirection.
        self._keyring_provider = provider

    @property
    def use_keyring(self) -> bool:
        """Whether keyring may be consulted.

        With --no-input keyring is skipped, because it might need user
        interaction, unless a specific provider was requested.
        """
        if self.prompting:
            return self.prompting
        return self._keyring_provider not in ("auto", "disabled")

    def _get_keyring_auth(
        self,
        url: str | None,
        username: str | None,
    ) -> AuthInfo | None:
        """Return the tuple auth for a given url from keyring."""
        global KEYRING_DISABLED

        # Without a url there is nothing to look up.
        if not url:
            return None

        try:
            return self.keyring_provider.get_auth_info(url, username)
        except Exception as exc:
            # Full traceback at debug level only, i.e. in verbose runs.
            logger.debug("Keyring is skipped due to an exception", exc_info=True)
            # The short form is always shown.
            logger.warning(
                "Keyring is skipped due to an exception: %s",
                str(exc),
            )
            # Switch keyring off for later lookups and drop the cached
            # provider so the next access resolves a new one.
            KEYRING_DISABLED = True
            get_keyring_provider.cache_clear()
        return None

    def _get_index_url(self, url: str) -> str | None:
        """Return the original index URL matching the requested URL.

        Cached or dynamically generated credentials may work against
        the original index URL rather than just the netloc.

        The provided url should already have had its username and password
        removed. Credentials present in the configured index url are kept
        in the returned value.

        Returns None if no matching index was found, or if --no-index
        was specified by the user.
        """
        if not (url and self.index_urls):
            return None

        target = urllib.parse.urlsplit(remove_auth_from_url(url).rstrip("/") + "/")

        def shared_path_depth(candidate: urllib.parse.SplitResult) -> int:
            # Position of the last "/" in the common path prefix.
            return commonprefix([target.path, candidate.path]).rfind("/")

        same_host = []
        for configured in self.index_urls:
            normalized = configured.rstrip("/") + "/"
            stripped = urllib.parse.urlsplit(remove_auth_from_url(normalized))
            # An exact match wins immediately.
            if stripped == target:
                return normalized
            if stripped.netloc == target.netloc:
                same_host.append(urllib.parse.urlsplit(normalized))

        if not same_host:
            return None

        # Deepest shared path wins; on ties the earliest configured index
        # is kept (max() returns the first maximal item).
        return urllib.parse.urlunsplit(max(same_host, key=shared_path_depth))

    def _get_new_credentials(
        self,
        original_url: str,
        *,
        allow_netrc: bool = True,
        allow_keyring: bool = False,
    ) -> AuthInfo:
        """Find and return credentials for the specified URL."""
        url, netloc, url_auth = split_auth_netloc_from_url(original_url)

        # 1. Credentials embedded in the url itself.
        username, password = url_auth
        if not (username is None or password is None):
            logger.debug("Found credentials in url for %s", netloc)
            return url_auth

        # 2. Credentials embedded in the matching index url, if any.
        index_url = self._get_index_url(url)
        if index_url:
            index_parts = split_auth_netloc_from_url(index_url)
            if index_parts:
                index_url, _, index_auth = index_parts
                logger.debug("Found index url %s", index_url)

        if index_url and index_auth[0] is not None:
            username, password = index_auth
            if not (username is None or password is None):
                logger.debug("Found credentials in index url for %s", netloc)
                return index_auth

        # 3. netrc, when permitted.
        if allow_netrc:
            from_netrc = get_netrc_auth(original_url)
            if from_netrc:
                logger.debug("Found credentials in netrc for %s", netloc)
                return from_netrc

        # 4. keyring, when permitted. The index url is more specific than
        # the netloc, so it is tried first.
        if allow_keyring:
            for service in (index_url, netloc):
                from_keyring = self._get_keyring_auth(service, username)
                if from_keyring:
                    logger.debug("Found credentials in keyring for %s", netloc)
                    return from_keyring

        return username, password

    def _get_url_and_credentials(
        self, original_url: str
    ) -> tuple[str, str | None, str | None]:
        """Return the credentials to use for the provided URL.

        If allowed, netrc and keyring may be used to obtain the
        correct credentials.

        Returns (url_without_credentials, username, password). Note
        that even if the original URL contains credentials, this
        function may return a different username and password.
        """
        url, netloc, _ = split_auth_netloc_from_url(original_url)

        username, password = self._get_new_credentials(original_url)

        # Fall back to the per-netloc cache when either half is missing,
        # e.g. the index url names the user but the password came from
        # keyring earlier.
        incomplete = username is None or password is None
        if incomplete and netloc in self.passwords:
            cached_user, cached_password = self.passwords[netloc]
            # Cached credentials for a different user must be ignored.
            if username is None or username == cached_user:
                username, password = cached_user, cached_password

        if not (username is None and password is None):
            # HTTPBasicAuth does not accept None, so store the values that
            # will actually be sent. This also makes the netloc count as
            # cached in the check above on later calls.
            username = username or ""
            password = password or ""
            self.passwords[netloc] = (username, password)

        # Either both values are set or both are missing.
        assert (username is None) == (
            password is None
        ), f"Could not load credentials from url: {original_url}"

        return url, username, password

    def __call__(self, req: PreparedRequest) -> PreparedRequest:
        """Attach credentials and the 401 handler to an outgoing request."""
        assert req.url is not None
        clean_url, username, password = self._get_url_and_credentials(req.url)

        # The request goes out with a credential-free url.
        req.url = clean_url

        if not (username is None or password is None):
            req = HTTPBasicAuth(username, password)(req)

        # 401 responses are handled by handle_401.
        req.register_hook("response", self.handle_401)
        return req

    # Factored out to allow for easy patching in tests
    def _prompt_for_password(self, netloc: str) -> tuple[str | None, str | None, bool]:
        """Ask for credentials; return (username, password, may_be_saved)."""
        if not self.prompting:
            return None, None, False

        username = ask_input(f"User for {netloc}: ")
        if not username:
            return None, None, False

        # A keyring entry for the typed username avoids the password prompt.
        if self.use_keyring:
            stored = self._get_keyring_auth(netloc, username)
            if stored:
                stored_user, stored_password = stored[0], stored[1]
                if stored_user is not None and stored_password is not None:
                    return stored_user, stored_password, False

        password = ask_password("Password: ")
        return username, password, True

    # Factored out to allow for easy patching in tests
    def _should_save_password_to_keyring(self) -> bool:
        """Ask whether to save credentials, if saving is possible at all."""
        can_save = (
            self.prompting and self.use_keyring and self.keyring_provider.has_keyring
        )
        if not can_save:
            return False
        return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"

    def handle_401(self, resp: Response, **kwargs: Any) -> Response:
        """Response hook: on 401, find credentials and re-send the request."""
        # Anything that is not a 401 is passed through untouched.
        if resp.status_code != 401:
            return resp

        username, password = None, None

        # Ask keyring (but not netrc) for credentials.
        if self.use_keyring:
            username, password = self._get_new_credentials(
                resp.url,
                allow_netrc=False,
                allow_keyring=True,
            )

        nothing_found = not (username or password)

        # Prompting is off and nothing was found: give the response back.
        if nothing_found and not self.prompting:
            return resp

        netloc = urllib.parse.urlparse(resp.url).netloc

        # Ask the user when nothing was found.
        save = False
        if nothing_found:
            username, password, save = self._prompt_for_password(netloc)

        # Cache the credentials for future requests to this netloc.
        self._credentials_to_save = None
        if not (username is None or password is None):
            self.passwords[netloc] = (username, password)

            # Offer to store prompted credentials in keyring.
            if save and self._should_save_password_to_keyring():
                self._credentials_to_save = Credentials(
                    url=netloc,
                    username=username,
                    password=password,
                )

        # Reading the body consumes the content; the value itself is not
        # needed. Then release the connection so that the new request can
        # reuse it.
        _ = resp.content
        resp.raw.release_conn()

        # Re-issue the original request with the new credentials.
        req = HTTPBasicAuth(username or "", password or "")(resp.request)
        req.register_hook("response", self.warn_on_401)

        # Only when the user agreed above is there anything to save; the
        # save hook runs on the response to the re-sent request.
        if self._credentials_to_save:
            req.register_hook("response", self.save_credentials)

        retried = resp.connection.send(req, **kwargs)
        retried.history.append(resp)
        return retried

    def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
        """Response callback to warn about incorrect credentials."""
        if resp.status_code != 401:
            return
        logger.warning(
            "401 Error, Credentials not correct for %s",
            resp.request.url,
        )

    def save_credentials(self, resp: Response, **kwargs: Any) -> None:
        """Response callback to save credentials on success."""
        assert (
            self.keyring_provider.has_keyring
        ), "should never reach here without keyring"

        # Take the pending credentials and clear the slot in one step.
        pending, self._credentials_to_save = self._credentials_to_save, None
        if not pending or resp.status_code >= 400:
            return

        try:
            logger.info("Saving credentials to keyring")
            self.keyring_provider.save_auth_info(
                pending.url, pending.username, pending.password
            )
        except Exception:
            logger.exception("Failed to save credentials")