1"""PipSession and supporting code, containing all pip-specific
2network request configuration and behavior.
3"""
4
5from __future__ import annotations
6
7import email.utils
8import functools
9import io
10import ipaddress
11import json
12import logging
13import mimetypes
14import os
15import platform
16import shutil
17import subprocess
18import sys
19import urllib.parse
20import warnings
21from collections.abc import Generator, Mapping, Sequence
22from typing import (
23 TYPE_CHECKING,
24 Any,
25 Optional,
26 Union,
27)
28
29from pip._vendor import requests, urllib3
30from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter
31from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter
32from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter
33from pip._vendor.requests.models import PreparedRequest, Response
34from pip._vendor.requests.structures import CaseInsensitiveDict
35from pip._vendor.urllib3.connectionpool import ConnectionPool
36from pip._vendor.urllib3.exceptions import InsecureRequestWarning
37
38from pip import __version__
39from pip._internal.metadata import get_default_environment
40from pip._internal.models.link import Link
41from pip._internal.network.auth import MultiDomainBasicAuth
42from pip._internal.network.cache import SafeFileCache
43
44# Import ssl from compat so the initial import occurs in only one place.
45from pip._internal.utils.compat import has_tls
46from pip._internal.utils.glibc import libc_ver
47from pip._internal.utils.misc import build_url_from_netloc, parse_netloc
48from pip._internal.utils.urls import url_to_path
49
50if TYPE_CHECKING:
51 from ssl import SSLContext
52
53 from pip._vendor.urllib3 import ProxyManager
54 from pip._vendor.urllib3.poolmanager import PoolManager
55
56
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# One secure-origin pattern, laid out as (protocol, hostname, port).
SecureOrigin = tuple[str, str, Optional[Union[int, str]]]


# Ignore warning raised when using --trusted-host.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


# Origin patterns that PipSession.is_secure_origin() always accepts, before
# any trusted hosts are taken into account. In every position "*" matches
# any value. The hostname position may also hold an IP network in CIDR
# notation, which is matched against IP-address hosts by containment. A
# port of None places no constraint on the port either.
SECURE_ORIGINS: list[SecureOrigin] = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]
77
78
# These are environment variables present when running under various
# CI systems. For each variable, some CI systems that use the variable
# are indicated. The collection was chosen so that for each of a number
# of popular systems, at least one of the environment variables is used.
# This list is used to provide some indication of and lower bound for
# CI traffic to PyPI. Thus, it is okay if the list is not comprehensive.
# For more background, see: https://github.com/pypa/pip/issues/5499
#
# Only the presence of a variable in the environment is checked by
# looks_like_ci(); its value is never inspected.
CI_ENVIRONMENT_VARIABLES = (
    # Azure Pipelines
    "BUILD_BUILDID",
    # Jenkins
    "BUILD_ID",
    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    "CI",
    # Explicit environment variable.
    "PIP_IS_CI",
)
96
97
def looks_like_ci() -> bool:
    """
    Report whether the environment suggests pip is running under CI.

    Returns True as soon as any name listed in CI_ENVIRONMENT_VARIABLES is
    present in the process environment (its value is not examined), and
    False when none of them is set.
    """
    # A tty check (e.g. isatty()) is deliberately not used here: some CI
    # systems emulate a tty (Travis CI, for one), so it would not be
    # conclusive either way.
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in os.environ:
            return True
    return False
