Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/connection.py: 30%


335 statements  

1from __future__ import annotations 

2 

3import datetime 

4import logging 

5import os 

6import re 

7import socket 

8import sys 

9import typing 

10import warnings 

11from http.client import HTTPConnection as _HTTPConnection 

12from http.client import HTTPException as HTTPException # noqa: F401 

13from http.client import ResponseNotReady 

14from socket import timeout as SocketTimeout 

15 

16if typing.TYPE_CHECKING: 

17 from .response import HTTPResponse 

18 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT 

19 from .util.ssltransport import SSLTransport 

20 

21from ._collections import HTTPHeaderDict 

22from .util.response import assert_header_parsing 

23from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout 

24from .util.util import to_str 

25from .util.wait import wait_for_read 

26 

27try: # Compiled with SSL? 

28 import ssl 

29 

30 BaseSSLError = ssl.SSLError 

31except (ImportError, AttributeError): 

32 ssl = None # type: ignore[assignment] 

33 

34 class BaseSSLError(BaseException): # type: ignore[no-redef] 

35 pass 

36 

37 

38from ._base_connection import _TYPE_BODY 

39from ._base_connection import ProxyConfig as ProxyConfig 

40from ._base_connection import _ResponseOptions as _ResponseOptions 

41from ._version import __version__ 

42from .exceptions import ( 

43 ConnectTimeoutError, 

44 HeaderParsingError, 

45 NameResolutionError, 

46 NewConnectionError, 

47 ProxyError, 

48 SystemTimeWarning, 

49) 

50from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_ 

51from .util.request import body_to_chunks 

52from .util.ssl_ import assert_fingerprint as _assert_fingerprint 

53from .util.ssl_ import ( 

54 create_urllib3_context, 

55 is_ipaddress, 

56 resolve_cert_reqs, 

57 resolve_ssl_version, 

58 ssl_wrap_socket, 

59) 

60from .util.ssl_match_hostname import CertificateError, match_hostname 

61from .util.url import Url 

62 

63# Not a no-op, we're adding this to the namespace so it can be imported. 

64ConnectionError = ConnectionError 

65BrokenPipeError = BrokenPipeError 
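# Downstream code can then import these names from here, e.g. (a usage sketch,
# not part of the original source):
#   from urllib3.connection import ConnectionError, BrokenPipeError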

66 

67 

68log = logging.getLogger(__name__) 

69 

70port_by_scheme = {"http": 80, "https": 443} 

71 

72# When it comes time to update this value as a part of regular maintenance 

73 # (i.e. test_recent_date is failing) update it to ~6 months before the current date. 

74RECENT_DATE = datetime.date(2023, 6, 1) 

75 

76_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

77 

78_HAS_SYS_AUDIT = hasattr(sys, "audit") 

79 

80 

81class HTTPConnection(_HTTPConnection): 

82 """ 

83 Based on :class:`http.client.HTTPConnection` but provides an extra constructor 

84 backwards-compatibility layer between older and newer Pythons. 

85 

86 Additional keyword parameters are used to configure attributes of the connection. 

87 Accepted parameters include: 

88 

89 - ``source_address``: Set the source address for the current connection. 

90 - ``socket_options``: Set specific options on the underlying socket. If not specified, then 

91 defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling 

92 Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy. 

93 

94 For example, if you wish to enable TCP Keep Alive in addition to the defaults, 

95 you might pass: 

96 

97 .. code-block:: python 

98 

99 HTTPConnection.default_socket_options + [ 

100 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 

101 ] 

102 

103 Or you may want to disable the defaults by passing an empty list (e.g., ``[]``). 
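
    A sketch of supplying those options when constructing the connection
    (illustrative host and values; only ``socket_options`` and
    ``default_socket_options`` come from this module):

    .. code-block:: python

        import socket

        conn = HTTPConnection(
            "example.com",
            80,
            socket_options=HTTPConnection.default_socket_options
            + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
        )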

104 """ 

105 

106 default_port: typing.ClassVar[int] = port_by_scheme["http"] # type: ignore[misc] 

107 

108 #: Disable Nagle's algorithm by default. 

109 #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]`` 

110 default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [ 

111 (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

112 ] 

113 

114 #: Whether this connection verifies the host's certificate. 

115 is_verified: bool = False 

116 

117 #: Whether this proxy connection verified the proxy host's certificate. 

118 # If no proxy is currently connected to, the value will be ``None``. 

119 proxy_is_verified: bool | None = None 

120 

121 blocksize: int 

122 source_address: tuple[str, int] | None 

123 socket_options: connection._TYPE_SOCKET_OPTIONS | None 

124 

125 _has_connected_to_proxy: bool 

126 _response_options: _ResponseOptions | None 

127 _tunnel_host: str | None 

128 _tunnel_port: int | None 

129 _tunnel_scheme: str | None 

130 

131 def __init__( 

132 self, 

133 host: str, 

134 port: int | None = None, 

135 *, 

136 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, 

137 source_address: tuple[str, int] | None = None, 

138 blocksize: int = 16384, 

139 socket_options: None 

140 | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options, 

141 proxy: Url | None = None, 

142 proxy_config: ProxyConfig | None = None, 

143 ) -> None: 

144 super().__init__( 

145 host=host, 

146 port=port, 

147 timeout=Timeout.resolve_default_timeout(timeout), 

148 source_address=source_address, 

149 blocksize=blocksize, 

150 ) 

151 self.socket_options = socket_options 

152 self.proxy = proxy 

153 self.proxy_config = proxy_config 

154 

155 self._has_connected_to_proxy = False 

156 self._response_options = None 

157 self._tunnel_host: str | None = None 

158 self._tunnel_port: int | None = None 

159 self._tunnel_scheme: str | None = None 

160 

161 @property 

162 def host(self) -> str: 

163 """ 

164 Getter method to remove any trailing dots that indicate the hostname is an FQDN. 

165 

166 In general, SSL certificates don't include the trailing dot indicating a 

167 fully-qualified domain name, and thus, they don't validate properly when 

168 checked against a domain name that includes the dot. In addition, some 

169 servers may not expect to receive the trailing dot when provided. 

170 

171 However, the hostname with trailing dot is critical to DNS resolution; doing a 

172 lookup with the trailing dot will resolve only the appropriate FQDN, 

173 whereas a lookup without a trailing dot will search the system's search domain 

174 list. Thus, it's important to keep the original host around for use only in 

175 those cases where it's appropriate (i.e., when doing DNS lookup to establish the 

176 actual TCP connection across which we're going to send HTTP requests). 
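
    For example (an illustrative sketch, not part of the original docstring):

    .. code-block:: python

        conn = HTTPConnection("example.com.")
        conn.host       # "example.com" -- used for Host headers and TLS matching
        conn._dns_host  # "example.com." -- kept with the dot for DNS lookups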

177 """ 

178 return self._dns_host.rstrip(".") 

179 

180 @host.setter 

181 def host(self, value: str) -> None: 

182 """ 

183 Setter for the `host` property. 

184 

185 We assume that only urllib3 uses the _dns_host attribute; httplib itself 

186 only uses `host`, and it seems reasonable that other libraries follow suit. 

187 """ 

188 self._dns_host = value 

189 

190 def _new_conn(self) -> socket.socket: 

191 """Establish a socket connection and set nodelay settings on it. 

192 

193 :return: New socket connection. 

194 """ 

195 try: 

196 sock = connection.create_connection( 

197 (self._dns_host, self.port), 

198 self.timeout, 

199 source_address=self.source_address, 

200 socket_options=self.socket_options, 

201 ) 

202 except socket.gaierror as e: 

203 raise NameResolutionError(self.host, self, e) from e 

204 except SocketTimeout as e: 

205 raise ConnectTimeoutError( 

206 self, 

207 f"Connection to {self.host} timed out. (connect timeout={self.timeout})", 

208 ) from e 

209 

210 except OSError as e: 

211 raise NewConnectionError( 

212 self, f"Failed to establish a new connection: {e}" 

213 ) from e 

214 

215 # Audit hooks are only available in Python 3.8+ 

216 if _HAS_SYS_AUDIT: 

217 sys.audit("http.client.connect", self, self.host, self.port) 

218 

219 return sock 

220 

221 def set_tunnel( 

222 self, 

223 host: str, 

224 port: int | None = None, 

225 headers: typing.Mapping[str, str] | None = None, 

226 scheme: str = "http", 

227 ) -> None: 

228 if scheme not in ("http", "https"): 

229 raise ValueError( 

230 f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'" 

231 ) 

232 super().set_tunnel(host, port=port, headers=headers) 

233 self._tunnel_scheme = scheme 

234 

235 def connect(self) -> None: 

236 self.sock = self._new_conn() 

237 if self._tunnel_host: 

238 # If we're tunneling it means we're connected to our proxy. 

239 self._has_connected_to_proxy = True 

240 

241 # TODO: Fix tunnel so it doesn't depend on self.sock state. 

242 self._tunnel() # type: ignore[attr-defined] 

243 

244 # If there's a proxy to be connected to we are fully connected. 

245 # This is set twice (once above and here) due to forwarding proxies 

246 # not using tunnelling. 

247 self._has_connected_to_proxy = bool(self.proxy) 

248 

249 if self._has_connected_to_proxy: 

250 self.proxy_is_verified = False 

251 

252 @property 

253 def is_closed(self) -> bool: 

254 return self.sock is None 

255 

256 @property 

257 def is_connected(self) -> bool: 

258 if self.sock is None: 

259 return False 

260 return not wait_for_read(self.sock, timeout=0.0) 

261 

262 @property 

263 def has_connected_to_proxy(self) -> bool: 

264 return self._has_connected_to_proxy 

265 

266 @property 

267 def proxy_is_forwarding(self) -> bool: 

268 """ 

269 Return True if a forwarding proxy is configured, else return False 

270 """ 

271 return bool(self.proxy) and self._tunnel_host is None 

272 

273 def close(self) -> None: 

274 try: 

275 super().close() 

276 finally: 

277 # Reset all stateful properties so connection 

278 # can be re-used without leaking prior configs. 

279 self.sock = None 

280 self.is_verified = False 

281 self.proxy_is_verified = None 

282 self._has_connected_to_proxy = False 

283 self._response_options = None 

284 self._tunnel_host = None 

285 self._tunnel_port = None 

286 self._tunnel_scheme = None 

287 

288 def putrequest( 

289 self, 

290 method: str, 

291 url: str, 

292 skip_host: bool = False, 

293 skip_accept_encoding: bool = False, 

294 ) -> None: 

295 """""" 

296 # Empty docstring because the indentation of CPython's implementation 

297 # is broken but we don't want this method in our documentation. 

298 match = _CONTAINS_CONTROL_CHAR_RE.search(method) 

299 if match: 

300 raise ValueError( 

301 f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})" 

302 ) 

303 

304 return super().putrequest( 

305 method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding 

306 ) 

307 

308 def putheader(self, header: str, *values: str) -> None: # type: ignore[override] 

309 """""" 

310 if not any(isinstance(v, str) and v == SKIP_HEADER for v in values): 

311 super().putheader(header, *values) 

312 elif to_str(header.lower()) not in SKIPPABLE_HEADERS: 

313 skippable_headers = "', '".join( 

314 [str.title(header) for header in sorted(SKIPPABLE_HEADERS)] 

315 ) 

316 raise ValueError( 

317 f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'" 

318 ) 
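# A usage sketch for SKIP_HEADER (illustrative, not part of the original
# source): passing it as the value of a skippable header suppresses that
# header entirely, including urllib3's default, e.g.
#
#     from urllib3.util import SKIP_HEADER
#     conn.request("GET", "/", headers={"User-Agent": SKIP_HEADER})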

319 

320 # `request` method's signature intentionally violates LSP. 

321 # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental. 

322 def request( # type: ignore[override] 

323 self, 

324 method: str, 

325 url: str, 

326 body: _TYPE_BODY | None = None, 

327 headers: typing.Mapping[str, str] | None = None, 

328 *, 

329 chunked: bool = False, 

330 preload_content: bool = True, 

331 decode_content: bool = True, 

332 enforce_content_length: bool = True, 

333 ) -> None: 

334 # Update the inner socket's timeout value to send the request. 

335 # This only triggers if the connection is re-used. 

336 if self.sock is not None: 

337 self.sock.settimeout(self.timeout) 

338 

339 # Store these values to be fed into the HTTPResponse 

340 # object later. TODO: Remove this in favor of a real 

341 # HTTP lifecycle mechanism. 

342 

343 # We have to store these before we call .request() 

344 # because sometimes we can still salvage a response 

345 # off the wire even if we aren't able to completely 

346 # send the request body. 

347 self._response_options = _ResponseOptions( 

348 request_method=method, 

349 request_url=url, 

350 preload_content=preload_content, 

351 decode_content=decode_content, 

352 enforce_content_length=enforce_content_length, 

353 ) 

354 

355 if headers is None: 

356 headers = {} 

357 header_keys = frozenset(to_str(k.lower()) for k in headers) 

358 skip_accept_encoding = "accept-encoding" in header_keys 

359 skip_host = "host" in header_keys 

360 self.putrequest( 

361 method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host 

362 ) 

363 

364 # Transform the body into an iterable of sendall()-able chunks 

365 # and detect if an explicit Content-Length is doable. 

366 chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize) 

367 chunks = chunks_and_cl.chunks 

368 content_length = chunks_and_cl.content_length 

369 

370 # When chunked is explicitly set to 'True' we respect that. 

371 if chunked: 

372 if "transfer-encoding" not in header_keys: 

373 self.putheader("Transfer-Encoding", "chunked") 

374 else: 

375 # Detect whether a framing mechanism is already in use. If so 

376 # we respect that value; otherwise we pick chunked vs content-length 

377 # depending on the type of 'body'. 

378 if "content-length" in header_keys: 

379 chunked = False 

380 elif "transfer-encoding" in header_keys: 

381 chunked = True 

382 

383 # Otherwise we go off the recommendation of 'body_to_chunks()'. 

384 else: 

385 chunked = False 

386 if content_length is None: 

387 if chunks is not None: 

388 chunked = True 

389 self.putheader("Transfer-Encoding", "chunked") 

390 else: 

391 self.putheader("Content-Length", str(content_length)) 

392 
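# A sketch of how the framing decision above resolves (illustrative cases,
# not part of the original source):
#   chunked=True, no framing header passed    -> "Transfer-Encoding: chunked"
#   caller passes a Content-Length header     -> caller's header is respected
#   bytes/str body, no framing header         -> Content-Length is computed
#   file or iterator body of unknown length   -> "Transfer-Encoding: chunked"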

393 # Now that framing headers are out of the way we send all the other headers. 

394 if "user-agent" not in header_keys: 

395 self.putheader("User-Agent", _get_default_user_agent()) 

396 for header, value in headers.items(): 

397 self.putheader(header, value) 

398 self.endheaders() 

399 

400 # If we're given a body we start sending that in chunks. 

401 if chunks is not None: 

402 for chunk in chunks: 

403 # Sending empty chunks isn't allowed for TE: chunked 

404 # as it indicates the end of the body. 

405 if not chunk: 

406 continue 

407 if isinstance(chunk, str): 

408 chunk = chunk.encode("utf-8") 

409 if chunked: 

410 self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk)) 

411 else: 

412 self.send(chunk) 

413 

414 # Regardless of whether we have a body or not, if we're in 

415 # chunked mode we want to send an explicit empty chunk. 

416 if chunked: 

417 self.send(b"0\r\n\r\n") 

418 

419 def request_chunked( 

420 self, 

421 method: str, 

422 url: str, 

423 body: _TYPE_BODY | None = None, 

424 headers: typing.Mapping[str, str] | None = None, 

425 ) -> None: 

426 """ 

427 Alternative to the common request method, which sends the 

428 body with chunked encoding and not as one block 

429 """ 

430 warnings.warn( 

431 "HTTPConnection.request_chunked() is deprecated and will be removed " 

432 "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).", 

433 category=DeprecationWarning, 

434 stacklevel=2, 

435 ) 

436 self.request(method, url, body=body, headers=headers, chunked=True) 
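# The replacement call named in the warning, as a usage sketch:
#   conn.request("POST", "/upload", body=body, chunked=True)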

437 

438 def getresponse( # type: ignore[override] 

439 self, 

440 ) -> HTTPResponse: 

441 """ 

442 Get the response from the server. 

443 

444 If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable. 

445 

446 If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed. 
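
    A minimal usage sketch (assumes a reachable host; not part of the
    original docstring):

    .. code-block:: python

        conn = HTTPConnection("example.com")
        conn.request("GET", "/")
        response = conn.getresponse()
        print(response.status)
        body = response.read()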

447 """ 

448 # Raise the same error as http.client.HTTPConnection 

449 if self._response_options is None: 

450 raise ResponseNotReady() 

451 

452 # Reset this attribute so the connection can be used again. 

453 resp_options = self._response_options 

454 self._response_options = None 

455 

456 # Since the connection's timeout value may have been updated 

457 # we need to set the timeout on the socket. 

458 self.sock.settimeout(self.timeout) 

459 

460 # This is needed here to avoid circular import errors 

461 from .response import HTTPResponse 

462 

463 # Get the response from http.client.HTTPConnection 

464 httplib_response = super().getresponse() 

465 

466 try: 

467 assert_header_parsing(httplib_response.msg) 

468 except (HeaderParsingError, TypeError) as hpe: 

469 log.warning( 

470 "Failed to parse headers (url=%s): %s", 

471 _url_from_connection(self, resp_options.request_url), 

472 hpe, 

473 exc_info=True, 

474 ) 

475 

476 headers = HTTPHeaderDict(httplib_response.msg.items()) 

477 

478 response = HTTPResponse( 

479 body=httplib_response, 

480 headers=headers, 

481 status=httplib_response.status, 

482 version=httplib_response.version, 

483 version_string=getattr(self, "_http_vsn_str", "HTTP/?"), 

484 reason=httplib_response.reason, 

485 preload_content=resp_options.preload_content, 

486 decode_content=resp_options.decode_content, 

487 original_response=httplib_response, 

488 enforce_content_length=resp_options.enforce_content_length, 

489 request_method=resp_options.request_method, 

490 request_url=resp_options.request_url, 

491 ) 

492 return response 

493 

494 

495class HTTPSConnection(HTTPConnection): 

496 """ 

497 Many of the parameters to this constructor are passed to the underlying SSL 

498 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`. 
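
    A sketch of a connection that verifies the server against a custom CA
    bundle (the host and certificate path are hypothetical):

    .. code-block:: python

        conn = HTTPSConnection(
            "example.com",
            443,
            cert_reqs="CERT_REQUIRED",
            ca_certs="/path/to/ca-bundle.pem",
        )
        conn.request("GET", "/")
        response = conn.getresponse()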

499 """ 

500 

501 default_port = port_by_scheme["https"] # type: ignore[misc] 

502 

503 cert_reqs: int | str | None = None 

504 ca_certs: str | None = None 

505 ca_cert_dir: str | None = None 

506 ca_cert_data: None | str | bytes = None 

507 ssl_version: int | str | None = None 

508 ssl_minimum_version: int | None = None 

509 ssl_maximum_version: int | None = None 

510 assert_fingerprint: str | None = None 

511 

512 def __init__( 

513 self, 

514 host: str, 

515 port: int | None = None, 

516 *, 

517 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, 

518 source_address: tuple[str, int] | None = None, 

519 blocksize: int = 16384, 

520 socket_options: None 

521 | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options, 

522 proxy: Url | None = None, 

523 proxy_config: ProxyConfig | None = None, 

524 cert_reqs: int | str | None = None, 

525 assert_hostname: None | str | typing.Literal[False] = None, 

526 assert_fingerprint: str | None = None, 

527 server_hostname: str | None = None, 

528 ssl_context: ssl.SSLContext | None = None, 

529 ca_certs: str | None = None, 

530 ca_cert_dir: str | None = None, 

531 ca_cert_data: None | str | bytes = None, 

532 ssl_minimum_version: int | None = None, 

533 ssl_maximum_version: int | None = None, 

534 ssl_version: int | str | None = None, # Deprecated 

535 cert_file: str | None = None, 

536 key_file: str | None = None, 

537 key_password: str | None = None, 

538 ) -> None: 

539 super().__init__( 

540 host, 

541 port=port, 

542 timeout=timeout, 

543 source_address=source_address, 

544 blocksize=blocksize, 

545 socket_options=socket_options, 

546 proxy=proxy, 

547 proxy_config=proxy_config, 

548 ) 

549 

550 self.key_file = key_file 

551 self.cert_file = cert_file 

552 self.key_password = key_password 

553 self.ssl_context = ssl_context 

554 self.server_hostname = server_hostname 

555 self.assert_hostname = assert_hostname 

556 self.assert_fingerprint = assert_fingerprint 

557 self.ssl_version = ssl_version 

558 self.ssl_minimum_version = ssl_minimum_version 

559 self.ssl_maximum_version = ssl_maximum_version 

560 self.ca_certs = ca_certs and os.path.expanduser(ca_certs) 

561 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir) 

562 self.ca_cert_data = ca_cert_data 

563 

564 # cert_reqs depends on ssl_context so calculate last. 

565 if cert_reqs is None: 

566 if self.ssl_context is not None: 

567 cert_reqs = self.ssl_context.verify_mode 

568 else: 

569 cert_reqs = resolve_cert_reqs(None) 

570 self.cert_reqs = cert_reqs 

571 

572 def set_cert( 

573 self, 

574 key_file: str | None = None, 

575 cert_file: str | None = None, 

576 cert_reqs: int | str | None = None, 

577 key_password: str | None = None, 

578 ca_certs: str | None = None, 

579 assert_hostname: None | str | typing.Literal[False] = None, 

580 assert_fingerprint: str | None = None, 

581 ca_cert_dir: str | None = None, 

582 ca_cert_data: None | str | bytes = None, 

583 ) -> None: 

584 """ 

585 This method should only be called once, before the connection is used. 

586 """ 

587 warnings.warn( 

588 "HTTPSConnection.set_cert() is deprecated and will be removed " 

589 "in urllib3 v2.1.0. Instead provide the parameters to the " 

590 "HTTPSConnection constructor.", 

591 category=DeprecationWarning, 

592 stacklevel=2, 

593 ) 

594 

595 # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also 

596 # have an SSLContext object in which case we'll use its verify_mode. 

597 if cert_reqs is None: 

598 if self.ssl_context is not None: 

599 cert_reqs = self.ssl_context.verify_mode 

600 else: 

601 cert_reqs = resolve_cert_reqs(None) 

602 

603 self.key_file = key_file 

604 self.cert_file = cert_file 

605 self.cert_reqs = cert_reqs 

606 self.key_password = key_password 

607 self.assert_hostname = assert_hostname 

608 self.assert_fingerprint = assert_fingerprint 

609 self.ca_certs = ca_certs and os.path.expanduser(ca_certs) 

610 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir) 

611 self.ca_cert_data = ca_cert_data 

612 

613 def connect(self) -> None: 

614 sock: socket.socket | ssl.SSLSocket 

615 self.sock = sock = self._new_conn() 

616 server_hostname: str = self.host 

617 tls_in_tls = False 

618 

619 # Do we need to establish a tunnel? 

620 if self._tunnel_host is not None: 

621 # We're tunneling to an HTTPS origin so we need to do TLS-in-TLS. 

622 if self._tunnel_scheme == "https": 

623 # _connect_tls_proxy will verify and assign proxy_is_verified 

624 self.sock = sock = self._connect_tls_proxy(self.host, sock) 

625 tls_in_tls = True 

626 elif self._tunnel_scheme == "http": 

627 self.proxy_is_verified = False 

628 

629 # If we're tunneling it means we're connected to our proxy. 

630 self._has_connected_to_proxy = True 

631 

632 self._tunnel() # type: ignore[attr-defined] 

633 # Override the host with the one we're requesting data from. 

634 server_hostname = self._tunnel_host 

635 

636 if self.server_hostname is not None: 

637 server_hostname = self.server_hostname 

638 

639 is_time_off = datetime.date.today() < RECENT_DATE 

640 if is_time_off: 

641 warnings.warn( 

642 ( 

643 f"System time is way off (before {RECENT_DATE}). This will probably " 

644 "lead to SSL verification errors" 

645 ), 

646 SystemTimeWarning, 

647 ) 

648 

649 # Remove trailing '.' from fqdn hostnames to allow certificate validation 

650 server_hostname_rm_dot = server_hostname.rstrip(".") 

651 

652 sock_and_verified = _ssl_wrap_socket_and_match_hostname( 

653 sock=sock, 

654 cert_reqs=self.cert_reqs, 

655 ssl_version=self.ssl_version, 

656 ssl_minimum_version=self.ssl_minimum_version, 

657 ssl_maximum_version=self.ssl_maximum_version, 

658 ca_certs=self.ca_certs, 

659 ca_cert_dir=self.ca_cert_dir, 

660 ca_cert_data=self.ca_cert_data, 

661 cert_file=self.cert_file, 

662 key_file=self.key_file, 

663 key_password=self.key_password, 

664 server_hostname=server_hostname_rm_dot, 

665 ssl_context=self.ssl_context, 

666 tls_in_tls=tls_in_tls, 

667 assert_hostname=self.assert_hostname, 

668 assert_fingerprint=self.assert_fingerprint, 

669 ) 

670 self.sock = sock_and_verified.socket 

671 

672 # Forwarding proxies can never have a verified target since 

673 # the proxy is the one doing the verification. Should instead 

674 # use a CONNECT tunnel in order to verify the target. 

675 # See: https://github.com/urllib3/urllib3/issues/3267. 

676 if self.proxy_is_forwarding: 

677 self.is_verified = False 

678 else: 

679 self.is_verified = sock_and_verified.is_verified 

680 

681 # If there's a proxy to be connected to we are fully connected. 

682 # This is set twice (once above and here) due to forwarding proxies 

683 # not using tunnelling. 

684 self._has_connected_to_proxy = bool(self.proxy) 

685 

686 # Set `self.proxy_is_verified` unless it's already set while 

687 # establishing a tunnel. 

688 if self._has_connected_to_proxy and self.proxy_is_verified is None: 

689 self.proxy_is_verified = sock_and_verified.is_verified 

690 

691 def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket: 

692 """ 

693 Establish a TLS connection to the proxy using the provided SSL context. 

694 """ 

695 # `_connect_tls_proxy` is called when self._tunnel_host is truthy. 

696 proxy_config = typing.cast(ProxyConfig, self.proxy_config) 

697 ssl_context = proxy_config.ssl_context 

698 sock_and_verified = _ssl_wrap_socket_and_match_hostname( 

699 sock, 

700 cert_reqs=self.cert_reqs, 

701 ssl_version=self.ssl_version, 

702 ssl_minimum_version=self.ssl_minimum_version, 

703 ssl_maximum_version=self.ssl_maximum_version, 

704 ca_certs=self.ca_certs, 

705 ca_cert_dir=self.ca_cert_dir, 

706 ca_cert_data=self.ca_cert_data, 

707 server_hostname=hostname, 

708 ssl_context=ssl_context, 

709 assert_hostname=proxy_config.assert_hostname, 

710 assert_fingerprint=proxy_config.assert_fingerprint, 

711 # Features that aren't implemented for proxies yet: 

712 cert_file=None, 

713 key_file=None, 

714 key_password=None, 

715 tls_in_tls=False, 

716 ) 

717 self.proxy_is_verified = sock_and_verified.is_verified 

718 return sock_and_verified.socket # type: ignore[return-value] 

719 

720 

721class _WrappedAndVerifiedSocket(typing.NamedTuple): 

722 """ 

723 Wrapped socket and whether the connection is 

724 verified after the TLS handshake 

725 """ 

726 

727 socket: ssl.SSLSocket | SSLTransport 

728 is_verified: bool 

729 

730 

731def _ssl_wrap_socket_and_match_hostname( 

732 sock: socket.socket, 

733 *, 

734 cert_reqs: None | str | int, 

735 ssl_version: None | str | int, 

736 ssl_minimum_version: int | None, 

737 ssl_maximum_version: int | None, 

738 cert_file: str | None, 

739 key_file: str | None, 

740 key_password: str | None, 

741 ca_certs: str | None, 

742 ca_cert_dir: str | None, 

743 ca_cert_data: None | str | bytes, 

744 assert_hostname: None | str | typing.Literal[False], 

745 assert_fingerprint: str | None, 

746 server_hostname: str | None, 

747 ssl_context: ssl.SSLContext | None, 

748 tls_in_tls: bool = False, 

749) -> _WrappedAndVerifiedSocket: 

750 """Logic for constructing an SSLContext from all TLS parameters, passing 

751 that down into ssl_wrap_socket, and then doing certificate verification 

752 either via hostname or fingerprint. This function exists to guarantee 

753 that both proxies and targets have the same behavior when connecting via TLS. 

754 """ 

755 default_ssl_context = False 

756 if ssl_context is None: 

757 default_ssl_context = True 

758 context = create_urllib3_context( 

759 ssl_version=resolve_ssl_version(ssl_version), 

760 ssl_minimum_version=ssl_minimum_version, 

761 ssl_maximum_version=ssl_maximum_version, 

762 cert_reqs=resolve_cert_reqs(cert_reqs), 

763 ) 

764 else: 

765 context = ssl_context 

766 

767 context.verify_mode = resolve_cert_reqs(cert_reqs) 

768 

769 # In some cases, we want to verify hostnames ourselves 

770 if ( 

771 # `ssl` can't verify fingerprints or alternate hostnames 

772 assert_fingerprint 

773 or assert_hostname 

774 # assert_hostname can be set to False to disable hostname checking 

775 or assert_hostname is False 

776 # We still support OpenSSL 1.0.2, which prevents us from verifying 

777 # hostnames easily: https://github.com/pyca/pyopenssl/pull/933 

778 or ssl_.IS_PYOPENSSL 

779 or not ssl_.HAS_NEVER_CHECK_COMMON_NAME 

780 ): 

781 context.check_hostname = False 

782 

783 # Try to load OS default certs if none are given. We need to do the hasattr() check 

784 # for custom pyOpenSSL SSLContext objects because they don't support 

785 # load_default_certs(). 

786 if ( 

787 not ca_certs 

788 and not ca_cert_dir 

789 and not ca_cert_data 

790 and default_ssl_context 

791 and hasattr(context, "load_default_certs") 

792 ): 

793 context.load_default_certs() 

794 

795 # Ensure that IPv6 addresses are in the proper format and don't have a 

796 # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses 

797 # and interprets them as DNS hostnames. 

798 if server_hostname is not None: 

799 normalized = server_hostname.strip("[]") 

800 if "%" in normalized: 

801 normalized = normalized[: normalized.rfind("%")] 

802 if is_ipaddress(normalized): 

803 server_hostname = normalized 

804 

805 ssl_sock = ssl_wrap_socket( 

806 sock=sock, 

807 keyfile=key_file, 

808 certfile=cert_file, 

809 key_password=key_password, 

810 ca_certs=ca_certs, 

811 ca_cert_dir=ca_cert_dir, 

812 ca_cert_data=ca_cert_data, 

813 server_hostname=server_hostname, 

814 ssl_context=context, 

815 tls_in_tls=tls_in_tls, 

816 ) 

817 

818 try: 

819 if assert_fingerprint: 

820 _assert_fingerprint( 

821 ssl_sock.getpeercert(binary_form=True), assert_fingerprint 

822 ) 

823 elif ( 

824 context.verify_mode != ssl.CERT_NONE 

825 and not context.check_hostname 

826 and assert_hostname is not False 

827 ): 

828 cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert() # type: ignore[assignment] 

829 

830 # Need to signal to our match_hostname whether to use 'commonName' or not. 

831 # If we're using our own constructed SSLContext we explicitly set 'False' 

832 # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name. 

833 if default_ssl_context: 

834 hostname_checks_common_name = False 

835 else: 

836 hostname_checks_common_name = ( 

837 getattr(context, "hostname_checks_common_name", False) or False 

838 ) 

839 

840 _match_hostname( 

841 cert, 

842 assert_hostname or server_hostname, # type: ignore[arg-type] 

843 hostname_checks_common_name, 

844 ) 

845 

846 return _WrappedAndVerifiedSocket( 

847 socket=ssl_sock, 

848 is_verified=context.verify_mode == ssl.CERT_REQUIRED 

849 or bool(assert_fingerprint), 

850 ) 

851 except BaseException: 

852 ssl_sock.close() 

853 raise 

854 

855 

856def _match_hostname( 

857 cert: _TYPE_PEER_CERT_RET_DICT | None, 

858 asserted_hostname: str, 

859 hostname_checks_common_name: bool = False, 

860) -> None: 

861 # Our upstream implementation of ssl.match_hostname() 

862 # only applies this normalization to IP addresses, so it doesn't 

863 # match DNS SANs, so we do the same thing here! 

864 stripped_hostname = asserted_hostname.strip("[]") 

865 if is_ipaddress(stripped_hostname): 

866 asserted_hostname = stripped_hostname 

867 

868 try: 

869 match_hostname(cert, asserted_hostname, hostname_checks_common_name) 

870 except CertificateError as e: 

871 log.warning( 

872 "Certificate did not match expected hostname: %s. Certificate: %s", 

873 asserted_hostname, 

874 cert, 

875 ) 

876 # Add cert to exception and reraise so client code can inspect 

877 # the cert when catching the exception, if they want to 

878 e._peer_cert = cert # type: ignore[attr-defined] 

879 raise 

880 

881 

882def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError: 

883 # Look for phrases like 'wrong version number'; if one is found, 

884 # we warn the user that we're very sure that 

885 # this proxy is HTTP-only and they have a configuration issue. 

886 error_normalized = " ".join(re.split("[^a-z]", str(err).lower())) 

887 is_likely_http_proxy = ( 

888 "wrong version number" in error_normalized 

889 or "unknown protocol" in error_normalized 

890 or "record layer failure" in error_normalized 

891 ) 

892 http_proxy_warning = ( 

893 ". Your proxy appears to only use HTTP and not HTTPS, " 

894 "try changing your proxy URL to be HTTP. See: " 

895 "https://urllib3.readthedocs.io/en/latest/advanced-usage.html" 

896 "#https-proxy-error-http-proxy" 

897 ) 

898 new_err = ProxyError( 

899 f"Unable to connect to proxy" 

900 f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}", 

901 err, 

902 ) 

903 new_err.__cause__ = err 

904 return new_err 

905 

906 

907def _get_default_user_agent() -> str: 

908 return f"python-urllib3/{__version__}" 

909 

910 

911class DummyConnection: 

912 """Used to detect a failed ConnectionCls import.""" 

913 

914 

915if not ssl: 

916 HTTPSConnection = DummyConnection # type: ignore[misc, assignment] # noqa: F811 

917 

918 

919VerifiedHTTPSConnection = HTTPSConnection 

920 

921 

922def _url_from_connection( 

923 conn: HTTPConnection | HTTPSConnection, path: str | None = None 

924) -> str: 

925 """Returns the URL from a given connection. This is mainly used for testing and logging.""" 

926 

927 scheme = "https" if isinstance(conn, HTTPSConnection) else "http" 

928 

929 return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url