Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/connection.py: 30%

336 statements  

coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

from __future__ import annotations

import datetime
import logging
import os
import re
import socket
import sys
import typing
import warnings
from http.client import HTTPConnection as _HTTPConnection
from http.client import HTTPException as HTTPException  # noqa: F401
from http.client import ResponseNotReady
from socket import timeout as SocketTimeout

if typing.TYPE_CHECKING:
    from typing import Literal

    from .response import HTTPResponse
    from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
    from .util.ssltransport import SSLTransport

from ._collections import HTTPHeaderDict
from .util.response import assert_header_parsing
from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
from .util.util import to_str
from .util.wait import wait_for_read

try:  # Compiled with SSL?
    import ssl

    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    ssl = None  # type: ignore[assignment]

    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        pass


from ._base_connection import _TYPE_BODY
from ._base_connection import ProxyConfig as ProxyConfig
from ._base_connection import _ResponseOptions as _ResponseOptions
from ._version import __version__
from .exceptions import (
    ConnectTimeoutError,
    HeaderParsingError,
    NameResolutionError,
    NewConnectionError,
    ProxyError,
    SystemTimeWarning,
)
from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
from .util.request import body_to_chunks
from .util.ssl_ import assert_fingerprint as _assert_fingerprint
from .util.ssl_ import (
    create_urllib3_context,
    is_ipaddress,
    resolve_cert_reqs,
    resolve_ssl_version,
    ssl_wrap_socket,
)
from .util.ssl_match_hostname import CertificateError, match_hostname
from .util.url import Url

# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (i.e. test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2023, 6, 1)

_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

_HAS_SYS_AUDIT = hasattr(sys, "audit")


class HTTPConnection(_HTTPConnection):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

          HTTPConnection.default_socket_options + [
              (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
          ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """


    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to, the value will be ``None``.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        super().__init__(
            host=host,
            port=port,
            timeout=Timeout.resolve_default_timeout(timeout),
            source_address=source_address,
            blocksize=blocksize,
        )
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None

    @property
    def host(self) -> str:
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")
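    # For illustration (a hedged sketch, not part of the upstream source): with a
    # trailing-dot FQDN the two attributes diverge as described above, e.g.
    #
    #     conn = HTTPConnection("example.com.")
    #     conn.host       # "example.com"   -> used for Host headers and cert matching
    #     conn._dns_host  # "example.com."  -> used for the actual DNS lookup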


    @host.setter
    def host(self, value: str) -> None:
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        try:
            sock = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e

        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        if scheme not in ("http", "https"):
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

    def connect(self) -> None:
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        if self._has_connected_to_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        if self.sock is None:
            return False
        return not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False
        """
        return bool(self.proxy) and self._tunnel_host is None

    def close(self) -> None:
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._response_options = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            super().putheader(header, *values)
        elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
            skippable_headers = "', '".join(
                [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
            )
            raise ValueError(
                f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
            )

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # We have to store these before we call .request()
        # because sometimes we can still salvage a response
        # off the wire even if we aren't able to completely
        # send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        if headers is None:
            headers = {}
        header_keys = frozenset(to_str(k.lower()) for k in headers)
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length

        # When chunked is explicitly set to 'True' we respect that.
        if chunked:
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        else:
            # Detect whether a framing mechanism is already in use. If so
            # we respect that value, otherwise we pick chunked vs content-length
            # depending on the type of 'body'.
            if "content-length" in header_keys:
                chunked = False
            elif "transfer-encoding" in header_keys:
                chunked = True

            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            else:
                chunked = False
                if content_length is None:
                    if chunks is not None:
                        chunked = True
                        self.putheader("Transfer-Encoding", "chunked")
                else:
                    self.putheader("Content-Length", str(content_length))

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                if isinstance(chunk, str):
                    chunk = chunk.encode("utf-8")
                if chunked:
                    self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
                else:
                    self.send(chunk)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")
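    # A hedged illustration of the framing selection above (hypothetical calls;
    # the exact header chosen depends on what body_to_chunks() reports):
    #
    #     conn.request("POST", "/upload", body=b"data")                 # Content-Length: 4
    #     conn.request("POST", "/upload", body=iter([b"da", b"ta"]))    # likely chunked (length unknown)
    #     conn.request("POST", "/upload", body=b"data", chunked=True)   # forced Transfer-Encoding: chunked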


    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable.

        If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        if self._response_options is None:
            raise ResponseNotReady()

        # Reset this attribute so the connection can be used for another request.
        resp_options = self._response_options
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        httplib_response = super().getresponse()

        try:
            assert_header_parsing(httplib_response.msg)
        except (HeaderParsingError, TypeError) as hpe:
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                hpe,
                exc_info=True,
            )

        headers = HTTPHeaderDict(httplib_response.msg.items())

        response = HTTPResponse(
            body=httplib_response,
            headers=headers,
            status=httplib_response.status,
            version=httplib_response.version,
            reason=httplib_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=httplib_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )
        return response
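    # A hedged end-to-end sketch of the request/response lifecycle implemented
    # above (hypothetical host and path, not part of the upstream source):
    #
    #     conn = HTTPConnection("example.com", 80)
    #     conn.request("GET", "/")
    #     resp = conn.getresponse()    # urllib3.response.HTTPResponse
    #     print(resp.status, len(resp.data))
    #     conn.close()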



class HTTPSConnection(HTTPConnection):
    """
    Many of the parameters to this constructor are passed to the underlying SSL
    socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)
        self.cert_reqs = cert_reqs
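    # A hedged construction sketch (hypothetical host and CA bundle path; the
    # keyword arguments correspond to the parameters stored above):
    #
    #     conn = HTTPSConnection(
    #         "example.com",
    #         443,
    #         cert_reqs="CERT_REQUIRED",
    #         ca_certs="/path/to/ca-bundle.pem",
    #     )
    #     conn.connect()
    #     assert conn.is_verified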


    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        sock: socket.socket | ssl.SSLSocket
        self.sock = sock = self._new_conn()
        server_hostname: str = self.host
        tls_in_tls = False

        # Do we need to establish a tunnel?
        if self._tunnel_host is not None:
            # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
            if self._tunnel_scheme == "https":
                # _connect_tls_proxy will verify and assign proxy_is_verified
                self.sock = sock = self._connect_tls_proxy(self.host, sock)
                tls_in_tls = True
            elif self._tunnel_scheme == "http":
                self.proxy_is_verified = False

            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # Override the host with the one we're requesting data from.
            server_hostname = self._tunnel_host

        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            warnings.warn(
                (
                    f"System time is way off (before {RECENT_DATE}). This will probably "
                    "lead to SSL verification errors"
                ),
                SystemTimeWarning,
            )

        # Remove trailing '.' from fqdn hostnames to allow certificate validation
        server_hostname_rm_dot = server_hostname.rstrip(".")

        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            server_hostname=server_hostname_rm_dot,
            ssl_context=self.ssl_context,
            tls_in_tls=tls_in_tls,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = sock_and_verified.socket

        # Forwarding proxies can never have a verified target since
        # the proxy is the one doing the verification. Should instead
        # use a CONNECT tunnel in order to verify the target.
        # See: https://github.com/urllib3/urllib3/issues/3267.
        if self.proxy_is_forwarding:
            self.is_verified = False
        else:
            self.is_verified = sock_and_verified.is_verified

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        # Set `self.proxy_is_verified` unless it's already set while
        # establishing a tunnel.
        if self._has_connected_to_proxy and self.proxy_is_verified is None:
            self.proxy_is_verified = sock_and_verified.is_verified

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        ssl_context = proxy_config.ssl_context
        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = sock_and_verified.is_verified
        return sock_and_verified.socket  # type: ignore[return-value]


class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake
    """

    socket: ssl.SSLSocket | SSLTransport
    is_verified: bool


def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.
    """
    default_ssl_context = False
    if ssl_context is None:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        ssl_sock.close()
        raise


def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses, so that it doesn't
    # affect matching of DNS SANs; we do the same thing!
    stripped_hostname = asserted_hostname.strip("[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname

    try:
        match_hostname(cert, asserted_hostname, hostname_checks_common_name)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert  # type: ignore[attr-defined]
        raise


def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    # Look for the phrase 'wrong version number', if found
    # then we should warn the user that we're very sure that
    # this proxy is HTTP-only and they have a configuration issue.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    is_likely_http_proxy = (
        "wrong version number" in error_normalized
        or "unknown protocol" in error_normalized
        or "record layer failure" in error_normalized
    )
    http_proxy_warning = (
        ". Your proxy appears to only use HTTP and not HTTPS, "
        "try changing your proxy URL to be HTTP. See: "
        "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
        "#https-proxy-error-http-proxy"
    )
    new_err = ProxyError(
        f"Unable to connect to proxy"
        f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
        err,
    )
    new_err.__cause__ = err
    return new_err
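# A hedged illustration of the wrapping above (hypothetical error text copied
# from a typical OpenSSL "wrong version number" failure):
#
#     err = ssl.SSLError("[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1007)")
#     _wrap_proxy_error(err, "https")
#     # -> ProxyError whose message appends the "proxy appears to only use HTTP" hint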



def _get_default_user_agent() -> str:
    return f"python-urllib3/{__version__}"


class DummyConnection:
    """Used to detect a failed ConnectionCls import."""


if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


VerifiedHTTPSConnection = HTTPSConnection


def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging."""

    scheme = "https" if isinstance(conn, HTTPSConnection) else "http"

    return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url