Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/network/session.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

207 statements  

1"""PipSession and supporting code, containing all pip-specific 

2network request configuration and behavior. 

3""" 

4 

5from __future__ import annotations 

6 

7import email.utils 

8import functools 

9import io 

10import ipaddress 

11import json 

12import logging 

13import mimetypes 

14import os 

15import platform 

16import shutil 

17import subprocess 

18import sys 

19import urllib.parse 

20import warnings 

21from collections.abc import Generator, Mapping, Sequence 

22from typing import ( 

23 TYPE_CHECKING, 

24 Any, 

25 Optional, 

26 Union, 

27) 

28 

29from pip._vendor import requests, urllib3 

30from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter 

31from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter 

32from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter 

33from pip._vendor.requests.models import PreparedRequest, Response 

34from pip._vendor.requests.structures import CaseInsensitiveDict 

35from pip._vendor.urllib3.connectionpool import ConnectionPool 

36from pip._vendor.urllib3.exceptions import InsecureRequestWarning 

37 

38from pip import __version__ 

39from pip._internal.metadata import get_default_environment 

40from pip._internal.models.link import Link 

41from pip._internal.network.auth import MultiDomainBasicAuth 

42from pip._internal.network.cache import SafeFileCache 

43 

44# Import ssl from compat so the initial import occurs in only one place. 

45from pip._internal.utils.compat import has_tls 

46from pip._internal.utils.glibc import libc_ver 

47from pip._internal.utils.misc import build_url_from_netloc, parse_netloc 

48from pip._internal.utils.urls import url_to_path 

49 

50if TYPE_CHECKING: 

51 from ssl import SSLContext 

52 

53 from pip._vendor.urllib3 import ProxyManager 

54 from pip._vendor.urllib3.poolmanager import PoolManager 

55 

56 

# Logger for this module, keyed on the module's import name.
logger = logging.getLogger(__name__)

# Shape of one secure-origin pattern: (protocol, hostname, port). The port
# slot may hold an int, a string, or None.
SecureOrigin = tuple[str, str, Optional[Union[int, str]]]


# Using --trusted-host triggers InsecureRequestWarning; keep that warning
# category from being shown.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)

64 

65 

# Origin patterns that are accepted as secure without any user configuration.
# Every entry is a (protocol, hostname, port) triple. "*" in a slot matches
# anything, the hostname slot may also be an IP network in CIDR notation, and
# a port of None places no restriction on the port.
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
SECURE_ORIGINS: list[SecureOrigin] = [
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]

77 

78 

# Names of environment variables whose presence suggests a CI system. Each
# variable is annotated with some of the CI systems that set it. The set was
# picked so that every one of a number of popular systems sets at least one
# of them. It only feeds a rough, lower-bound estimate of CI traffic to PyPI,
# so it does not need to be exhaustive.
# Background: https://github.com/pypa/pip/issues/5499
CI_ENVIRONMENT_VARIABLES = (
    "BUILD_BUILDID",  # Azure Pipelines
    "BUILD_ID",  # Jenkins
    "CI",  # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
    "PIP_IS_CI",  # Explicit environment variable.
)

96 

97 

def looks_like_ci() -> bool:
    """
    Report whether the environment suggests pip is running under CI.

    The answer is True as soon as any name from ``CI_ENVIRONMENT_VARIABLES``
    is present in ``os.environ`` (its value is not inspected), else False.
    """
    # A tty check (e.g. isatty()) is deliberately not used: some CI systems
    # mimic a tty (e.g. Travis CI), so it is inconclusive in both directions.
    environment = os.environ
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in environment:
            return True
    return False

106 

107 

