Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/connection.py: 26%

from __future__ import annotations

import datetime
import http.client
import logging
import os
import re
import socket
import sys
import threading
import typing
import warnings
from http.client import HTTPConnection as _HTTPConnection
from http.client import HTTPException as HTTPException  # noqa: F401
from http.client import ResponseNotReady
from socket import timeout as SocketTimeout

if typing.TYPE_CHECKING:
    from .response import HTTPResponse
    from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
    from .util.ssltransport import SSLTransport

from ._collections import HTTPHeaderDict
from .http2 import probe as http2_probe
from .util.response import assert_header_parsing
from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
from .util.util import to_str
from .util.wait import wait_for_read

try:  # Compiled with SSL?
    import ssl

    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    ssl = None  # type: ignore[assignment]

    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        pass


from ._base_connection import _TYPE_BODY
from ._base_connection import ProxyConfig as ProxyConfig
from ._base_connection import _ResponseOptions as _ResponseOptions
from ._version import __version__
from .exceptions import (
    ConnectTimeoutError,
    HeaderParsingError,
    NameResolutionError,
    NewConnectionError,
    ProxyError,
    SystemTimeWarning,
)
from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
from .util.request import body_to_chunks
from .util.ssl_ import assert_fingerprint as _assert_fingerprint
from .util.ssl_ import (
    create_urllib3_context,
    is_ipaddress,
    resolve_cert_reqs,
    resolve_ssl_version,
    ssl_wrap_socket,
)
from .util.ssl_match_hostname import CertificateError, match_hostname
from .util.url import Url

# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (ie test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2025, 1, 1)

_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")


class HTTPConnection(_HTTPConnection):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """

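    # Usage sketch (illustrative only, not part of the class): constructing a
    # connection with keep-alive layered on top of the default socket options.
    # "example.com" is a placeholder host.
    #
    #   import socket
    #   from urllib3.connection import HTTPConnection
    #
    #   conn = HTTPConnection(
    #       "example.com",
    #       80,
    #       socket_options=HTTPConnection.default_socket_options
    #       + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
    #   )
    #   conn.request("GET", "/")
    #   response = conn.getresponse()
    #   conn.close()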

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to, the value will be ``None``.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None | (
            connection._TYPE_SOCKET_OPTIONS
        ) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        super().__init__(
            host=host,
            port=port,
            timeout=Timeout.resolve_default_timeout(timeout),
            source_address=source_address,
            blocksize=blocksize,
        )
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None

    def __str__(self) -> str:
        return f"{type(self).__name__}(host={self.host!r}, port={self.port!r})"

    def __repr__(self) -> str:
        return f"<{self} at {id(self):#x}>"

    @property
    def host(self) -> str:
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")

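    # Illustration (hypothetical values): after ``conn.host = "example.com."`` the
    # trailing dot is kept in ``self._dns_host`` for DNS resolution, while this
    # getter returns "example.com" for the Host header and certificate matching.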

    @host.setter
    def host(self, value: str) -> None:
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        try:
            sock = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e

        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        sys.audit("http.client.connect", self, self.host, self.port)

        return sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        if scheme not in ("http", "https"):
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

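    # Tunneling sketch (illustrative; "proxy.internal" and "example.org" are
    # placeholders): the connection is made to the proxy, and requests are sent
    # to the origin through a CONNECT tunnel established during connect().
    #
    #   conn = HTTPConnection("proxy.internal", 3128)
    #   conn.set_tunnel("example.org", 80, scheme="http")
    #   conn.connect()  # issues the CONNECT request via _tunnel()
    #   conn.request("GET", "/")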

    if sys.version_info < (3, 11, 9) or ((3, 12) <= sys.version_info < (3, 12, 3)):
        # Taken from python/cpython#100986 which was backported in 3.11.9 and 3.12.3.
        # When using connection_from_host, host will come without brackets.
        def _wrap_ipv6(self, ip: bytes) -> bytes:
            if b":" in ip and ip[0] != b"["[0]:
                return b"[" + ip + b"]"
            return ip

        if sys.version_info < (3, 11, 9):
            # `_tunnel` copied from 3.11.13 backporting
            # https://github.com/python/cpython/commit/0d4026432591d43185568dd31cef6a034c4b9261
            # and https://github.com/python/cpython/commit/6fbc61070fda2ffb8889e77e3b24bca4249ab4d1
            def _tunnel(self) -> None:
                _MAXLINE = http.client._MAXLINE  # type: ignore[attr-defined]
                connect = b"CONNECT %s:%d HTTP/1.0\r\n" % (  # type: ignore[str-format]
                    self._wrap_ipv6(self._tunnel_host.encode("ascii")),  # type: ignore[union-attr]
                    self._tunnel_port,
                )
                headers = [connect]
                for header, value in self._tunnel_headers.items():  # type: ignore[attr-defined]
                    headers.append(f"{header}: {value}\r\n".encode("latin-1"))
                headers.append(b"\r\n")
                # Making a single send() call instead of one per line encourages
                # the host OS to use a more optimal packet size instead of
                # potentially emitting a series of small packets.
                self.send(b"".join(headers))
                del headers

                response = self.response_class(self.sock, method=self._method)  # type: ignore[attr-defined]
                try:
                    (version, code, message) = response._read_status()  # type: ignore[attr-defined]

                    if code != http.HTTPStatus.OK:
                        self.close()
                        raise OSError(
                            f"Tunnel connection failed: {code} {message.strip()}"
                        )
                    while True:
                        line = response.fp.readline(_MAXLINE + 1)
                        if len(line) > _MAXLINE:
                            raise http.client.LineTooLong("header line")
                        if not line:
                            # for sites which EOF without sending a trailer
                            break
                        if line in (b"\r\n", b"\n", b""):
                            break

                        if self.debuglevel > 0:
                            print("header:", line.decode())
                finally:
                    response.close()

        elif (3, 12) <= sys.version_info < (3, 12, 3):
            # `_tunnel` copied from 3.12.11 backporting
            # https://github.com/python/cpython/commit/23aef575c7629abcd4aaf028ebd226fb41a4b3c8
            def _tunnel(self) -> None:  # noqa: F811
                connect = b"CONNECT %s:%d HTTP/1.1\r\n" % (  # type: ignore[str-format]
                    self._wrap_ipv6(self._tunnel_host.encode("idna")),  # type: ignore[union-attr]
                    self._tunnel_port,
                )
                headers = [connect]
                for header, value in self._tunnel_headers.items():  # type: ignore[attr-defined]
                    headers.append(f"{header}: {value}\r\n".encode("latin-1"))
                headers.append(b"\r\n")
                # Making a single send() call instead of one per line encourages
                # the host OS to use a more optimal packet size instead of
                # potentially emitting a series of small packets.
                self.send(b"".join(headers))
                del headers

                response = self.response_class(self.sock, method=self._method)  # type: ignore[attr-defined]
                try:
                    (version, code, message) = response._read_status()  # type: ignore[attr-defined]

                    self._raw_proxy_headers = http.client._read_headers(response.fp)  # type: ignore[attr-defined]

                    if self.debuglevel > 0:
                        for header in self._raw_proxy_headers:
                            print("header:", header.decode())

                    if code != http.HTTPStatus.OK:
                        self.close()
                        raise OSError(
                            f"Tunnel connection failed: {code} {message.strip()}"
                        )

                finally:
                    response.close()

    def connect(self) -> None:
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        if self._has_connected_to_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        if self.sock is None:
            return False
        return not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False
        """
        return bool(self.proxy) and self._tunnel_host is None

    @property
    def proxy_is_tunneling(self) -> bool:
        """
        Return True if a tunneling proxy is configured, else return False
        """
        return self._tunnel_host is not None

    def close(self) -> None:
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._response_options = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            super().putheader(header, *values)
        elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
            skippable_headers = "', '".join(
                [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
            )
            raise ValueError(
                f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
            )

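    # Sketch of how SKIP_HEADER interacts with this method (illustrative; ``conn``
    # is an existing HTTPConnection): passing urllib3.util.SKIP_HEADER as a header
    # value suppresses a header urllib3 would otherwise send automatically. Only
    # header names in SKIPPABLE_HEADERS may be skipped; anything else raises
    # ValueError, as implemented above.
    #
    #   from urllib3.util import SKIP_HEADER
    #
    #   conn.request(
    #       "GET",
    #       "/",
    #       headers={"User-Agent": SKIP_HEADER},  # no User-Agent header is emitted
    #   )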

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # We have to store these before we call .request()
        # because sometimes we can still salvage a response
        # off the wire even if we aren't able to completely
        # send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        if headers is None:
            headers = {}
        header_keys = frozenset(to_str(k.lower()) for k in headers)
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length

        # When chunked is explicitly set to 'True' we respect that.
        if chunked:
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        else:
            # Detect whether a framing mechanism is already in use. If so
            # we respect that value, otherwise we pick chunked vs content-length
            # depending on the type of 'body'.
            if "content-length" in header_keys:
                chunked = False
            elif "transfer-encoding" in header_keys:
                chunked = True

            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            else:
                chunked = False
                if content_length is None:
                    if chunks is not None:
                        chunked = True
                        self.putheader("Transfer-Encoding", "chunked")
                else:
                    self.putheader("Content-Length", str(content_length))

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                if isinstance(chunk, str):
                    chunk = chunk.encode("utf-8")
                if chunked:
                    self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
                else:
                    self.send(chunk)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")

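    # Sketch (illustrative; ``conn`` is an existing HTTPConnection): sending an
    # iterator body with explicit chunked framing, which exercises the
    # Transfer-Encoding path above. The generator and URL are placeholders.
    #
    #   def body():
    #       yield b"hello "
    #       yield b"world"
    #
    #   conn.request("POST", "/upload", body=body(), chunked=True)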

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable.

        If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed.
549 """ 

550 # Raise the same error as http.client.HTTPConnection 

551 if self._response_options is None: 

552 raise ResponseNotReady() 

553 

554 # Reset this attribute for being used again. 

555 resp_options = self._response_options 

556 self._response_options = None 

557 

558 # Since the connection's timeout value may have been updated 

559 # we need to set the timeout on the socket. 

560 self.sock.settimeout(self.timeout) 

561 

562 # This is needed here to avoid circular import errors 

563 from .response import HTTPResponse 

564 

565 # Save a reference to the shutdown function before ownership is passed 

566 # to httplib_response 

567 # TODO should we implement it everywhere? 

568 _shutdown = getattr(self.sock, "shutdown", None) 

569 

570 # Get the response from http.client.HTTPConnection 

571 httplib_response = super().getresponse() 

572 

573 try: 

574 assert_header_parsing(httplib_response.msg) 

575 except (HeaderParsingError, TypeError) as hpe: 

576 log.warning( 

577 "Failed to parse headers (url=%s): %s", 

578 _url_from_connection(self, resp_options.request_url), 

579 hpe, 

580 exc_info=True, 

581 ) 

582 

583 headers = HTTPHeaderDict(httplib_response.msg.items()) 

584 

585 response = HTTPResponse( 

586 body=httplib_response, 

587 headers=headers, 

588 status=httplib_response.status, 

589 version=httplib_response.version, 

590 version_string=getattr(self, "_http_vsn_str", "HTTP/?"), 

591 reason=httplib_response.reason, 

592 preload_content=resp_options.preload_content, 

593 decode_content=resp_options.decode_content, 

594 original_response=httplib_response, 

595 enforce_content_length=resp_options.enforce_content_length, 

596 request_method=resp_options.request_method, 

597 request_url=resp_options.request_url, 

598 sock_shutdown=_shutdown, 

599 ) 

600 return response 

601 

602 

603class HTTPSConnection(HTTPConnection): 

604 """ 

605 Many of the parameters to this constructor are passed to the underlying SSL 

606 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`. 

607 """ 

608 
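    # Usage sketch (illustrative; "example.com" and the CA bundle path are
    # placeholders): verified HTTPS with a custom CA bundle, or pinning the
    # server certificate by fingerprint instead of CA verification.
    #
    #   conn = HTTPSConnection(
    #       "example.com",
    #       443,
    #       cert_reqs="CERT_REQUIRED",
    #       ca_certs="/path/to/ca-bundle.pem",
    #   )
    #   conn.request("GET", "/")
    #
    #   # Alternatively (placeholder fingerprint):
    #   #   HTTPSConnection("example.com", 443, assert_fingerprint="AA:BB:...")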

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None
    _connect_callback: typing.Callable[..., None] | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None | (
            connection._TYPE_SOCKET_OPTIONS
        ) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)
        self.cert_reqs = cert_reqs
        self._connect_callback = None

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data


    def connect(self) -> None:
        # Today we don't need to be doing this step before the /actual/ socket
        # connection, however in the future we'll need to decide whether to
        # create a new socket or re-use an existing "shared" socket as a part
        # of the HTTP/2 handshake dance.
        if self._tunnel_host is not None and self._tunnel_port is not None:
            probe_http2_host = self._tunnel_host
            probe_http2_port = self._tunnel_port
        else:
            probe_http2_host = self.host
            probe_http2_port = self.port

        # Check if the target origin supports HTTP/2.
        # If the value comes back as 'None' it means that the current thread
        # is probing for HTTP/2 support. Otherwise, we're waiting for another
        # probe to complete, or we get a value right away.
        target_supports_http2: bool | None
        if "h2" in ssl_.ALPN_PROTOCOLS:
            target_supports_http2 = http2_probe.acquire_and_get(
                host=probe_http2_host, port=probe_http2_port
            )
        else:
            # If HTTP/2 isn't going to be offered it doesn't matter if
            # the target supports HTTP/2. Don't want to make a probe.
            target_supports_http2 = False

        if self._connect_callback is not None:
            self._connect_callback(
                "before connect",
                thread_id=threading.get_ident(),
                target_supports_http2=target_supports_http2,
            )

        try:
            sock: socket.socket | ssl.SSLSocket
            self.sock = sock = self._new_conn()
            server_hostname: str = self.host
            tls_in_tls = False

            # Do we need to establish a tunnel?
            if self.proxy_is_tunneling:
                # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
                if self._tunnel_scheme == "https":
                    # _connect_tls_proxy will verify and assign proxy_is_verified
                    self.sock = sock = self._connect_tls_proxy(self.host, sock)
                    tls_in_tls = True
                elif self._tunnel_scheme == "http":
                    self.proxy_is_verified = False

                # If we're tunneling it means we're connected to our proxy.
                self._has_connected_to_proxy = True

                self._tunnel()
                # Override the host with the one we're requesting data from.
                server_hostname = typing.cast(str, self._tunnel_host)

            if self.server_hostname is not None:
                server_hostname = self.server_hostname

            is_time_off = datetime.date.today() < RECENT_DATE
            if is_time_off:
                warnings.warn(
                    (
                        f"System time is way off (before {RECENT_DATE}). This will probably "
                        "lead to SSL verification errors"
                    ),
                    SystemTimeWarning,
                )

            # Remove trailing '.' from fqdn hostnames to allow certificate validation
            server_hostname_rm_dot = server_hostname.rstrip(".")

            sock_and_verified = _ssl_wrap_socket_and_match_hostname(
                sock=sock,
                cert_reqs=self.cert_reqs,
                ssl_version=self.ssl_version,
                ssl_minimum_version=self.ssl_minimum_version,
                ssl_maximum_version=self.ssl_maximum_version,
                ca_certs=self.ca_certs,
                ca_cert_dir=self.ca_cert_dir,
                ca_cert_data=self.ca_cert_data,
                cert_file=self.cert_file,
                key_file=self.key_file,
                key_password=self.key_password,
                server_hostname=server_hostname_rm_dot,
                ssl_context=self.ssl_context,
                tls_in_tls=tls_in_tls,
                assert_hostname=self.assert_hostname,
                assert_fingerprint=self.assert_fingerprint,
            )
            self.sock = sock_and_verified.socket

        # If an error occurs during connection/handshake we may need to release
        # our lock so another connection can probe the origin.
        except BaseException:
            if self._connect_callback is not None:
                self._connect_callback(
                    "after connect failure",
                    thread_id=threading.get_ident(),
                    target_supports_http2=target_supports_http2,
                )

            if target_supports_http2 is None:
                http2_probe.set_and_release(
                    host=probe_http2_host, port=probe_http2_port, supports_http2=None
                )
            raise

        # If this connection doesn't know if the origin supports HTTP/2
        # we report back to the HTTP/2 probe our result.
        if target_supports_http2 is None:
            supports_http2 = sock_and_verified.socket.selected_alpn_protocol() == "h2"
            http2_probe.set_and_release(
                host=probe_http2_host,
                port=probe_http2_port,
                supports_http2=supports_http2,
            )

        # Forwarding proxies can never have a verified target since
        # the proxy is the one doing the verification. Should instead
        # use a CONNECT tunnel in order to verify the target.
        # See: https://github.com/urllib3/urllib3/issues/3267.
        if self.proxy_is_forwarding:
            self.is_verified = False
        else:
            self.is_verified = sock_and_verified.is_verified

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        # Set `self.proxy_is_verified` unless it's already set while
        # establishing a tunnel.
        if self._has_connected_to_proxy and self.proxy_is_verified is None:
            self.proxy_is_verified = sock_and_verified.is_verified

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        ssl_context = proxy_config.ssl_context
        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = sock_and_verified.is_verified
        return sock_and_verified.socket  # type: ignore[return-value]



class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake
    """

    socket: ssl.SSLSocket | SSLTransport
    is_verified: bool


def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | typing.Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.
    """
    default_ssl_context = False
    if ssl_context is None:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        ssl_sock.close()
        raise



def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses so it doesn't
    # match DNS SANs so we do the same thing!
    stripped_hostname = asserted_hostname.strip("[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname

    try:
        match_hostname(cert, asserted_hostname, hostname_checks_common_name)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert  # type: ignore[attr-defined]
        raise


def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    # Look for the phrase 'wrong version number', if found
    # then we should warn the user that we're very sure that
    # this proxy is HTTP-only and they have a configuration issue.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    is_likely_http_proxy = (
        "wrong version number" in error_normalized
        or "unknown protocol" in error_normalized
        or "record layer failure" in error_normalized
    )
    http_proxy_warning = (
        ". Your proxy appears to only use HTTP and not HTTPS, "
        "try changing your proxy URL to be HTTP. See: "
        "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
        "#https-proxy-error-http-proxy"
    )
    new_err = ProxyError(
        f"Unable to connect to proxy"
        f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
        err,
    )
    new_err.__cause__ = err
    return new_err


def _get_default_user_agent() -> str:
    return f"python-urllib3/{__version__}"


class DummyConnection:
    """Used to detect a failed ConnectionCls import."""


if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


VerifiedHTTPSConnection = HTTPSConnection


def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging."""

    scheme = "https" if isinstance(conn, HTTPSConnection) else "http"

    return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url
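
# Illustration (hypothetical connection): for HTTPSConnection("example.com", 8443)
# and path "/status", _url_from_connection() would build
# "https://example.com:8443/status" via Url(...).url.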