106
107
@functools.lru_cache(maxsize=1)
def user_agent() -> str:
    """
    Build the User-Agent string pip sends with its requests.

    The result has the form ``pip/<version> <json>``, where ``<json>`` is a
    compact, key-sorted JSON object describing the installer, the Python
    implementation, the platform and a few optional tools. The function
    takes no arguments and is wrapped in ``lru_cache``, so the string is
    computed once and then reused.
    """
    impl_name = platform.python_implementation()
    implementation: dict[str, Any] = {"name": impl_name}
    data: dict[str, Any] = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    if impl_name in ("CPython", "Jython", "IronPython"):
        # For Jython and IronPython, reusing the Python version is a
        # complete guess.
        implementation["version"] = platform.python_version()
    elif impl_name == "PyPy":
        pypy_version_info = sys.pypy_version_info  # type: ignore
        if pypy_version_info.releaselevel == "final":
            # Final releases are reported as plain major.minor.micro.
            pypy_version_info = pypy_version_info[:3]
        implementation["version"] = ".".join(
            str(part) for part in pypy_version_info
        )

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        distro_values = (distro.name(), distro.version(), distro.codename())
        # Keep only the fields that actually carry a value.
        distro_infos: dict[str, Any] = {
            key: value
            for key, value in zip(("name", "version", "id"), distro_values)
            if value
        }
        libc = {
            key: value
            for key, value in zip(("lib", "version"), libc_ver())
            if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        macos_version = platform.mac_ver()[0]
        if macos_version:
            data["distro"] = {"name": "macOS", "version": macos_version}

    # The "system" entry is only emitted when at least one field is known.
    system_info: dict[str, Any] = {}
    if system_name := platform.system():
        system_info["name"] = system_name
    if system_release := platform.release():
        system_info["release"] = system_release
    if system_info:
        data["system"] = system_info

    if machine := platform.machine():
        data["cpu"] = machine

    if has_tls():
        import _ssl as ssl

        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_dist = get_default_environment().get_distribution("setuptools")
    if setuptools_dist is not None:
        data["setuptools_version"] = str(setuptools_dist.version)

    rustc_output = b""
    if shutil.which("rustc") is not None:
        try:
            rustc_output = subprocess.check_output(
                ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
            )
        except Exception:
            # Best effort only: whatever goes wrong with `rustc --version`,
            # the field is simply left out.
            rustc_output = b""
    if rustc_output.startswith(b"rustc "):
        # `rustc --version` prints something like
        # `b'rustc 1.52.1 (9bc8c42bb 2021-05-09)\n'`; only the middle
        # (1.52.1) part is reported.
        data["rustc_version"] = rustc_output.split(b" ")[1].decode()

    # None (not False) is reported when no CI marker was found: the check
    # is inconclusive, pip does not *know* it is outside CI. The key is
    # always present so it is evident that the check ran.
    data["ci"] = looks_like_ci() or None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    payload = json.dumps(data, separators=(",", ":"), sort_keys=True)
    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=payload,
    )
