Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/connection.py: 30%

324 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1from __future__ import annotations 

2 

3import datetime 

4import logging 

5import os 

6import re 

7import socket 

8import sys 

9import typing 

10import warnings 

11from http.client import HTTPConnection as _HTTPConnection 

12from http.client import HTTPException as HTTPException # noqa: F401 

13from http.client import ResponseNotReady 

14from socket import timeout as SocketTimeout 

15 

16if typing.TYPE_CHECKING: 

17 from typing import Literal 

18 

19 from .response import HTTPResponse 

20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT 

21 from .util.ssltransport import SSLTransport 

22 

23from ._collections import HTTPHeaderDict 

24from .util.response import assert_header_parsing 

25from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout 

26from .util.util import to_str 

27from .util.wait import wait_for_read 

28 

try:  # Compiled with SSL?
    import ssl

    # Alias the stdlib SSL error so this module exposes one name either way.
    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    # ``ssl`` is unusable: bind the name to ``None`` so that the
    # ``if not ssl`` check near the end of this module can detect it.
    ssl = None  # type: ignore[assignment]

    # Placeholder exception type so the ``BaseSSLError`` name still exists.
    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        pass

38 

39 

40from ._base_connection import _TYPE_BODY 

41from ._base_connection import ProxyConfig as ProxyConfig 

42from ._base_connection import _ResponseOptions as _ResponseOptions 

43from ._version import __version__ 

44from .exceptions import ( 

45 ConnectTimeoutError, 

46 HeaderParsingError, 

47 NameResolutionError, 

48 NewConnectionError, 

49 ProxyError, 

50 SystemTimeWarning, 

51) 

52from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_ 

53from .util.request import body_to_chunks 

54from .util.ssl_ import assert_fingerprint as _assert_fingerprint 

55from .util.ssl_ import ( 

56 create_urllib3_context, 

57 is_ipaddress, 

58 resolve_cert_reqs, 

59 resolve_ssl_version, 

60 ssl_wrap_socket, 

61) 

62from .util.ssl_match_hostname import CertificateError, match_hostname 

63from .util.url import Url 

64 

# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Default port number for each supported URL scheme.
port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (ie test_recent_date is failing) update it to ~6 months before the current date.
# HTTPSConnection.connect() emits a SystemTimeWarning when today's date is
# earlier than this value.
RECENT_DATE = datetime.date(2022, 1, 1)

# NOTE: despite its name, this pattern matches any single character that is
# *outside* the allowed set listed in the character class (it is a negated
# class). HTTPConnection.putrequest() uses it to reject such method strings.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

# True when the running interpreter exposes ``sys.audit``.
_HAS_SYS_AUDIT = hasattr(sys, "audit")

81 

82 