@functools.lru_cache(maxsize=1)
def user_agent() -> str:
    """
    Build the User-Agent string describing this pip installation.

    The result is ``<installer name>/<installer version>`` followed by a
    space and a compact JSON object (sorted keys, no extra whitespace) that
    describes the interpreter, the platform and some related tooling.
    The value is memoized, so the environment is inspected once per process.
    """
    implementation_name = platform.python_implementation()
    implementation: dict[str, Any] = {"name": implementation_name}
    data: dict[str, Any] = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    if implementation_name == "PyPy":
        pypy_version_info = sys.pypy_version_info  # type: ignore
        if pypy_version_info.releaselevel == "final":
            # Final releases report only the first three version components.
            pypy_version_info = pypy_version_info[:3]
        implementation["version"] = ".".join(
            str(component) for component in pypy_version_info
        )
    elif implementation_name in ("CPython", "Jython", "IronPython"):
        # For Jython and IronPython this is a complete guess.
        implementation["version"] = platform.python_version()

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        linux_distribution = (distro.name(), distro.version(), distro.codename())
        # Keep only the fields that came back non-empty.
        distro_infos: dict[str, Any] = {
            key: value
            for key, value in zip(["name", "version", "id"], linux_distribution)
            if value
        }
        libc = {
            key: value for key, value in zip(["lib", "version"], libc_ver()) if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        mac_release = platform.mac_ver()[0]
        if mac_release:
            data["distro"] = {"name": "macOS", "version": mac_release}

    system_name = platform.system()
    if system_name:
        data.setdefault("system", {})["name"] = system_name

    system_release = platform.release()
    if system_release:
        data.setdefault("system", {})["release"] = system_release

    machine = platform.machine()
    if machine:
        data["cpu"] = machine

    if has_tls():
        import _ssl as ssl

        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_dist = get_default_environment().get_distribution("setuptools")
    if setuptools_dist is not None:
        data["setuptools_version"] = str(setuptools_dist.version)

    if shutil.which("rustc") is not None:
        # Best effort only: if `rustc --version` fails for any reason the
        # output stays empty and no version is reported.
        rustc_output = b""
        try:
            rustc_output = subprocess.check_output(
                ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
            )
        except Exception:
            pass
        if rustc_output.startswith(b"rustc "):
            # The format of `rustc --version` is:
            # `b'rustc 1.52.1 (9bc8c42bb 2021-05-09)\n'`
            # Only the middle (1.52.1) part is kept.
            data["rustc_version"] = rustc_output.split(b" ")[1].decode()

    # None (not False) is reported when no CI marker is found: the check is
    # inconclusive, not proof that pip is running outside CI. The key is
    # always present so it is clear that the check was run.
    data["ci"] = looks_like_ci() or None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
    )

208 

209 

class LocalFSAdapter(BaseAdapter):
    """Adapter that answers requests by reading files from the local disk.

    The request URL is turned into a filesystem path with ``url_to_path``.
    A readable file yields a 200 response whose body is the open file; any
    ``OSError`` raised while inspecting or opening the path yields a 404
    response that carries the error text as its body.
    """

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: float | tuple[float, float] | tuple[float, None] | None = None,
        verify: bool | str = True,
        cert: bytes | str | tuple[bytes | str, bytes | str] | None = None,
        proxies: Mapping[str, str] | None = None,
    ) -> Response:
        """Return a response for the file that ``request.url`` points at.

        :param request: The prepared request; only its ``url`` is used.
        :param stream: Accepted for interface compatibility; not used.
        :param timeout: Accepted for interface compatibility; not used.
        :param verify: Accepted for interface compatibility; not used.
        :param cert: Accepted for interface compatibility; not used.
        :param proxies: Accepted for interface compatibility; not used.
        :return: A 200 response with ``Content-Type``, ``Content-Length`` and
            ``Last-Modified`` headers and the open file as ``raw``, or a 404
            response whose ``reason`` is the exception class name.
        """
        assert request.url is not None
        pathname = url_to_path(request.url)

        resp = Response()
        resp.status_code = 200
        resp.url = request.url

        try:
            stats = os.stat(pathname)
            # Open inside the ``try`` as well: a path can pass ``os.stat`` and
            # still fail to open (a directory, missing permissions, or a file
            # removed in between). Such failures get the same 404 treatment
            # instead of escaping as a bare OSError.
            raw = open(pathname, "rb")
        except OSError as exc:
            # format the exception raised as a io.BytesIO object,
            # to return a better error message:
            resp.status_code = 404
            resp.reason = type(exc).__name__
            resp.raw = io.BytesIO(f"{resp.reason}: {exc}".encode())
        else:
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            # Fall back to text/plain when the type cannot be guessed.
            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
            resp.headers = CaseInsensitiveDict(
                {
                    "Content-Type": content_type,
                    "Content-Length": str(stats.st_size),
                    "Last-Modified": modified,
                }
            )

            resp.raw = raw
            # Closing the response closes the underlying file handle.
            resp.close = raw.close  # type: ignore[method-assign]

        return resp

    def close(self) -> None:
        """Do nothing; this adapter has nothing to release."""
        pass

253 

254 

