1"""PipSession and supporting code, containing all pip-specific
2network request configuration and behavior.
3"""
4
5from __future__ import annotations
6
7import email.utils
8import functools
9import io
10import ipaddress
11import json
12import logging
13import mimetypes
14import os
15import platform
16import shutil
17import subprocess
18import sys
19import urllib.parse
20import warnings
21from collections.abc import Generator, Mapping, Sequence
22from typing import (
23 TYPE_CHECKING,
24 Any,
25)
26
27from pip._vendor import requests, urllib3
28from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter
29from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter
30from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter
31from pip._vendor.requests.models import PreparedRequest, Response
32from pip._vendor.requests.structures import CaseInsensitiveDict
33from pip._vendor.urllib3.connectionpool import ConnectionPool
34from pip._vendor.urllib3.exceptions import InsecureRequestWarning
35
36from pip import __version__
37from pip._internal.metadata import get_default_environment
38from pip._internal.models.link import Link
39from pip._internal.network.auth import MultiDomainBasicAuth
40from pip._internal.network.cache import SafeFileCache
41
42# Import ssl from compat so the initial import occurs in only one place.
43from pip._internal.utils.compat import has_tls
44from pip._internal.utils.glibc import libc_ver
45from pip._internal.utils.misc import build_url_from_netloc, parse_netloc
46from pip._internal.utils.urls import url_to_path
47
48if TYPE_CHECKING:
49 from ssl import SSLContext
50
51 from pip._vendor.urllib3 import ProxyManager
52
53
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A secure-origin pattern: (protocol, host, port). In the patterns below and
# in PipSession.is_secure_origin, "*" acts as a wildcard for any of the three
# fields, the host may also be an IP network in CIDR notation, and a port of
# None is accepted without comparing ports.
SecureOrigin = tuple[str, str, int | str | None]


# Ignore warning raised when using --trusted-host.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


# Origins that are always considered secure, independent of any
# per-session trusted hosts.
SECURE_ORIGINS: list[SecureOrigin] = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]
74
75
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI. Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
#
# Only the presence of a variable is checked (see looks_like_ci()); its
# value is never inspected.
CI_ENVIRONMENT_VARIABLES = (
    # Azure Pipelines
    "BUILD_BUILDID",
    # Jenkins
    "BUILD_ID",
    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    "CI",
    # Explicit environment variable.
    "PIP_IS_CI",
)
93
94
def looks_like_ci() -> bool:
    """
    Report whether any known CI marker variable is set in the environment.

    Only the presence of a variable matters; its value is ignored.
    """
    # A tty check (e.g. isatty()) is deliberately not used: some CI systems
    # mimic a tty (e.g. Travis CI), so it is inconclusive in both directions.
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in os.environ:
            return True
    return False
103
104
@functools.lru_cache(maxsize=1)
def user_agent() -> str:
    """
    Build the User-Agent string for pip's HTTP requests.

    The result has the form ``pip/<version> <json>``, where ``<json>`` is a
    compact JSON object with sorted keys describing the interpreter, the
    platform and a few tool versions. The value is cached after the first
    call.
    """
    implementation_name = platform.python_implementation()
    implementation: dict[str, Any] = {"name": implementation_name}

    if implementation_name in ("CPython", "Jython", "IronPython"):
        # For Jython and IronPython this is a complete guess.
        implementation["version"] = platform.python_version()
    elif implementation_name == "PyPy":
        pypy_version_info = sys.pypy_version_info  # type: ignore
        if pypy_version_info.releaselevel == "final":
            # Final releases only report major.minor.micro.
            pypy_version_info = pypy_version_info[:3]
        implementation["version"] = ".".join(
            str(component) for component in pypy_version_info
        )

    data: dict[str, Any] = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        distro_values = (distro.name(), distro.version(), distro.codename())
        # Keep only the fields that actually carry a value.
        distro_infos: dict[str, Any] = {
            key: value
            for key, value in zip(("name", "version", "id"), distro_values)
            if value
        }
        libc = {
            key: value for key, value in zip(("lib", "version"), libc_ver()) if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        mac_version = platform.mac_ver()[0]
        if mac_version:
            data["distro"] = {"name": "macOS", "version": mac_version}

    # Only non-empty system fields are reported; the "system" key is omitted
    # entirely when neither is available.
    system_info = {
        key: value
        for key, value in (
            ("name", platform.system()),
            ("release", platform.release()),
        )
        if value
    }
    if system_info:
        data["system"] = system_info

    machine = platform.machine()
    if machine:
        data["cpu"] = machine

    if has_tls():
        import _ssl

        data["openssl_version"] = _ssl.OPENSSL_VERSION

    setuptools_dist = get_default_environment().get_distribution("setuptools")
    if setuptools_dist is not None:
        data["setuptools_version"] = str(setuptools_dist.version)

    if shutil.which("rustc") is not None:
        rustc_output = None
        try:
            rustc_output = subprocess.check_output(
                ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
            )
        except Exception:
            # Best effort only: if `rustc --version` fails for any reason,
            # the version is simply left out.
            pass
        if rustc_output is not None and rustc_output.startswith(b"rustc "):
            # The format of `rustc --version` is:
            # `b'rustc 1.52.1 (9bc8c42bb 2021-05-09)\n'`
            # Only the middle (1.52.1) part is reported.
            data["rustc_version"] = rustc_output.split(b" ")[1].decode()

    # None (rather than False) marks an inconclusive result: pip cannot know
    # that it is *not* running under CI. The key is always present so it is
    # clear the check was run.
    data["ci"] = looks_like_ci() or None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
    )