class HTTPConnection(_HTTPConnection):
    """
    A :class:`http.client.HTTPConnection` subclass whose constructor smooths over
    differences between older and newer Pythons.

    Extra keyword arguments configure attributes of the connection:

    - ``source_address``: The source address to use for this connection.
    - ``socket_options``: Options applied to the underlying socket. When omitted,
      ``HTTPConnection.default_socket_options`` is used, which turns off Nagle's
      algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      To enable TCP Keep Alive on top of the defaults, for instance, pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Passing an empty list (``[]``) disables the defaults entirely.
    """

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # ``None`` means no proxy is currently connected.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        """Initialise the base connection and the urllib3-specific state."""
        resolved_timeout = Timeout.resolve_default_timeout(timeout)
        super().__init__(
            host=host,
            port=port,
            timeout=resolved_timeout,
            source_address=source_address,
            blocksize=blocksize,
        )

        # Proxy / socket configuration supplied by the caller.
        self.proxy = proxy
        self.proxy_config = proxy_config
        self.socket_options = socket_options

        # Per-connection state; close() puts all of these back to these values.
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None
        self._response_options = None
        self._has_connected_to_proxy = False

    # https://github.com/python/mypy/issues/4125
    # Mypy reports overriding the writeable `host` attribute with a property as
    # an LSP violation. Because a setter is provided too, the attribute stays
    # writeable and LSP actually holds. Depending on how the mypy issue gets
    # resolved, a `@host.deleter` may become necessary as well.
    @property
    def host(self) -> str:
        """
        The hostname with any trailing dots (the FQDN marker) removed.

        Certificates generally omit the trailing dot of a fully-qualified
        domain name, so validating against a name that still carries it fails,
        and some servers don't expect to receive it either.

        The dot does matter for DNS resolution, though: with it only the exact
        FQDN is resolved, without it the system's search domain list is
        consulted. The unmodified value is therefore kept in ``_dns_host`` and
        used only where that matters, i.e. for the lookup that establishes the
        TCP connection the HTTP requests travel over.
        """
        dns_name = self._dns_host
        return dns_name.rstrip(".")

    @host.setter
    def host(self, value: str) -> None:
        """
        Store the hostname exactly as given.

        The assumption is that ``_dns_host`` is private to urllib3; httplib
        only ever touches ``host`` and other libraries are expected to do the
        same.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Open a socket to the DNS host and apply the configured socket options.

        :return: New socket connection.
        """
        address = (self._dns_host, self.port)
        try:
            new_sock = connection.create_connection(
                address,
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            timeout_message = (
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})"
            )
            raise ConnectTimeoutError(self, timeout_message) from e
        except OSError as e:
            failure_message = f"Failed to establish a new connection: {e}"
            raise NewConnectionError(self, failure_message) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return new_sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        """Configure tunnelling through a proxy and remember the tunnel scheme.

        :raises ValueError: if ``scheme`` is neither ``"http"`` nor ``"https"``.
        """
        if scheme != "http" and scheme != "https":
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

    def connect(self) -> None:
        """Open the socket and, if a tunnel was configured, establish it."""
        self.sock = self._new_conn()

        if self._tunnel_host:
            # A configured tunnel means the socket we just opened is to the proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # Forwarding proxies don't tunnel, so the flag is (re)computed here
        # from whether a proxy is configured at all.
        self._has_connected_to_proxy = bool(self.proxy)

    @property
    def is_closed(self) -> bool:
        """``True`` when there is no underlying socket."""
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        """``True`` when a socket exists and it is not readable right now."""
        current = self.sock
        return current is not None and not wait_for_read(current, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        """Whether this connection has been established to a proxy."""
        return self._has_connected_to_proxy

    def close(self) -> None:
        """Close the connection and wipe all per-connection state."""
        try:
            super().close()
        finally:
            # Clear everything stateful so a re-used connection does not
            # carry configuration over from its previous life.
            self.sock = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None
            self._response_options = None
            self._has_connected_to_proxy = False
            self.proxy_is_verified = None
            self.is_verified = False

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # The docstring is deliberately empty: CPython's own docstring has
        # broken indentation and this method should stay out of our docs.
        offending = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if offending is not None:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {offending.group()!r})"
            )

        return super().putrequest(
            method,
            url,
            skip_host=skip_host,
            skip_accept_encoding=skip_accept_encoding,
        )

    def putheader(self, header: str, *values: str) -> None:
        """"""
        skip_requested = any(
            isinstance(value, str) and value == SKIP_HEADER for value in values
        )
        if not skip_requested:
            super().putheader(header, *values)
            return

        if to_str(header.lower()) in SKIPPABLE_HEADERS:
            # A skippable header marked with SKIP_HEADER is simply not sent.
            return

        skippable_headers = "', '".join(
            str.title(name) for name in sorted(SKIPPABLE_HEADERS)
        )
        raise ValueError(
            f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
        )

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        """Send a request: request line, framing headers, other headers, body."""
        # A re-used connection already has a socket; refresh its timeout
        # before anything is sent on it.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Remember the options that getresponse() will hand to HTTPResponse.
        # This has to happen before anything is sent, because a response can
        # sometimes still be read off the wire even when the request body
        # could not be sent completely.
        # TODO: Remove this in favor of a real HTTP lifecycle mechanism.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        headers = {} if headers is None else headers
        header_keys = frozenset(to_str(key.lower()) for key in headers)
        self.putrequest(
            method,
            url,
            skip_accept_encoding="accept-encoding" in header_keys,
            skip_host="host" in header_keys,
        )

        # Turn the body into sendall()-able chunks and learn whether an
        # explicit Content-Length can be given.
        body_info = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = body_info.chunks
        content_length = body_info.content_length

        # Decide on the framing. An explicit ``chunked=True`` always wins;
        # otherwise a caller-supplied framing header is respected; otherwise
        # the recommendation of body_to_chunks() is followed.
        has_transfer_encoding = "transfer-encoding" in header_keys
        if chunked:
            send_chunked = True
            if not has_transfer_encoding:
                self.putheader("Transfer-Encoding", "chunked")
        elif "content-length" in header_keys:
            send_chunked = False
        elif has_transfer_encoding:
            send_chunked = True
        elif content_length is not None:
            send_chunked = False
            self.putheader("Content-Length", str(content_length))
        elif chunks is not None:
            send_chunked = True
            self.putheader("Transfer-Encoding", "chunked")
        else:
            send_chunked = False

        # Framing is settled; emit the remaining headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for name, value in headers.items():
            self.putheader(name, value)
        self.endheaders()

        # Send the body, if any, piece by piece.
        for piece in chunks if chunks is not None else ():
            # An empty chunk would signal end-of-body under TE: chunked,
            # so empty pieces are never sent.
            if not piece:
                continue
            payload = piece.encode("utf-8") if isinstance(piece, str) else piece
            if send_chunked:
                payload = b"%x\r\n%b\r\n" % (len(payload), payload)
            self.send(payload)

        # In chunked mode the terminating empty chunk is always sent,
        # whether or not there was a body.
        if send_chunked:
            self.send(b"0\r\n\r\n")

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Deprecated variant of :meth:`request` that always sends the body
        with chunked encoding rather than as one block.
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable.

        If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed.
        """
        resp_options = self._response_options
        # Mirror http.client.HTTPConnection: no pending request, no response.
        if resp_options is None:
            raise ResponseNotReady()

        # Consume the stored options so the connection can be used again.
        self._response_options = None

        # The connection's timeout may have changed since the request was
        # sent, so push the current value down to the socket.
        self.sock.settimeout(self.timeout)

        # Imported here to avoid circular import errors.
        from .response import HTTPResponse

        raw_response = super().getresponse()

        try:
            assert_header_parsing(raw_response.msg)
        except (HeaderParsingError, TypeError) as parse_error:
            # Header parsing problems are logged, not raised.
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                parse_error,
                exc_info=True,
            )

        return HTTPResponse(
            body=raw_response,
            headers=HTTPHeaderDict(raw_response.msg.items()),
            status=raw_response.status,
            version=raw_response.version,
            reason=raw_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=raw_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )

489 

490 

class HTTPSConnection(HTTPConnection):
    """
    HTTPS flavour of :class:`HTTPConnection`. Many of the constructor
    parameters are handed to the underlying SSL socket by means of
    :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        """Initialise the plain connection, then record every TLS setting."""
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        # Context and protocol-version settings.
        self.ssl_context = ssl_context
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version

        # Peer verification settings.
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint

        # Client certificate material.
        self.cert_file = cert_file
        self.key_file = key_file
        self.key_password = key_password

        # CA material; path-like values get ``~`` expanded, falsy ones are
        # stored unchanged.
        self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
        self.ca_cert_dir = (
            os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
        )
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context, hence it is resolved last: an
        # explicit value wins, then the context's verify_mode, then the
        # result of resolve_cert_reqs(None).
        if cert_reqs is None:
            cert_reqs = (
                resolve_cert_reqs(None)
                if self.ssl_context is None
                else self.ssl_context.verify_mode
            )
        self.cert_reqs = cert_reqs

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        Deprecated. Call at most once, and only before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # Without an explicit cert_reqs, fall back to the SSLContext's
        # verify_mode when a context exists, else to resolve_cert_reqs(None).
        if cert_reqs is None:
            cert_reqs = (
                resolve_cert_reqs(None)
                if self.ssl_context is None
                else self.ssl_context.verify_mode
            )

        self.cert_reqs = cert_reqs
        self.cert_file = cert_file
        self.key_file = key_file
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = os.path.expanduser(ca_certs) if ca_certs else ca_certs
        self.ca_cert_dir = (
            os.path.expanduser(ca_cert_dir) if ca_cert_dir else ca_cert_dir
        )
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        """Open the socket, tunnel if configured, then wrap the socket in TLS."""
        sock: socket.socket | ssl.SSLSocket
        sock = self._new_conn()
        self.sock = sock
        server_hostname: str = self.host
        tls_in_tls = False

        # Is a tunnel required?
        if self._tunnel_host is not None:
            # The tunnel scheme is HTTPS: wrap the socket via
            # _connect_tls_proxy() first; the TLS session set up further
            # down then runs as TLS-in-TLS.
            if self._tunnel_scheme == "https":
                sock = self._connect_tls_proxy(self.host, sock)
                self.sock = sock
                tls_in_tls = True

            # Tunnelling implies we are talking to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # From here on the host of interest is the tunnel target.
            server_hostname = self._tunnel_host

        # An explicitly configured server_hostname overrides everything.
        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        if datetime.date.today() < RECENT_DATE:
            clock_warning = (
                f"System time is way off (before {RECENT_DATE}). This will probably "
                "lead to SSL verification errors"
            )
            warnings.warn(clock_warning, SystemTimeWarning)

        wrapped = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            ssl_context=self.ssl_context,
            server_hostname=server_hostname,
            tls_in_tls=tls_in_tls,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = wrapped.socket
        self.is_verified = wrapped.is_verified

        # Forwarding proxies don't tunnel, so the flag is (re)computed here
        # from whether a proxy is configured at all.
        self._has_connected_to_proxy = bool(self.proxy)

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Wrap ``sock`` in TLS towards the proxy, using the proxy config's SSL
        context, and record whether the proxy was verified.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        wrapped = _ssl_wrap_socket_and_match_hostname(
            sock,
            ssl_context=proxy_config.ssl_context,
            server_hostname=hostname,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = wrapped.is_verified
        return wrapped.socket  # type: ignore[return-value]

696 

697 

698class _WrappedAndVerifiedSocket(typing.NamedTuple): 

699 """ 

700 Wrapped socket and whether the connection is 

701 verified after the TLS handshake 

702 """ 

703 

704 socket: ssl.SSLSocket | SSLTransport 

705 is_verified: bool 

706 

707 

def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.

    Note that a caller-supplied ``ssl_context`` is modified in place: its
    ``verify_mode`` is always overwritten, and ``check_hostname`` may be
    switched off.

    :return: The wrapped socket together with ``is_verified``, which is true
        when the context's verify mode is ``CERT_REQUIRED`` or a fingerprint
        was asserted.
    :raises: Whatever the fingerprint / hostname checks raise; the wrapped
        socket is closed before the exception propagates.
    """
    # Remember whether the context is one we built ourselves; this affects
    # default-cert loading and the commonName policy further down.
    default_ssl_context = False
    if ssl_context is None:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    # Applied to both freshly built and caller-supplied contexts.
    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        # Only replace the hostname when the cleaned-up form is an IP address;
        # anything else is passed through untouched.
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            # A fingerprint assertion takes precedence over hostname matching.
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            # Verification is on, the context itself is not checking the
            # hostname, and the caller has not opted out: match it here.
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        # Don't leak the wrapped socket when any of the checks above fails.
        ssl_sock.close()
        raise

831 

832 

def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    """Check ``cert`` against ``asserted_hostname`` via ``match_hostname``.

    On a mismatch a warning is logged, the certificate is attached to the
    ``CertificateError`` as ``_peer_cert`` and the error is re-raised.
    """
    # The upstream ssl.match_hostname() implementation applies bracket
    # stripping to IP addresses only (so DNS SANs are never affected);
    # the same rule is followed here.
    unbracketed = asserted_hostname.strip("[]")
    hostname = unbracketed if is_ipaddress(unbracketed) else asserted_hostname

    try:
        match_hostname(cert, hostname, hostname_checks_common_name)
    except CertificateError as mismatch:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            hostname,
            cert,
        )
        # Expose the certificate on the exception so that client code
        # catching it can inspect the cert if it wants to.
        mismatch._peer_cert = cert  # type: ignore[attr-defined]
        raise

857 

858 

def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    """Wrap ``err`` in a ``ProxyError`` whose ``__cause__`` is ``err``.

    When the error text contains 'wrong version number' or 'unknown protocol'
    and the proxy scheme is ``"https"``, the message additionally tells the
    user that the proxy is most likely HTTP-only and misconfigured.
    """
    # Reduce the error text to lowercase words separated by single spaces
    # so the marker phrases can be found regardless of punctuation.
    words = re.split("[^a-z]", str(err).lower())
    normalized = " ".join(words)
    looks_http_only = any(
        marker in normalized
        for marker in ("wrong version number", "unknown protocol")
    )

    message = "Unable to connect to proxy"
    if looks_http_only and proxy_scheme == "https":
        message += (
            ". Your proxy appears to only use HTTP and not HTTPS, "
            "try changing your proxy URL to be HTTP. See: "
            "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
            "#https-proxy-error-http-proxy"
        )

    wrapped = ProxyError(message, err)
    wrapped.__cause__ = err
    return wrapped

881 

882 

def _get_default_user_agent() -> str:
    """Return the User-Agent value built from this package's version."""
    return "python-urllib3/" + format(__version__)

885 

886 

class DummyConnection:
    """Placeholder class that lets callers detect a failed ConnectionCls import."""

889 

890 

# ``ssl`` is bound to ``None`` when its import failed at the top of this
# module; in that case the HTTPS class is swapped for the placeholder.
if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


# Alias bound to whichever object ``HTTPSConnection`` refers to at this point.
VerifiedHTTPSConnection = HTTPSConnection

896 

897 

def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Build the URL string for ``conn``; mainly useful for tests and log output."""
    if isinstance(conn, HTTPSConnection):
        scheme = "https"
    else:
        scheme = "http"

    built = Url(scheme=scheme, host=conn.host, port=conn.port, path=path)
    return built.url