Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/network/auth.py: 26%
278 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1"""Network Authentication Helpers
3Contains interface (MultiDomainBasicAuth) and associated glue code for
4providing credentials in the context of network requests.
5"""
6import logging
7import os
8import shutil
9import subprocess
10import sysconfig
11import typing
12import urllib.parse
13from abc import ABC, abstractmethod
14from functools import lru_cache
15from os.path import commonprefix
16from pathlib import Path
17from typing import Any, Dict, List, NamedTuple, Optional, Tuple
19from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
20from pip._vendor.requests.models import Request, Response
21from pip._vendor.requests.utils import get_netrc_auth
23from pip._internal.utils.logging import getLogger
24from pip._internal.utils.misc import (
25 ask,
26 ask_input,
27 ask_password,
28 remove_auth_from_url,
29 split_auth_netloc_from_url,
30)
31from pip._internal.vcs.versioncontrol import AuthInfo
33logger = getLogger(__name__)
35KEYRING_DISABLED = False
38class Credentials(NamedTuple):
39 url: str
40 username: str
41 password: str
44class KeyRingBaseProvider(ABC):
45 """Keyring base provider interface"""
47 has_keyring: bool
49 @abstractmethod
50 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
51 ...
53 @abstractmethod
54 def save_auth_info(self, url: str, username: str, password: str) -> None:
55 ...
58class KeyRingNullProvider(KeyRingBaseProvider):
59 """Keyring null provider"""
61 has_keyring = False
63 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
64 return None
66 def save_auth_info(self, url: str, username: str, password: str) -> None:
67 return None
70class KeyRingPythonProvider(KeyRingBaseProvider):
71 """Keyring interface which uses locally imported `keyring`"""
73 has_keyring = True
75 def __init__(self) -> None:
76 import keyring
78 self.keyring = keyring
80 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
81 # Support keyring's get_credential interface which supports getting
82 # credentials without a username. This is only available for
83 # keyring>=15.2.0.
84 if hasattr(self.keyring, "get_credential"):
85 logger.debug("Getting credentials from keyring for %s", url)
86 cred = self.keyring.get_credential(url, username)
87 if cred is not None:
88 return cred.username, cred.password
89 return None
91 if username is not None:
92 logger.debug("Getting password from keyring for %s", url)
93 password = self.keyring.get_password(url, username)
94 if password:
95 return username, password
96 return None
98 def save_auth_info(self, url: str, username: str, password: str) -> None:
99 self.keyring.set_password(url, username, password)
102class KeyRingCliProvider(KeyRingBaseProvider):
103 """Provider which uses `keyring` cli
105 Instead of calling the keyring package installed alongside pip
106 we call keyring on the command line which will enable pip to
107 use which ever installation of keyring is available first in
108 PATH.
109 """
111 has_keyring = True
113 def __init__(self, cmd: str) -> None:
114 self.keyring = cmd
116 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
117 # This is the default implementation of keyring.get_credential
118 # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
119 if username is not None:
120 password = self._get_password(url, username)
121 if password is not None:
122 return username, password
123 return None
125 def save_auth_info(self, url: str, username: str, password: str) -> None:
126 return self._set_password(url, username, password)
128 def _get_password(self, service_name: str, username: str) -> Optional[str]:
129 """Mirror the implementation of keyring.get_password using cli"""
130 if self.keyring is None:
131 return None
133 cmd = [self.keyring, "get", service_name, username]
134 env = os.environ.copy()
135 env["PYTHONIOENCODING"] = "utf-8"
136 res = subprocess.run(
137 cmd,
138 stdin=subprocess.DEVNULL,
139 stdout=subprocess.PIPE,
140 env=env,
141 )
142 if res.returncode:
143 return None
144 return res.stdout.decode("utf-8").strip(os.linesep)
146 def _set_password(self, service_name: str, username: str, password: str) -> None:
147 """Mirror the implementation of keyring.set_password using cli"""
148 if self.keyring is None:
149 return None
150 env = os.environ.copy()
151 env["PYTHONIOENCODING"] = "utf-8"
152 subprocess.run(
153 [self.keyring, "set", service_name, username],
154 input=f"{password}{os.linesep}".encode("utf-8"),
155 env=env,
156 check=True,
157 )
158 return None
161@lru_cache(maxsize=None)
162def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
163 logger.verbose("Keyring provider requested: %s", provider)
165 # keyring has previously failed and been disabled
166 if KEYRING_DISABLED:
167 provider = "disabled"
168 if provider in ["import", "auto"]:
169 try:
170 impl = KeyRingPythonProvider()
171 logger.verbose("Keyring provider set: import")
172 return impl
173 except ImportError:
174 pass
175 except Exception as exc:
176 # In the event of an unexpected exception
177 # we should warn the user
178 msg = "Installed copy of keyring fails with exception %s"
179 if provider == "auto":
180 msg = msg + ", trying to find a keyring executable as a fallback"
181 logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))
182 if provider in ["subprocess", "auto"]:
183 cli = shutil.which("keyring")
184 if cli and cli.startswith(sysconfig.get_path("scripts")):
185 # all code within this function is stolen from shutil.which implementation
186 @typing.no_type_check
187 def PATH_as_shutil_which_determines_it() -> str:
188 path = os.environ.get("PATH", None)
189 if path is None:
190 try:
191 path = os.confstr("CS_PATH")
192 except (AttributeError, ValueError):
193 # os.confstr() or CS_PATH is not available
194 path = os.defpath
195 # bpo-35755: Don't use os.defpath if the PATH environment variable is
196 # set to an empty string
198 return path
200 scripts = Path(sysconfig.get_path("scripts"))
202 paths = []
203 for path in PATH_as_shutil_which_determines_it().split(os.pathsep):
204 p = Path(path)
205 try:
206 if not p.samefile(scripts):
207 paths.append(path)
208 except FileNotFoundError:
209 pass
211 path = os.pathsep.join(paths)
213 cli = shutil.which("keyring", path=path)
215 if cli:
216 logger.verbose("Keyring provider set: subprocess with executable %s", cli)
217 return KeyRingCliProvider(cli)
219 logger.verbose("Keyring provider set: disabled")
220 return KeyRingNullProvider()
223class MultiDomainBasicAuth(AuthBase):
224 def __init__(
225 self,
226 prompting: bool = True,
227 index_urls: Optional[List[str]] = None,
228 keyring_provider: str = "auto",
229 ) -> None:
230 self.prompting = prompting
231 self.index_urls = index_urls
232 self.keyring_provider = keyring_provider # type: ignore[assignment]
233 self.passwords: Dict[str, AuthInfo] = {}
234 # When the user is prompted to enter credentials and keyring is
235 # available, we will offer to save them. If the user accepts,
236 # this value is set to the credentials they entered. After the
237 # request authenticates, the caller should call
238 # ``save_credentials`` to save these.
239 self._credentials_to_save: Optional[Credentials] = None
241 @property
242 def keyring_provider(self) -> KeyRingBaseProvider:
243 return get_keyring_provider(self._keyring_provider)
245 @keyring_provider.setter
246 def keyring_provider(self, provider: str) -> None:
247 # The free function get_keyring_provider has been decorated with
248 # functools.cache. If an exception occurs in get_keyring_auth that
249 # cache will be cleared and keyring disabled, take that into account
250 # if you want to remove this indirection.
251 self._keyring_provider = provider
253 @property
254 def use_keyring(self) -> bool:
255 # We won't use keyring when --no-input is passed unless
256 # a specific provider is requested because it might require
257 # user interaction
258 return self.prompting or self._keyring_provider not in ["auto", "disabled"]
260 def _get_keyring_auth(
261 self,
262 url: Optional[str],
263 username: Optional[str],
264 ) -> Optional[AuthInfo]:
265 """Return the tuple auth for a given url from keyring."""
266 # Do nothing if no url was provided
267 if not url:
268 return None
270 try:
271 return self.keyring_provider.get_auth_info(url, username)
272 except Exception as exc:
273 logger.warning(
274 "Keyring is skipped due to an exception: %s",
275 str(exc),
276 )
277 global KEYRING_DISABLED
278 KEYRING_DISABLED = True
279 get_keyring_provider.cache_clear()
280 return None
282 def _get_index_url(self, url: str) -> Optional[str]:
283 """Return the original index URL matching the requested URL.
285 Cached or dynamically generated credentials may work against
286 the original index URL rather than just the netloc.
288 The provided url should have had its username and password
289 removed already. If the original index url had credentials then
290 they will be included in the return value.
292 Returns None if no matching index was found, or if --no-index
293 was specified by the user.
294 """
295 if not url or not self.index_urls:
296 return None
298 url = remove_auth_from_url(url).rstrip("/") + "/"
299 parsed_url = urllib.parse.urlsplit(url)
301 candidates = []
303 for index in self.index_urls:
304 index = index.rstrip("/") + "/"
305 parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index))
306 if parsed_url == parsed_index:
307 return index
309 if parsed_url.netloc != parsed_index.netloc:
310 continue
312 candidate = urllib.parse.urlsplit(index)
313 candidates.append(candidate)
315 if not candidates:
316 return None
318 candidates.sort(
319 reverse=True,
320 key=lambda candidate: commonprefix(
321 [
322 parsed_url.path,
323 candidate.path,
324 ]
325 ).rfind("/"),
326 )
328 return urllib.parse.urlunsplit(candidates[0])
330 def _get_new_credentials(
331 self,
332 original_url: str,
333 *,
334 allow_netrc: bool = True,
335 allow_keyring: bool = False,
336 ) -> AuthInfo:
337 """Find and return credentials for the specified URL."""
338 # Split the credentials and netloc from the url.
339 url, netloc, url_user_password = split_auth_netloc_from_url(
340 original_url,
341 )
343 # Start with the credentials embedded in the url
344 username, password = url_user_password
345 if username is not None and password is not None:
346 logger.debug("Found credentials in url for %s", netloc)
347 return url_user_password
349 # Find a matching index url for this request
350 index_url = self._get_index_url(url)
351 if index_url:
352 # Split the credentials from the url.
353 index_info = split_auth_netloc_from_url(index_url)
354 if index_info:
355 index_url, _, index_url_user_password = index_info
356 logger.debug("Found index url %s", index_url)
358 # If an index URL was found, try its embedded credentials
359 if index_url and index_url_user_password[0] is not None:
360 username, password = index_url_user_password
361 if username is not None and password is not None:
362 logger.debug("Found credentials in index url for %s", netloc)
363 return index_url_user_password
365 # Get creds from netrc if we still don't have them
366 if allow_netrc:
367 netrc_auth = get_netrc_auth(original_url)
368 if netrc_auth:
369 logger.debug("Found credentials in netrc for %s", netloc)
370 return netrc_auth
372 # If we don't have a password and keyring is available, use it.
373 if allow_keyring:
374 # The index url is more specific than the netloc, so try it first
375 # fmt: off
376 kr_auth = (
377 self._get_keyring_auth(index_url, username) or
378 self._get_keyring_auth(netloc, username)
379 )
380 # fmt: on
381 if kr_auth:
382 logger.debug("Found credentials in keyring for %s", netloc)
383 return kr_auth
385 return username, password
387 def _get_url_and_credentials(
388 self, original_url: str
389 ) -> Tuple[str, Optional[str], Optional[str]]:
390 """Return the credentials to use for the provided URL.
392 If allowed, netrc and keyring may be used to obtain the
393 correct credentials.
395 Returns (url_without_credentials, username, password). Note
396 that even if the original URL contains credentials, this
397 function may return a different username and password.
398 """
399 url, netloc, _ = split_auth_netloc_from_url(original_url)
401 # Try to get credentials from original url
402 username, password = self._get_new_credentials(original_url)
404 # If credentials not found, use any stored credentials for this netloc.
405 # Do this if either the username or the password is missing.
406 # This accounts for the situation in which the user has specified
407 # the username in the index url, but the password comes from keyring.
408 if (username is None or password is None) and netloc in self.passwords:
409 un, pw = self.passwords[netloc]
410 # It is possible that the cached credentials are for a different username,
411 # in which case the cache should be ignored.
412 if username is None or username == un:
413 username, password = un, pw
415 if username is not None or password is not None:
416 # Convert the username and password if they're None, so that
417 # this netloc will show up as "cached" in the conditional above.
418 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to
419 # cache the value that is going to be used.
420 username = username or ""
421 password = password or ""
423 # Store any acquired credentials.
424 self.passwords[netloc] = (username, password)
426 assert (
427 # Credentials were found
428 (username is not None and password is not None)
429 # Credentials were not found
430 or (username is None and password is None)
431 ), f"Could not load credentials from url: {original_url}"
433 return url, username, password
435 def __call__(self, req: Request) -> Request:
436 # Get credentials for this request
437 url, username, password = self._get_url_and_credentials(req.url)
439 # Set the url of the request to the url without any credentials
440 req.url = url
442 if username is not None and password is not None:
443 # Send the basic auth with this request
444 req = HTTPBasicAuth(username, password)(req)
446 # Attach a hook to handle 401 responses
447 req.register_hook("response", self.handle_401)
449 return req
451 # Factored out to allow for easy patching in tests
452 def _prompt_for_password(
453 self, netloc: str
454 ) -> Tuple[Optional[str], Optional[str], bool]:
455 username = ask_input(f"User for {netloc}: ") if self.prompting else None
456 if not username:
457 return None, None, False
458 if self.use_keyring:
459 auth = self._get_keyring_auth(netloc, username)
460 if auth and auth[0] is not None and auth[1] is not None:
461 return auth[0], auth[1], False
462 password = ask_password("Password: ")
463 return username, password, True
465 # Factored out to allow for easy patching in tests
466 def _should_save_password_to_keyring(self) -> bool:
467 if (
468 not self.prompting
469 or not self.use_keyring
470 or not self.keyring_provider.has_keyring
471 ):
472 return False
473 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
475 def handle_401(self, resp: Response, **kwargs: Any) -> Response:
476 # We only care about 401 responses, anything else we want to just
477 # pass through the actual response
478 if resp.status_code != 401:
479 return resp
481 username, password = None, None
483 # Query the keyring for credentials:
484 if self.use_keyring:
485 username, password = self._get_new_credentials(
486 resp.url,
487 allow_netrc=False,
488 allow_keyring=True,
489 )
491 # We are not able to prompt the user so simply return the response
492 if not self.prompting and not username and not password:
493 return resp
495 parsed = urllib.parse.urlparse(resp.url)
497 # Prompt the user for a new username and password
498 save = False
499 if not username and not password:
500 username, password, save = self._prompt_for_password(parsed.netloc)
502 # Store the new username and password to use for future requests
503 self._credentials_to_save = None
504 if username is not None and password is not None:
505 self.passwords[parsed.netloc] = (username, password)
507 # Prompt to save the password to keyring
508 if save and self._should_save_password_to_keyring():
509 self._credentials_to_save = Credentials(
510 url=parsed.netloc,
511 username=username,
512 password=password,
513 )
515 # Consume content and release the original connection to allow our new
516 # request to reuse the same one.
517 # The result of the assignment isn't used, it's just needed to consume
518 # the content.
519 _ = resp.content
520 resp.raw.release_conn()
522 # Add our new username and password to the request
523 req = HTTPBasicAuth(username or "", password or "")(resp.request)
524 req.register_hook("response", self.warn_on_401)
526 # On successful request, save the credentials that were used to
527 # keyring. (Note that if the user responded "no" above, this member
528 # is not set and nothing will be saved.)
529 if self._credentials_to_save:
530 req.register_hook("response", self.save_credentials)
532 # Send our new request
533 new_resp = resp.connection.send(req, **kwargs)
534 new_resp.history.append(resp)
536 return new_resp
538 def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
539 """Response callback to warn about incorrect credentials."""
540 if resp.status_code == 401:
541 logger.warning(
542 "401 Error, Credentials not correct for %s",
543 resp.request.url,
544 )
546 def save_credentials(self, resp: Response, **kwargs: Any) -> None:
547 """Response callback to save credentials on success."""
548 assert (
549 self.keyring_provider.has_keyring
550 ), "should never reach here without keyring"
552 creds = self._credentials_to_save
553 self._credentials_to_save = None
554 if creds and resp.status_code < 400:
555 try:
556 logger.info("Saving credentials to keyring")
557 self.keyring_provider.save_auth_info(
558 creds.url, creds.username, creds.password
559 )
560 except Exception:
561 logger.exception("Failed to save credentials")