208
209
class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests from the local filesystem."""

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: float | tuple[float, float] | tuple[float, None] | None = None,
        verify: bool | str = True,
        cert: bytes | str | tuple[bytes | str, bytes | str] | None = None,
        proxies: Mapping[str, str] | None = None,
    ) -> Response:
        """Build a ``Response`` for the local file that ``request.url`` names.

        When the path can be stat'ed, the response has status 200, carries
        Content-Type, Content-Length and Last-Modified headers, and exposes
        the file opened in binary mode as ``raw``; closing the response
        closes that file. When ``os.stat`` raises ``OSError``, the response
        has status 404, the exception class name as its reason, and a body
        describing the error.

        ``stream``, ``timeout``, ``verify``, ``cert`` and ``proxies`` are
        accepted but not used.
        """
        assert request.url is not None
        local_path = url_to_path(request.url)

        response = Response()
        response.url = request.url
        response.status_code = 200

        try:
            file_stat = os.stat(local_path)
        except OSError as error:
            # Surface the failure as the response body (a BytesIO), so the
            # caller gets a better error message.
            response.status_code = 404
            response.reason = type(error).__name__
            response.raw = io.BytesIO(f"{response.reason}: {error}".encode())
            return response

        response.headers = CaseInsensitiveDict(
            {
                "Content-Type": mimetypes.guess_type(local_path)[0] or "text/plain",
                "Content-Length": str(file_stat.st_size),
                "Last-Modified": email.utils.formatdate(
                    file_stat.st_mtime, usegmt=True
                ),
            }
        )

        response.raw = open(local_path, "rb")
        # Closing the response must release the underlying file handle.
        response.close = response.raw.close  # type: ignore[method-assign]
        return response

    def close(self) -> None:
        """Do nothing; the adapter itself holds no resources."""
        pass
253
254
class _SSLContextAdapterMixin:
    """Mixin to add the ``ssl_context`` constructor argument to HTTP adapters.

    The context is handed to both the pool manager and any proxy manager the
    adapter creates. This lets the SSL store be chosen at runtime, which is
    how the optional ``truststore`` backend is implemented.
    """

    def __init__(
        self,
        *,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """Remember ``ssl_context`` and pass everything else to the next class.

        The context is stored before the parent initializer runs, so it is
        already available should that initializer build a pool manager.
        """
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def _apply_ssl_context(self, manager_kwargs: dict[str, Any]) -> None:
        """Add our SSL context to ``manager_kwargs`` unless one is already set."""
        if self._ssl_context is None:
            return
        manager_kwargs.setdefault("ssl_context", self._ssl_context)

    def init_poolmanager(
        self,
        connections: int,
        maxsize: int,
        block: bool = DEFAULT_POOLBLOCK,
        **pool_kwargs: Any,
    ) -> PoolManager:
        """Create the pool manager, supplying the stored SSL context if any."""
        self._apply_ssl_context(pool_kwargs)
        return super().init_poolmanager(  # type: ignore[misc, no-any-return]
            connections=connections,
            maxsize=maxsize,
            block=block,
            **pool_kwargs,
        )

    def proxy_manager_for(self, proxy: str, **proxy_kwargs: Any) -> ProxyManager:
        """Create the proxy manager, supplying the stored SSL context if any."""
        # Proxy manager replaces the pool manager, so the SSL context has
        # to be injected here as well.
        # https://github.com/pypa/pip/issues/13288
        self._apply_ssl_context(proxy_kwargs)
        return super().proxy_manager_for(proxy, **proxy_kwargs)  # type: ignore[misc, no-any-return]
294
295
class HTTPAdapter(_SSLContextAdapterMixin, _BaseHTTPAdapter):
    """The requests ``HTTPAdapter`` extended with the ``ssl_context`` argument."""

    pass
298
299
class CacheControlAdapter(_SSLContextAdapterMixin, _BaseCacheControlAdapter):
    """The cachecontrol adapter extended with the ``ssl_context`` argument."""

    pass
302
303
class InsecureHTTPAdapter(HTTPAdapter):
    """HTTP adapter that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored; ``conn``,
        ``url`` and ``cert`` are passed through unchanged.
        """
        super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
313
314
class InsecureCacheControlAdapter(CacheControlAdapter):
    """Caching adapter that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored; ``conn``,
        ``url`` and ``cert`` are passed through unchanged.
        """
        super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
324
325
class PipSession(requests.Session):
    """A ``requests.Session`` configured with pip's networking behavior.

    Construction sets pip's User-Agent header, installs
    ``MultiDomainBasicAuth`` as the auth handler, builds a ``urllib3.Retry``
    policy and mounts adapters for ``https://``, ``http://`` and ``file://``
    URLs. Hosts registered with :meth:`add_trusted_host` are routed to an
    adapter that does not verify certificates.
    """

    # Default timeout that request() applies when the caller gives none.
    timeout: int | None = None

    def __init__(
        self,
        *args: Any,
        retries: int = 0,
        resume_retries: int = 0,
        cache: str | None = None,
        trusted_hosts: Sequence[str] = (),
        index_urls: list[str] | None = None,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """
        :param retries: Total number of retries allowed for a single request.
        :param resume_retries: Stored unchanged on ``self.resume_retries``.
        :param cache: Location handed to ``SafeFileCache``. When falsy, the
            non-caching adapters are used instead.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Index urls handed to the authentication handler.
        :param ssl_context: Optional SSL context given to the adapter that is
            mounted for ``https://``.
        """
        super().__init__(*args, **kwargs)

        # The "pip_" prefix guards against clashes with attributes of the
        # base class.
        self.pip_trusted_origins: list[tuple[str, int | None]] = []
        self.pip_proxy = None
        self.resume_retries = resume_retries

        # Every request carries pip's User-Agent and goes through pip's
        # authentication handler.
        self.headers["User-Agent"] = user_agent()
        self.auth: MultiDomainBasicAuth = MultiDomainBasicAuth(index_urls=index_urls)

        # The urllib3.Retry instance lets us customize retry handling.
        retries = urllib3.Retry(
            # Upper bound on retries for any one request.
            total=retries,
            # Status codes that are treated as transient and retried:
            #   500 - may be a transient error in Amazon S3
            #   502 - may be a transient error from a CDN like CloudFlare
            #         or CloudFront
            #   503 - from PyPI this typically means the Fastly -> Origin
            #         connection got interrupted; in general it is
            #         considered transient
            #   520, 527 - may indicate a transient error in CloudFlare
            status_forcelist=[500, 502, 503, 520, 527],
            # A little back off between failed attempts, so the service
            # is not hammered.
            backoff_factor=0.25,
        )  # type: ignore

        # The insecure adapter disables HTTPS validation and does no
        # caching. It serves all http:// URLs and, when caching is off,
        # also the https:// hosts marked as trusted.
        plain_http_adapter = InsecureHTTPAdapter(max_retries=retries)

        # Responses are cached _only_ for securely fetched origins or hosts
        # explicitly marked as trusted. A response fetched insecurely from
        # an untrusted origin cannot be validated, and nobody should be able
        # to poison the cache in a way that needs manual eviction to fix.
        self._trusted_host_adapter: InsecureCacheControlAdapter | InsecureHTTPAdapter
        if not cache:
            secure_adapter: _BaseHTTPAdapter = HTTPAdapter(
                max_retries=retries, ssl_context=ssl_context
            )
            self._trusted_host_adapter = plain_http_adapter
        else:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retries,
                ssl_context=ssl_context,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retries,
            )

        # The file:// entry enables local file URLs.
        for prefix, adapter in (
            ("https://", secure_adapter),
            ("http://", plain_http_adapter),
            ("file://", LocalFSAdapter()),
        ):
            self.mount(prefix, adapter)

        for trusted_host in trusted_hosts:
            self.add_trusted_host(trusted_host, suppress_logging=True)

    def update_index_urls(self, new_index_urls: list[str]) -> None:
        """
        Replace the index urls known to the authentication handler.

        :param new_index_urls: New index urls to update the authentication
            handler with.
        """
        self.auth.index_urls = new_index_urls

    def add_trusted_host(
        self, host: str, source: str | None = None, suppress_logging: bool = False
    ) -> None:
        """
        Record ``host`` as trusted and mount the trusted-host adapter for it.

        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, the informational log message is
            skipped.
        :raises ValueError: If no host part can be parsed out of ``host``.
        """
        if not suppress_logging:
            origin_note = f" (from {source})" if source is not None else ""
            logger.info(f"adding trusted host: {host!r}" + origin_note)

        parsed_host, parsed_port = parse_netloc(host)
        if parsed_host is None:
            raise ValueError(f"Trusted host URL must include a host part: {host!r}")

        trusted_origin = (parsed_host, parsed_port)
        if trusted_origin not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(trusted_origin)

        http_url = build_url_from_netloc(host, scheme="http")
        default_url = build_url_from_netloc(host)
        prefixes = [http_url + "/", default_url + "/"]
        if not parsed_port:
            # No explicit port was given: also mount wildcard ports for the
            # same host.
            prefixes.extend([http_url + ":", default_url + ":"])

        for prefix in prefixes:
            self.mount(prefix, self._trusted_host_adapter)

    def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
        """Yield the built-in secure origins, then one entry per trusted host.

        Trusted hosts are yielded with a wildcard protocol; a trusted host
        recorded without a port gets a wildcard port as well.
        """
        yield from SECURE_ORIGINS
        for trusted_host, trusted_port in self.pip_trusted_origins:
            port_pattern = trusted_port if trusted_port is not None else "*"
            yield ("*", trusted_host, port_pattern)

    def is_secure_origin(self, location: Link) -> bool:
        """Return whether ``location`` matches a secure or trusted origin.

        The URL's protocol, host and port are compared against every pattern
        from :meth:`iter_secure_origins`. When nothing matches, a warning is
        logged and False is returned.
        """
        url_parts = urllib.parse.urlparse(str(location))
        origin_host = url_parts.hostname
        origin_port = url_parts.port

        # The repository type is not part of the protocol: for something
        # like "git+ssh" only the last scheme ("ssh") is checked.
        origin_protocol = url_parts.scheme.rsplit("+", 1)[-1]

        # Walk the hardcoded secure origins plus any trusted hosts
        # configured on this session, looking for the first full match.
        for secure_protocol, secure_host, secure_port in self.iter_secure_origins():
            if secure_protocol != "*" and secure_protocol != origin_protocol:
                continue

            try:
                addr = ipaddress.ip_address(origin_host or "")
                network = ipaddress.ip_network(secure_host)
            except ValueError:
                # Not a valid address/network pair, so compare as hostnames
                # instead. A URL without a host is not rejected here.
                host_matches = (
                    not origin_host
                    or origin_host.lower() == secure_host.lower()
                    or secure_host == "*"
                )
            else:
                # Valid address and network: match by containment.
                host_matches = addr in network
            if not host_matches:
                continue

            # Protocol and host agree; the port decides. A pattern port of
            # "*" or None accepts every port.
            if (
                origin_port == secure_port
                or secure_port == "*"
                or secure_port is None
            ):
                return True

        # No pattern matched: the origin is not secure and will not be
        # accepted as a location to search. Tell the user it is ignored.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:  # type: ignore[override]
        """Send a request, filling in the session's timeout and proxies.

        ``timeout`` and ``proxies`` are taken from the session only when the
        caller did not pass them explicitly.
        """
        session_defaults = (
            ("timeout", self.timeout),
            ("proxies", self.proxies),
        )
        for option, default in session_defaults:
            kwargs.setdefault(option, default)

        # Hand over to requests for the actual dispatch.
        return super().request(method, url, *args, **kwargs)