205
206
class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests by reading local files."""

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: float | tuple[float | None, float | None] | None = None,
        verify: bool | str = True,
        cert: bytes | str | tuple[bytes | str, bytes | str] | None = None,
        proxies: Mapping[str, str] | None = None,
    ) -> Response:
        """
        Build a response for ``request`` from the file its URL points to.

        :param request: The prepared request; its URL is converted to a
            filesystem path with ``url_to_path``.
        :param stream: Unused; accepted for adapter interface compatibility.
        :param timeout: Unused; accepted for adapter interface compatibility.
        :param verify: Unused; accepted for adapter interface compatibility.
        :param cert: Unused; accepted for adapter interface compatibility.
        :param proxies: Unused; accepted for adapter interface compatibility.
        :return: A 200 response whose ``raw`` is the opened file, or a 404
            response whose body describes the ``OSError`` hit while opening
            or inspecting the file.
        """
        assert request.url is not None
        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            # Open first and inspect the open handle afterwards. This way a
            # path that exists but cannot be opened (a directory, missing
            # permissions, a file removed in the meantime) is reported as a
            # 404 response instead of raising out of send().
            fileobj = open(pathname, "rb")
            try:
                stats = os.fstat(fileobj.fileno())
            except OSError:
                fileobj.close()
                raise
        except OSError as exc:
            # format the exception raised as a io.BytesIO object,
            # to return a better error message:
            resp.status_code = 404
            resp.reason = type(exc).__name__
            resp.raw = io.BytesIO(f"{resp.reason}: {exc}".encode())
        else:
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict(
                {
                    "Content-Type": content_type,
                    "Content-Length": str(stats.st_size),
                    "Last-Modified": modified,
                }
            )

            resp.raw = fileobj
            # Closing the response closes the underlying file.
            resp.close = fileobj.close  # type: ignore[method-assign]

        return resp

    def close(self) -> None:
        """Do nothing; this adapter keeps no state that needs releasing."""
        pass
250
251
class _SSLContextAdapterMixin:
    """Mixin giving HTTP adapters an ``ssl_context`` constructor argument.

    The context, when provided, is handed to the pool manager and to any proxy
    manager the adapter creates. This lets the SSL store be chosen at runtime,
    which is how the optional ``truststore`` backend is implemented.
    """

    def __init__(
        self,
        *,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        # Store the context before delegating, so it is already available if
        # the parent constructor sets up its pool manager.
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def _with_ssl_context(self, manager_kwargs: dict[str, Any]) -> dict[str, Any]:
        """Add our SSL context to ``manager_kwargs`` unless one is already set."""
        if self._ssl_context is not None:
            manager_kwargs.setdefault("ssl_context", self._ssl_context)
        return manager_kwargs

    def init_poolmanager(
        self,
        connections: int,
        maxsize: int,
        block: bool = DEFAULT_POOLBLOCK,
        **pool_kwargs: Any,
    ) -> None:
        """Create the pool manager, injecting the configured SSL context."""
        super().init_poolmanager(  # type: ignore[misc]
            connections=connections,
            maxsize=maxsize,
            block=block,
            **self._with_ssl_context(pool_kwargs),
        )

    def proxy_manager_for(self, proxy: str, **proxy_kwargs: Any) -> ProxyManager:
        """Return the proxy manager for ``proxy``, injecting the SSL context."""
        # Proxy manager replaces the pool manager, so inject our SSL
        # context here too. https://github.com/pypa/pip/issues/13288
        return super().proxy_manager_for(  # type: ignore[misc]
            proxy, **self._with_ssl_context(proxy_kwargs)
        )
291
292
class HTTPAdapter(_SSLContextAdapterMixin, _BaseHTTPAdapter):
    """The vendored requests ``HTTPAdapter`` with ``ssl_context`` support."""
295
296
class CacheControlAdapter(_SSLContextAdapterMixin, _BaseCacheControlAdapter):
    """The vendored ``CacheControlAdapter`` with ``ssl_context`` support."""
299
300
class InsecureHTTPAdapter(HTTPAdapter):
    """HTTP adapter that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored.
        """
        super().cert_verify(conn=conn, url=url, cert=cert, verify=False)
310
311
class InsecureCacheControlAdapter(CacheControlAdapter):
    """Caching adapter that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored.
        """
        super().cert_verify(conn=conn, url=url, cert=cert, verify=False)
321
322
class PipSession(requests.Session):
    """A ``requests.Session`` configured with pip's adapters, headers and
    secure-origin rules.
    """

    # Default timeout applied by request() when the caller does not pass one.
    timeout: int | None = None

    def __init__(
        self,
        *args: Any,
        retries: int = 0,
        resume_retries: int = 0,
        cache: str | None = None,
        trusted_hosts: Sequence[str] = (),
        index_urls: list[str] | None = None,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """
        :param retries: Total number of retries allowed per request.
        :param resume_retries: Stored on the session as ``resume_retries``.
        :param cache: When truthy, passed to ``SafeFileCache`` to enable
            caching adapters.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Passed to the authentication handler.
        :param ssl_context: Passed to the adapter mounted for ``https://``.
        """
        super().__init__(*args, **kwargs)

        # The "pip_" prefix guards against clashes with attributes of the
        # base class.
        self.pip_trusted_origins: list[tuple[str, int | None]] = []
        self.pip_proxy = None

        # Identify ourselves with pip's own User Agent.
        self.headers["User-Agent"] = user_agent()

        # Pin Accept-Encoding so it doesn't vary with zstd availability (Python
        # 3.14+ or backports.zstd); a varying value misses the cache for "Vary:
        # Accept-Encoding" responses shared across interpreters (pypa/pip#13979).
        self.headers["Accept-Encoding"] = "gzip, deflate"

        # Install our authentication handler on the session.
        self.auth: MultiDomainBasicAuth = MultiDomainBasicAuth(index_urls=index_urls)

        # The urllib3.Retry instance lets us customize how retries are
        # handled.
        retry_policy = urllib3.Retry(
            # Total number of retries that a particular request can have.
            total=retries,
            # A 503 error from PyPI typically means that the Fastly -> Origin
            # connection got interrupted in some way. A 503 error in general
            # is typically considered a transient error so we retry it.
            # A 500 may indicate transient error in Amazon S3
            # A 502 may be a transient error from a CDN like CloudFlare or CloudFront
            # A 520 or 527 - may indicate transient error in CloudFlare
            status_forcelist=[500, 502, 503, 520, 527],
            # A small amount of back off between failed requests prevents
            # hammering the service.
            backoff_factor=0.25,
        )
        self.resume_retries = resume_retries

        # The insecure adapter disables HTTPS validation and does not cache;
        # it serves all http:// URLs. Without a cache it also serves the
        # https:// hosts marked as trusted (TLS errors ignored).
        insecure_adapter = InsecureHTTPAdapter(max_retries=retry_policy)

        # Responses are _only_ cached for securely fetched origins or hosts
        # explicitly marked as trusted. A response fetched insecurely from an
        # untrusted origin cannot be validated, and a poisoned cache entry
        # would need manual eviction to fix.
        self._trusted_host_adapter: InsecureCacheControlAdapter | InsecureHTTPAdapter
        secure_adapter: _BaseHTTPAdapter
        if not cache:
            secure_adapter = HTTPAdapter(
                max_retries=retry_policy, ssl_context=ssl_context
            )
            self._trusted_host_adapter = insecure_adapter
        else:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
                ssl_context=ssl_context,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )

        self.mount("https://", secure_adapter)
        self.mount("http://", insecure_adapter)

        # Enable file:// urls
        self.mount("file://", LocalFSAdapter())

        for trusted_host in trusted_hosts:
            self.add_trusted_host(trusted_host, suppress_logging=True)

    def update_index_urls(self, new_index_urls: list[str]) -> None:
        """
        :param new_index_urls: New index urls to update the authentication
            handler with.
        """
        self.auth.index_urls = new_index_urls

    def add_trusted_host(
        self, host: str, source: str | None = None, suppress_logging: bool = False
    ) -> None:
        """
        Record ``host`` as a trusted origin and mount the trusted-host adapter
        for its http and https URLs.

        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, skip the informational log line.
        :raises ValueError: If no host part can be parsed from ``host``.
        """
        if not suppress_logging:
            message = f"adding trusted host: {host!r}"
            if source is not None:
                message = message + f" (from {source})"
            logger.info(message)

        hostname, port = parse_netloc(host)
        if hostname is None:
            raise ValueError(f"Trusted host URL must include a host part: {host!r}")

        origin = (hostname, port)
        if origin not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(origin)

        adapter = self._trusted_host_adapter
        http_base = build_url_from_netloc(host, scheme="http")
        default_base = build_url_from_netloc(host)

        self.mount(http_base + "/", adapter)
        self.mount(default_base + "/", adapter)
        if not port:
            # No explicit port was given: also mount wildcard ports for the
            # same host.
            self.mount(http_base + ":", adapter)
            self.mount(default_base + ":", adapter)

    def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
        """Yield the built-in secure origins, then one per trusted host."""
        for builtin_origin in SECURE_ORIGINS:
            yield builtin_origin
        for trusted_host, trusted_port in self.pip_trusted_origins:
            # A trusted host without an explicit port matches any port.
            port_pattern = trusted_port if trusted_port is not None else "*"
            yield ("*", trusted_host, port_pattern)

    @staticmethod
    def _matches_secure_origin(
        protocol: str,
        host: str | None,
        port: int | None,
        secure_origin: SecureOrigin,
    ) -> bool:
        """Return whether protocol/host/port match one secure-origin pattern."""
        secure_protocol, secure_host, secure_port = secure_origin
        if secure_protocol not in ("*", protocol):
            return False

        try:
            addr = ipaddress.ip_address(host or "")
            network = ipaddress.ip_network(secure_host)
        except ValueError:
            # At least one side is not a valid IP address/network, so the
            # comparison falls back to hostnames. An empty host is not
            # rejected here.
            if host and secure_host != "*" and host.lower() != secure_host.lower():
                return False
        else:
            # Both parsed: the address must lie inside the network.
            if addr not in network:
                return False

        # The port matches when the pattern is a wildcard, is None, or is
        # equal to the origin's port.
        return secure_port == "*" or secure_port is None or port == secure_port

    def is_secure_origin(self, location: Link) -> bool:
        """
        Return whether ``location`` matches a secure or trusted origin.

        When nothing matches, a warning is logged and False is returned.
        """
        # Work out which transport mechanism this url uses.
        parts = urllib.parse.urlparse(str(location))
        host = parts.hostname
        port = parts.port

        # Don't count the repository type as part of the protocol: in
        # cases such as "git+ssh", only use "ssh". (I.e., Only verify against
        # the last scheme.)
        protocol = parts.scheme.rsplit("+", 1)[-1]

        # Check the hardcoded list of secure origins, as well as any trusted
        # hosts configured on this session.
        for candidate in self.iter_secure_origins():
            if self._matches_secure_origin(protocol, host, port, candidate):
                return True

        # Nothing matched: the origin isn't secure and is not accepted as a
        # valid location to search. A warning tells the user it is ignored.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            host,
            host,
        )

        return False

    def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:  # type: ignore[override]
        """Send a request, defaulting ``timeout`` and ``proxies`` from the session."""
        # Fall back to the session-wide timeout.
        if "timeout" not in kwargs:
            kwargs["timeout"] = self.timeout
        # Fall back to the session-wide proxies.
        if "proxies" not in kwargs:
            kwargs["proxies"] = self.proxies

        # Dispatch the actual request
        return super().request(method, url, *args, **kwargs)