class _SSLContextAdapterMixin:
    """Mixin giving HTTP adapters an ``ssl_context`` constructor argument.

    The context is handed on to the pool manager (and to proxy managers), so
    the SSL store can be chosen at runtime. This is what the optional
    ``truststore`` backend is built on.
    """

    def __init__(
        self,
        *,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """Remember ``ssl_context`` and pass all other keywords up the MRO."""
        # Stored before delegating so it is already available if the parent
        # constructor ends up calling ``init_poolmanager``.
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(
        self,
        connections: int,
        maxsize: int,
        block: bool = DEFAULT_POOLBLOCK,
        **pool_kwargs: Any,
    ) -> PoolManager:
        """Create the pool manager, supplying our SSL context if we have one.

        An ``ssl_context`` already present in ``pool_kwargs`` is left alone.
        """
        context = self._ssl_context
        if context is not None and "ssl_context" not in pool_kwargs:
            pool_kwargs["ssl_context"] = context
        return super().init_poolmanager(  # type: ignore[misc, no-any-return]
            connections=connections,
            maxsize=maxsize,
            block=block,
            **pool_kwargs,
        )

    def proxy_manager_for(self, proxy: str, **proxy_kwargs: Any) -> ProxyManager:
        """Create the proxy manager, supplying our SSL context if we have one.

        An ``ssl_context`` already present in ``proxy_kwargs`` is left alone.
        """
        # Proxy manager replaces the pool manager, so inject our SSL
        # context here too. https://github.com/pypa/pip/issues/13288
        context = self._ssl_context
        if context is not None and "ssl_context" not in proxy_kwargs:
            proxy_kwargs["ssl_context"] = context
        return super().proxy_manager_for(proxy, **proxy_kwargs)  # type: ignore[misc, no-any-return]

294 

295 

class HTTPAdapter(_SSLContextAdapterMixin, _BaseHTTPAdapter):
    """The base HTTP adapter combined with ``_SSLContextAdapterMixin``."""

298 

299 

class CacheControlAdapter(_SSLContextAdapterMixin, _BaseCacheControlAdapter):
    """The base CacheControl adapter combined with ``_SSLContextAdapterMixin``."""

302 

303 

class InsecureHTTPAdapter(HTTPAdapter):
    """``HTTPAdapter`` whose certificate check is always switched off."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored.
        """
        forced_verify = False
        super().cert_verify(conn=conn, url=url, verify=forced_verify, cert=cert)

313 

314 

class InsecureCacheControlAdapter(CacheControlAdapter):
    """``CacheControlAdapter`` whose certificate check is always switched off."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the parent with ``verify`` forced to ``False``.

        The ``verify`` value supplied by the caller is ignored.
        """
        forced_verify = False
        super().cert_verify(conn=conn, url=url, verify=forced_verify, cert=cert)

324 

325 

class PipSession(requests.Session):
    """``requests.Session`` set up with pip's user agent, auth, retries and adapters.

    Adapters are mounted for ``https://``, ``http://`` and ``file://``, and
    the session keeps track of the hosts the user has marked as trusted.
    """

    # Session-wide default used by ``request`` when no timeout is passed.
    timeout: int | None = None

    def __init__(
        self,
        *args: Any,
        retries: int = 0,
        resume_retries: int = 0,
        cache: str | None = None,
        trusted_hosts: Sequence[str] = (),
        index_urls: list[str] | None = None,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """
        :param retries: Total number of retries allowed for a single request.
        :param resume_retries: Stored on the session as ``resume_retries``.
        :param cache: When truthy, handed to ``SafeFileCache`` and caching
            adapters are used; otherwise nothing is cached.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Passed to the authentication handler.
        :param ssl_context: Forwarded to the adapter mounted for ``https://``.
        """
        super().__init__(*args, **kwargs)

        # The "pip_" prefix guards against clashes with attributes of the
        # base class.
        self.pip_trusted_origins: list[tuple[str, int | None]] = []
        self.pip_proxy = None
        self.resume_retries = resume_retries

        # Every request carries pip's User-Agent.
        self.headers["User-Agent"] = user_agent()

        # Authentication handler for the session.
        self.auth: MultiDomainBasicAuth = MultiDomainBasicAuth(index_urls=index_urls)

        # urllib3.Retry object controlling how failed requests are retried.
        retry_policy = urllib3.Retry(
            # Upper bound on the retries of any single request.
            total=retries,
            # Status codes treated as transient:
            # 503 - from PyPI this typically means the Fastly -> Origin
            #       connection got interrupted; generally transient
            # 500 - may indicate a transient error in Amazon S3
            # 502 - may be a transient error from a CDN like CloudFlare
            #       or CloudFront
            # 520, 527 - may indicate a transient error in CloudFlare
            status_forcelist=[500, 502, 503, 520, 527],
            # Back off a little between attempts so the service is not
            # hammered.
            backoff_factor=0.25,
        )  # type: ignore

        # The insecure adapter turns HTTPS validation off and does no
        # caching. It serves every http:// URL and, when caching is disabled,
        # also the https:// hosts marked as trusted.
        insecure_adapter = InsecureHTTPAdapter(max_retries=retry_policy)

        # Responses are cached only for securely fetched origins or hosts the
        # user marked as trusted: a response from an insecure, untrusted
        # origin cannot be validated and could poison the cache until it is
        # evicted by hand.
        secure_adapter: _BaseHTTPAdapter
        self._trusted_host_adapter: InsecureCacheControlAdapter | InsecureHTTPAdapter
        if not cache:
            secure_adapter = HTTPAdapter(
                max_retries=retry_policy, ssl_context=ssl_context
            )
            self._trusted_host_adapter = insecure_adapter
        else:
            secure_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
                ssl_context=ssl_context,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )

        scheme_adapters = (
            ("https://", secure_adapter),
            ("http://", insecure_adapter),
            # Enable file:// urls
            ("file://", LocalFSAdapter()),
        )
        for prefix, adapter in scheme_adapters:
            self.mount(prefix, adapter)

        for trusted in trusted_hosts:
            self.add_trusted_host(trusted, suppress_logging=True)

    def update_index_urls(self, new_index_urls: list[str]) -> None:
        """
        :param new_index_urls: Index urls that replace the ones currently
            held by the authentication handler.
        """
        self.auth.index_urls = new_index_urls

    def add_trusted_host(
        self, host: str, source: str | None = None, suppress_logging: bool = False
    ) -> None:
        """
        Record ``host`` as trusted and mount the trusted-host adapter for it.

        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, the informational log line is
            skipped.
        :raises ValueError: If no host part can be parsed out of ``host``.
        """
        if not suppress_logging:
            msg = f"adding trusted host: {host!r}"
            msg = msg if source is None else msg + f" (from {source})"
            logger.info(msg)

        parsed_host, parsed_port = parse_netloc(host)
        if parsed_host is None:
            raise ValueError(f"Trusted host URL must include a host part: {host!r}")

        origin = (parsed_host, parsed_port)
        if origin not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(origin)

        http_base = build_url_from_netloc(host, scheme="http")
        default_base = build_url_from_netloc(host)
        # A "/" suffix is always mounted. When no port was given, a ":"
        # suffix is mounted too, so that any port on the same host is covered.
        suffixes = ("/",) if parsed_port else ("/", ":")
        for suffix in suffixes:
            self.mount(http_base + suffix, self._trusted_host_adapter)
            self.mount(default_base + suffix, self._trusted_host_adapter)

    def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
        """Yield the built-in secure origins, then one per trusted host.

        A trusted host matches any protocol; a host recorded without a port
        matches any port.
        """
        yield from SECURE_ORIGINS
        for trusted_host, trusted_port in self.pip_trusted_origins:
            port_pattern = "*" if trusted_port is None else trusted_port
            yield ("*", trusted_host, port_pattern)

    def is_secure_origin(self, location: Link) -> bool:
        """Tell whether ``location`` matches one of the secure origins.

        Protocol, host and port of the location are compared with every
        pattern from ``iter_secure_origins``. If none matches, a warning is
        logged and False is returned.
        """
        parts = urllib.parse.urlparse(str(location))
        origin_host = parts.hostname
        origin_port = parts.port
        # The repository type is not part of the protocol: for something
        # like "git+ssh" only the last scheme, "ssh", is compared.
        origin_protocol = parts.scheme.rsplit("+", 1)[-1]

        # Walk the hardcoded secure origins plus the ones configured on this
        # session.
        for secure_protocol, secure_host, secure_port in self.iter_secure_origins():
            if secure_protocol not in ("*", origin_protocol):
                continue

            try:
                addr = ipaddress.ip_address(origin_host or "")
                network = ipaddress.ip_network(secure_host)
            except ValueError:
                # Without both a valid address and a valid network, compare
                # the two as hostnames.
                host_ok = (
                    not origin_host
                    or secure_host == "*"
                    or origin_host.lower() == secure_host.lower()
                )
            else:
                # Both parsed: the address has to lie inside the network.
                host_ok = addr in network
            if not host_ok:
                continue

            port_ok = (
                secure_port is None
                or secure_port == "*"
                or origin_port == secure_port
            )
            if port_ok:
                # Protocol, host and port all match this secure origin.
                return True

        # Nothing matched: the origin is not secure and will not be accepted
        # as a place to search. Say so in the log.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:  # type: ignore[override]
        """Send a request, filling in the session's timeout and proxies.

        ``timeout`` and ``proxies`` are only filled in when the caller did
        not pass them.
        """
        session_defaults = (("timeout", self.timeout), ("proxies", self.proxies))
        for option, default in session_defaults:
            kwargs.setdefault(option, default)

        # Dispatch the actual request
        return super().request(method, url, *args, **kwargs)