Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/connection.py: 26%


414 statements  

1from __future__ import annotations 

2 

3import datetime 

4import http.client 

5import logging 

6import os 

7import re 

8import socket 

9import sys 

10import threading 

11import typing 

12import warnings 

13from http.client import HTTPConnection as _HTTPConnection 

14from http.client import HTTPException as HTTPException # noqa: F401 

15from http.client import ResponseNotReady 

16from socket import timeout as SocketTimeout 

17 

18if typing.TYPE_CHECKING: 

19 from .response import HTTPResponse 

20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT 

21 from .util.ssltransport import SSLTransport 

22 

23from ._collections import HTTPHeaderDict 

24from .http2 import probe as http2_probe 

25from .util.response import assert_header_parsing 

26from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout 

27from .util.util import to_str 

28from .util.wait import wait_for_read 

29 

30try: # Compiled with SSL? 

31 import ssl 

32 

33 BaseSSLError = ssl.SSLError 

34except (ImportError, AttributeError): 

35 ssl = None # type: ignore[assignment] 

36 

37 class BaseSSLError(BaseException): # type: ignore[no-redef] 

38 pass 

39 

40 

41from ._base_connection import _TYPE_BODY 

42from ._base_connection import ProxyConfig as ProxyConfig 

43from ._base_connection import _ResponseOptions as _ResponseOptions 

44from ._version import __version__ 

45from .exceptions import ( 

46 ConnectTimeoutError, 

47 HeaderParsingError, 

48 NameResolutionError, 

49 NewConnectionError, 

50 ProxyError, 

51 SystemTimeWarning, 

52) 

53from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_ 

54from .util.request import body_to_chunks 

55from .util.ssl_ import assert_fingerprint as _assert_fingerprint 

56from .util.ssl_ import ( 

57 create_urllib3_context, 

58 is_ipaddress, 

59 resolve_cert_reqs, 

60 resolve_ssl_version, 

61 ssl_wrap_socket, 

62) 

63from .util.ssl_match_hostname import CertificateError, match_hostname 

64from .util.url import Url 

65 

66# Not a no-op, we're adding this to the namespace so it can be imported. 

67ConnectionError = ConnectionError 

68BrokenPipeError = BrokenPipeError 

69 

70 

71log = logging.getLogger(__name__) 

72 

73port_by_scheme = {"http": 80, "https": 443} 

74 

75# When it comes time to update this value as a part of regular maintenance 

76# (i.e., test_recent_date is failing) update it to ~6 months before the current date. 

77RECENT_DATE = datetime.date(2025, 1, 1) 

78 

79_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]") 

80 

81 
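# --- Illustrative sketch (editorial addition, not part of connection.py) ---
# The regex above matches any character that is *not* a valid HTTP token
# character, so putrequest() further down can reject request methods that
# contain whitespace or CR/LF (a header-injection / request-smuggling vector):
#
#     >>> bool(_CONTAINS_CONTROL_CHAR_RE.search("GET"))
#     False
#     >>> bool(_CONTAINS_CONTROL_CHAR_RE.search("GET / HTTP/1.1\r\nHost: x"))
#     True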

82class HTTPConnection(_HTTPConnection): 

83 """ 

84 Based on :class:`http.client.HTTPConnection` but provides an extra constructor 

85 backwards-compatibility layer between older and newer Pythons. 

86 

87 Additional keyword parameters are used to configure attributes of the connection. 

88 Accepted parameters include: 

89 

90 - ``source_address``: Set the source address for the current connection. 

91 - ``socket_options``: Set specific options on the underlying socket. If not specified, then 

92 defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling 

93 Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy. 

94 

95 For example, if you wish to enable TCP Keep Alive in addition to the defaults, 

96 you might pass: 

97 

98 .. code-block:: python 

99 

100 HTTPConnection.default_socket_options + [ 

101 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 

102 ] 

103 

104 Or you may want to disable the defaults by passing an empty list (e.g., ``[]``). 

105 """ 

106 

107 default_port: typing.ClassVar[int] = port_by_scheme["http"] # type: ignore[misc] 

108 

109 #: Disable Nagle's algorithm by default. 

110 #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]`` 

111 default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [ 

112 (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 

113 ] 

114 

115 #: Whether this connection verifies the host's certificate. 

116 is_verified: bool = False 

117 

118 #: Whether this proxy connection verified the proxy host's certificate. 

119 # If no proxy is currently connected to, the value will be ``None``. 

120 proxy_is_verified: bool | None = None 

121 

122 blocksize: int 

123 source_address: tuple[str, int] | None 

124 socket_options: connection._TYPE_SOCKET_OPTIONS | None 

125 

126 _has_connected_to_proxy: bool 

127 _response_options: _ResponseOptions | None 

128 _tunnel_host: str | None 

129 _tunnel_port: int | None 

130 _tunnel_scheme: str | None 

131 

132 def __init__( 

133 self, 

134 host: str, 

135 port: int | None = None, 

136 *, 

137 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, 

138 source_address: tuple[str, int] | None = None, 

139 blocksize: int = 16384, 

140 socket_options: None | ( 

141 connection._TYPE_SOCKET_OPTIONS 

142 ) = default_socket_options, 

143 proxy: Url | None = None, 

144 proxy_config: ProxyConfig | None = None, 

145 ) -> None: 

146 super().__init__( 

147 host=host, 

148 port=port, 

149 timeout=Timeout.resolve_default_timeout(timeout), 

150 source_address=source_address, 

151 blocksize=blocksize, 

152 ) 

153 self.socket_options = socket_options 

154 self.proxy = proxy 

155 self.proxy_config = proxy_config 

156 

157 self._has_connected_to_proxy = False 

158 self._response_options = None 

159 self._tunnel_host: str | None = None 

160 self._tunnel_port: int | None = None 

161 self._tunnel_scheme: str | None = None 

162 

163 @property 

164 def host(self) -> str: 

165 """ 

166 Getter method to remove any trailing dots that indicate the hostname is an FQDN. 

167 

168 In general, SSL certificates don't include the trailing dot indicating a 

169 fully-qualified domain name, and thus, they don't validate properly when 

170 checked against a domain name that includes the dot. In addition, some 

171 servers may not expect to receive the trailing dot when provided. 

172 

173 However, the hostname with trailing dot is critical to DNS resolution; doing a 

174 lookup with the trailing dot will properly only resolve the appropriate FQDN, 

175 whereas a lookup without a trailing dot will search the system's search domain 

176 list. Thus, it's important to keep the original host around for use only in 

177 those cases where it's appropriate (i.e., when doing DNS lookup to establish the 

178 actual TCP connection across which we're going to send HTTP requests). 

179 """ 

180 return self._dns_host.rstrip(".") 

181 

182 @host.setter 

183 def host(self, value: str) -> None: 

184 """ 

185 Setter for the `host` property. 

186 

187 We assume that only urllib3 uses the _dns_host attribute; httplib itself 

188 only uses `host`, and it seems reasonable that other libraries follow suit. 

189 """ 

190 self._dns_host = value 

191 

192 def _new_conn(self) -> socket.socket: 

193 """Establish a socket connection and set nodelay settings on it. 

194 

195 :return: New socket connection. 

196 """ 

197 try: 

198 sock = connection.create_connection( 

199 (self._dns_host, self.port), 

200 self.timeout, 

201 source_address=self.source_address, 

202 socket_options=self.socket_options, 

203 ) 

204 except socket.gaierror as e: 

205 raise NameResolutionError(self.host, self, e) from e 

206 except SocketTimeout as e: 

207 raise ConnectTimeoutError( 

208 self, 

209 f"Connection to {self.host} timed out. (connect timeout={self.timeout})", 

210 ) from e 

211 

212 except OSError as e: 

213 raise NewConnectionError( 

214 self, f"Failed to establish a new connection: {e}" 

215 ) from e 

216 

217 sys.audit("http.client.connect", self, self.host, self.port) 

218 

219 return sock 

220 

221 def set_tunnel( 

222 self, 

223 host: str, 

224 port: int | None = None, 

225 headers: typing.Mapping[str, str] | None = None, 

226 scheme: str = "http", 

227 ) -> None: 

228 if scheme not in ("http", "https"): 

229 raise ValueError( 

230 f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'" 

231 ) 

232 super().set_tunnel(host, port=port, headers=headers) 

233 self._tunnel_scheme = scheme 

234 
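# --- Illustrative sketch (editorial addition, not part of connection.py) ---
# A hedged example of CONNECT tunneling via set_tunnel() above; the proxy
# host and port are hypothetical:
#
#     conn = HTTPConnection("proxy.internal", 3128)   # connect to the proxy
#     conn.set_tunnel("example.com", 80)              # CONNECT example.com:80
#     conn.request("GET", "/")                        # sent through the tunnel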

235 if sys.version_info < (3, 11, 9) or ((3, 12) <= sys.version_info < (3, 12, 3)): 

236 # Taken from python/cpython#100986 which was backported in 3.11.9 and 3.12.3. 

237 # When using connection_from_host, host will come without brackets. 

238 def _wrap_ipv6(self, ip: bytes) -> bytes: 

239 if b":" in ip and ip[0] != b"["[0]: 

240 return b"[" + ip + b"]" 

241 return ip 

242 

243 if sys.version_info < (3, 11, 9): 

244 # `_tunnel` copied from 3.11.13 backporting 

245 # https://github.com/python/cpython/commit/0d4026432591d43185568dd31cef6a034c4b9261 

246 # and https://github.com/python/cpython/commit/6fbc61070fda2ffb8889e77e3b24bca4249ab4d1 

247 def _tunnel(self) -> None: 

248 _MAXLINE = http.client._MAXLINE # type: ignore[attr-defined] 

249 connect = b"CONNECT %s:%d HTTP/1.0\r\n" % ( # type: ignore[str-format] 

250 self._wrap_ipv6(self._tunnel_host.encode("ascii")), # type: ignore[union-attr] 

251 self._tunnel_port, 

252 ) 

253 headers = [connect] 

254 for header, value in self._tunnel_headers.items(): # type: ignore[attr-defined] 

255 headers.append(f"{header}: {value}\r\n".encode("latin-1")) 

256 headers.append(b"\r\n") 

257 # Making a single send() call instead of one per line encourages 

258 # the host OS to use a more optimal packet size instead of 

259 # potentially emitting a series of small packets. 

260 self.send(b"".join(headers)) 

261 del headers 

262 

263 response = self.response_class(self.sock, method=self._method) # type: ignore[attr-defined] 

264 try: 

265 (version, code, message) = response._read_status() # type: ignore[attr-defined] 

266 

267 if code != http.HTTPStatus.OK: 

268 self.close() 

269 raise OSError( 

270 f"Tunnel connection failed: {code} {message.strip()}" 

271 ) 

272 while True: 

273 line = response.fp.readline(_MAXLINE + 1) 

274 if len(line) > _MAXLINE: 

275 raise http.client.LineTooLong("header line") 

276 if not line: 

277 # for sites which EOF without sending a trailer 

278 break 

279 if line in (b"\r\n", b"\n", b""): 

280 break 

281 

282 if self.debuglevel > 0: 

283 print("header:", line.decode()) 

284 finally: 

285 response.close() 

286 

287 elif (3, 12) <= sys.version_info < (3, 12, 3): 

288 # `_tunnel` copied from 3.12.11 backporting 

289 # https://github.com/python/cpython/commit/23aef575c7629abcd4aaf028ebd226fb41a4b3c8 

290 def _tunnel(self) -> None: # noqa: F811 

291 connect = b"CONNECT %s:%d HTTP/1.1\r\n" % ( # type: ignore[str-format] 

292 self._wrap_ipv6(self._tunnel_host.encode("idna")), # type: ignore[union-attr] 

293 self._tunnel_port, 

294 ) 

295 headers = [connect] 

296 for header, value in self._tunnel_headers.items(): # type: ignore[attr-defined] 

297 headers.append(f"{header}: {value}\r\n".encode("latin-1")) 

298 headers.append(b"\r\n") 

299 # Making a single send() call instead of one per line encourages 

300 # the host OS to use a more optimal packet size instead of 

301 # potentially emitting a series of small packets. 

302 self.send(b"".join(headers)) 

303 del headers 

304 

305 response = self.response_class(self.sock, method=self._method) # type: ignore[attr-defined] 

306 try: 

307 (version, code, message) = response._read_status() # type: ignore[attr-defined] 

308 

309 self._raw_proxy_headers = http.client._read_headers(response.fp) # type: ignore[attr-defined] 

310 

311 if self.debuglevel > 0: 

312 for header in self._raw_proxy_headers: 

313 print("header:", header.decode()) 

314 

315 if code != http.HTTPStatus.OK: 

316 self.close() 

317 raise OSError( 

318 f"Tunnel connection failed: {code} {message.strip()}" 

319 ) 

320 

321 finally: 

322 response.close() 

323 

324 def connect(self) -> None: 

325 self.sock = self._new_conn() 

326 if self._tunnel_host: 

327 # If we're tunneling it means we're connected to our proxy. 

328 self._has_connected_to_proxy = True 

329 

330 # TODO: Fix tunnel so it doesn't depend on self.sock state. 

331 self._tunnel() 

332 

333 # If there's a proxy to be connected to, we are fully connected. 

334 # This is set twice (once above and here) due to forwarding proxies 

335 # not using tunnelling. 

336 self._has_connected_to_proxy = bool(self.proxy) 

337 

338 if self._has_connected_to_proxy: 

339 self.proxy_is_verified = False 

340 

341 @property 

342 def is_closed(self) -> bool: 

343 return self.sock is None 

344 

345 @property 

346 def is_connected(self) -> bool: 

347 if self.sock is None: 

348 return False 

349 return not wait_for_read(self.sock, timeout=0.0) 

350 

351 @property 

352 def has_connected_to_proxy(self) -> bool: 

353 return self._has_connected_to_proxy 

354 

355 @property 

356 def proxy_is_forwarding(self) -> bool: 

357 """ 

358 Return True if a forwarding proxy is configured, else return False 

359 """ 

360 return bool(self.proxy) and self._tunnel_host is None 

361 

362 @property 

363 def proxy_is_tunneling(self) -> bool: 

364 """ 

365 Return True if a tunneling proxy is configured, else return False 

366 """ 

367 return self._tunnel_host is not None 

368 

369 def close(self) -> None: 

370 try: 

371 super().close() 

372 finally: 

373 # Reset all stateful properties so connection 

374 # can be re-used without leaking prior configs. 

375 self.sock = None 

376 self.is_verified = False 

377 self.proxy_is_verified = None 

378 self._has_connected_to_proxy = False 

379 self._response_options = None 

380 self._tunnel_host = None 

381 self._tunnel_port = None 

382 self._tunnel_scheme = None 

383 

384 def putrequest( 

385 self, 

386 method: str, 

387 url: str, 

388 skip_host: bool = False, 

389 skip_accept_encoding: bool = False, 

390 ) -> None: 

391 """""" 

392 # Empty docstring because the indentation of CPython's implementation 

393 # is broken but we don't want this method in our documentation. 

394 match = _CONTAINS_CONTROL_CHAR_RE.search(method) 

395 if match: 

396 raise ValueError( 

397 f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})" 

398 ) 

399 

400 return super().putrequest( 

401 method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding 

402 ) 

403 

404 def putheader(self, header: str, *values: str) -> None: # type: ignore[override] 

405 """""" 

406 if not any(isinstance(v, str) and v == SKIP_HEADER for v in values): 

407 super().putheader(header, *values) 

408 elif to_str(header.lower()) not in SKIPPABLE_HEADERS: 

409 skippable_headers = "', '".join( 

410 [str.title(header) for header in sorted(SKIPPABLE_HEADERS)] 

411 ) 

412 raise ValueError( 

413 f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'" 

414 ) 

415 

416 # `request` method's signature intentionally violates LSP. 

417 # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental. 

418 def request( # type: ignore[override] 

419 self, 

420 method: str, 

421 url: str, 

422 body: _TYPE_BODY | None = None, 

423 headers: typing.Mapping[str, str] | None = None, 

424 *, 

425 chunked: bool = False, 

426 preload_content: bool = True, 

427 decode_content: bool = True, 

428 enforce_content_length: bool = True, 

429 ) -> None: 

430 # Update the inner socket's timeout value to send the request. 

431 # This only triggers if the connection is re-used. 

432 if self.sock is not None: 

433 self.sock.settimeout(self.timeout) 

434 

435 # Store these values to be fed into the HTTPResponse 

436 # object later. TODO: Remove this in favor of a real 

437 # HTTP lifecycle mechanism. 

438 

439 # We have to store these before we call .request() 

440 # because sometimes we can still salvage a response 

441 # off the wire even if we aren't able to completely 

442 # send the request body. 

443 self._response_options = _ResponseOptions( 

444 request_method=method, 

445 request_url=url, 

446 preload_content=preload_content, 

447 decode_content=decode_content, 

448 enforce_content_length=enforce_content_length, 

449 ) 

450 

451 if headers is None: 

452 headers = {} 

453 header_keys = frozenset(to_str(k.lower()) for k in headers) 

454 skip_accept_encoding = "accept-encoding" in header_keys 

455 skip_host = "host" in header_keys 

456 self.putrequest( 

457 method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host 

458 ) 

459 

460 # Transform the body into an iterable of sendall()-able chunks 

461 # and detect if an explicit Content-Length is doable. 

462 chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize) 

463 chunks = chunks_and_cl.chunks 

464 content_length = chunks_and_cl.content_length 

465 

466 # When chunked is explicitly set to 'True' we respect that. 

467 if chunked: 

468 if "transfer-encoding" not in header_keys: 

469 self.putheader("Transfer-Encoding", "chunked") 

470 else: 

471 # Detect whether a framing mechanism is already in use. If so 

472 # we respect that value, otherwise we pick chunked vs content-length 

473 # depending on the type of 'body'. 

474 if "content-length" in header_keys: 

475 chunked = False 

476 elif "transfer-encoding" in header_keys: 

477 chunked = True 

478 

479 # Otherwise we go off the recommendation of 'body_to_chunks()'. 

480 else: 

481 chunked = False 

482 if content_length is None: 

483 if chunks is not None: 

484 chunked = True 

485 self.putheader("Transfer-Encoding", "chunked") 

486 else: 

487 self.putheader("Content-Length", str(content_length)) 

488 
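# --- Summary of the framing decision above (editorial addition, derived
# directly from the branches; illustrative only) ---
#   chunked=True argument                     -> Transfer-Encoding: chunked
#   caller already sent Content-Length        -> caller's framing is kept
#   caller already sent Transfer-Encoding     -> chunked framing is kept
#   body_to_chunks() reports a length         -> Content-Length: <length>
#   length unknown but chunks available       -> Transfer-Encoding: chunked
#   no body and no length                     -> no framing header is added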

489 # Now that framing headers are out of the way we send all the other headers. 

490 if "user-agent" not in header_keys: 

491 self.putheader("User-Agent", _get_default_user_agent()) 

492 for header, value in headers.items(): 

493 self.putheader(header, value) 

494 self.endheaders() 

495 

496 # If we're given a body we start sending that in chunks. 

497 if chunks is not None: 

498 for chunk in chunks: 

499 # Sending empty chunks isn't allowed for TE: chunked 

500 # as it indicates the end of the body. 

501 if not chunk: 

502 continue 

503 if isinstance(chunk, str): 

504 chunk = chunk.encode("utf-8") 

505 if chunked: 

506 self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk)) 

507 else: 

508 self.send(chunk) 

509 

510 # Regardless of whether we have a body or not, if we're in 

511 # chunked mode we want to send an explicit empty chunk. 

512 if chunked: 

513 self.send(b"0\r\n\r\n") 

514 

515 def request_chunked( 

516 self, 

517 method: str, 

518 url: str, 

519 body: _TYPE_BODY | None = None, 

520 headers: typing.Mapping[str, str] | None = None, 

521 ) -> None: 

522 """ 

523 Alternative to the common request method, which sends the 

524 body with chunked encoding and not as one block 

525 """ 

526 warnings.warn( 

527 "HTTPConnection.request_chunked() is deprecated and will be removed " 

528 "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).", 

529 category=DeprecationWarning, 

530 stacklevel=2, 

531 ) 

532 self.request(method, url, body=body, headers=headers, chunked=True) 

533 

534 def getresponse( # type: ignore[override] 

535 self, 

536 ) -> HTTPResponse: 

537 """ 

538 Get the response from the server. 

539 

540 If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable. 

541 

542 If a request has not been sent or if a previous response has not been handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed. 

543 """ 

544 # Raise the same error as http.client.HTTPConnection 

545 if self._response_options is None: 

546 raise ResponseNotReady() 

547 

548 # Reset this attribute so it can be used again. 

549 resp_options = self._response_options 

550 self._response_options = None 

551 

552 # Since the connection's timeout value may have been updated 

553 # we need to set the timeout on the socket. 

554 self.sock.settimeout(self.timeout) 

555 

556 # This is needed here to avoid circular import errors 

557 from .response import HTTPResponse 

558 

559 # Save a reference to the shutdown function before ownership is passed 

560 # to httplib_response 

561 # TODO should we implement it everywhere? 

562 _shutdown = getattr(self.sock, "shutdown", None) 

563 

564 # Get the response from http.client.HTTPConnection 

565 httplib_response = super().getresponse() 

566 

567 try: 

568 assert_header_parsing(httplib_response.msg) 

569 except (HeaderParsingError, TypeError) as hpe: 

570 log.warning( 

571 "Failed to parse headers (url=%s): %s", 

572 _url_from_connection(self, resp_options.request_url), 

573 hpe, 

574 exc_info=True, 

575 ) 

576 

577 headers = HTTPHeaderDict(httplib_response.msg.items()) 

578 

579 response = HTTPResponse( 

580 body=httplib_response, 

581 headers=headers, 

582 status=httplib_response.status, 

583 version=httplib_response.version, 

584 version_string=getattr(self, "_http_vsn_str", "HTTP/?"), 

585 reason=httplib_response.reason, 

586 preload_content=resp_options.preload_content, 

587 decode_content=resp_options.decode_content, 

588 original_response=httplib_response, 

589 enforce_content_length=resp_options.enforce_content_length, 

590 request_method=resp_options.request_method, 

591 request_url=resp_options.request_url, 

592 sock_shutdown=_shutdown, 

593 ) 

594 return response 

595 

596 
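# --- Illustrative sketch (editorial addition, not part of connection.py) ---
# A hedged end-to-end example of the HTTPConnection lifecycle defined above;
# the host is a placeholder and error handling is omitted:
#
#     from urllib3.connection import HTTPConnection
#
#     conn = HTTPConnection("example.com", 80, timeout=10.0)
#     conn.request("GET", "/", headers={"Accept": "text/html"})
#     resp = conn.getresponse()           # urllib3.response.HTTPResponse
#     print(resp.status, len(resp.data))  # body preloaded by default
#     conn.close()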

597class HTTPSConnection(HTTPConnection): 

598 """ 

599 Many of the parameters to this constructor are passed to the underlying SSL 

600 socket by means of :py:func:`urllib3.util.ssl_wrap_socket`. 

601 """ 

602 

603 default_port = port_by_scheme["https"] # type: ignore[misc] 

604 

605 cert_reqs: int | str | None = None 

606 ca_certs: str | None = None 

607 ca_cert_dir: str | None = None 

608 ca_cert_data: None | str | bytes = None 

609 ssl_version: int | str | None = None 

610 ssl_minimum_version: int | None = None 

611 ssl_maximum_version: int | None = None 

612 assert_fingerprint: str | None = None 

613 _connect_callback: typing.Callable[..., None] | None = None 

614 

615 def __init__( 

616 self, 

617 host: str, 

618 port: int | None = None, 

619 *, 

620 timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, 

621 source_address: tuple[str, int] | None = None, 

622 blocksize: int = 16384, 

623 socket_options: None | ( 

624 connection._TYPE_SOCKET_OPTIONS 

625 ) = HTTPConnection.default_socket_options, 

626 proxy: Url | None = None, 

627 proxy_config: ProxyConfig | None = None, 

628 cert_reqs: int | str | None = None, 

629 assert_hostname: None | str | typing.Literal[False] = None, 

630 assert_fingerprint: str | None = None, 

631 server_hostname: str | None = None, 

632 ssl_context: ssl.SSLContext | None = None, 

633 ca_certs: str | None = None, 

634 ca_cert_dir: str | None = None, 

635 ca_cert_data: None | str | bytes = None, 

636 ssl_minimum_version: int | None = None, 

637 ssl_maximum_version: int | None = None, 

638 ssl_version: int | str | None = None, # Deprecated 

639 cert_file: str | None = None, 

640 key_file: str | None = None, 

641 key_password: str | None = None, 

642 ) -> None: 

643 super().__init__( 

644 host, 

645 port=port, 

646 timeout=timeout, 

647 source_address=source_address, 

648 blocksize=blocksize, 

649 socket_options=socket_options, 

650 proxy=proxy, 

651 proxy_config=proxy_config, 

652 ) 

653 

654 self.key_file = key_file 

655 self.cert_file = cert_file 

656 self.key_password = key_password 

657 self.ssl_context = ssl_context 

658 self.server_hostname = server_hostname 

659 self.assert_hostname = assert_hostname 

660 self.assert_fingerprint = assert_fingerprint 

661 self.ssl_version = ssl_version 

662 self.ssl_minimum_version = ssl_minimum_version 

663 self.ssl_maximum_version = ssl_maximum_version 

664 self.ca_certs = ca_certs and os.path.expanduser(ca_certs) 

665 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir) 

666 self.ca_cert_data = ca_cert_data 

667 

668 # cert_reqs depends on ssl_context so calculate last. 

669 if cert_reqs is None: 

670 if self.ssl_context is not None: 

671 cert_reqs = self.ssl_context.verify_mode 

672 else: 

673 cert_reqs = resolve_cert_reqs(None) 

674 self.cert_reqs = cert_reqs 

675 self._connect_callback = None 

676 

677 def set_cert( 

678 self, 

679 key_file: str | None = None, 

680 cert_file: str | None = None, 

681 cert_reqs: int | str | None = None, 

682 key_password: str | None = None, 

683 ca_certs: str | None = None, 

684 assert_hostname: None | str | typing.Literal[False] = None, 

685 assert_fingerprint: str | None = None, 

686 ca_cert_dir: str | None = None, 

687 ca_cert_data: None | str | bytes = None, 

688 ) -> None: 

689 """ 

690 This method should only be called once, before the connection is used. 

691 """ 

692 warnings.warn( 

693 "HTTPSConnection.set_cert() is deprecated and will be removed " 

694 "in urllib3 v2.1.0. Instead provide the parameters to the " 

695 "HTTPSConnection constructor.", 

696 category=DeprecationWarning, 

697 stacklevel=2, 

698 ) 

699 

700 # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also 

701 # have an SSLContext object in which case we'll use its verify_mode. 

702 if cert_reqs is None: 

703 if self.ssl_context is not None: 

704 cert_reqs = self.ssl_context.verify_mode 

705 else: 

706 cert_reqs = resolve_cert_reqs(None) 

707 

708 self.key_file = key_file 

709 self.cert_file = cert_file 

710 self.cert_reqs = cert_reqs 

711 self.key_password = key_password 

712 self.assert_hostname = assert_hostname 

713 self.assert_fingerprint = assert_fingerprint 

714 self.ca_certs = ca_certs and os.path.expanduser(ca_certs) 

715 self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir) 

716 self.ca_cert_data = ca_cert_data 

717 

718 def connect(self) -> None: 

719 # Today we don't need to be doing this step before the /actual/ socket 

720 # connection; however, in the future we'll need to decide whether to 

721 # create a new socket or re-use an existing "shared" socket as a part 

722 # of the HTTP/2 handshake dance. 

723 if self._tunnel_host is not None and self._tunnel_port is not None: 

724 probe_http2_host = self._tunnel_host 

725 probe_http2_port = self._tunnel_port 

726 else: 

727 probe_http2_host = self.host 

728 probe_http2_port = self.port 

729 

730 # Check if the target origin supports HTTP/2. 

731 # If the value comes back as 'None' it means that the current thread 

732 # is probing for HTTP/2 support. Otherwise, we're waiting for another 

733 # probe to complete, or we get a value right away. 

734 target_supports_http2: bool | None 

735 if "h2" in ssl_.ALPN_PROTOCOLS: 

736 target_supports_http2 = http2_probe.acquire_and_get( 

737 host=probe_http2_host, port=probe_http2_port 

738 ) 

739 else: 

740 # If HTTP/2 isn't going to be offered it doesn't matter if 

741 # the target supports HTTP/2. Don't want to make a probe. 

742 target_supports_http2 = False 

743 

744 if self._connect_callback is not None: 

745 self._connect_callback( 

746 "before connect", 

747 thread_id=threading.get_ident(), 

748 target_supports_http2=target_supports_http2, 

749 ) 

750 

751 try: 

752 sock: socket.socket | ssl.SSLSocket 

753 self.sock = sock = self._new_conn() 

754 server_hostname: str = self.host 

755 tls_in_tls = False 

756 

757 # Do we need to establish a tunnel? 

758 if self.proxy_is_tunneling: 

759 # We're tunneling to an HTTPS origin so need to do TLS-in-TLS. 

760 if self._tunnel_scheme == "https": 

761 # _connect_tls_proxy will verify and assign proxy_is_verified 

762 self.sock = sock = self._connect_tls_proxy(self.host, sock) 

763 tls_in_tls = True 

764 elif self._tunnel_scheme == "http": 

765 self.proxy_is_verified = False 

766 

767 # If we're tunneling it means we're connected to our proxy. 

768 self._has_connected_to_proxy = True 

769 

770 self._tunnel() 

771 # Override the host with the one we're requesting data from. 

772 server_hostname = typing.cast(str, self._tunnel_host) 

773 

774 if self.server_hostname is not None: 

775 server_hostname = self.server_hostname 

776 

777 is_time_off = datetime.date.today() < RECENT_DATE 

778 if is_time_off: 

779 warnings.warn( 

780 ( 

781 f"System time is way off (before {RECENT_DATE}). This will probably " 

782 "lead to SSL verification errors" 

783 ), 

784 SystemTimeWarning, 

785 ) 

786 

787 # Remove trailing '.' from fqdn hostnames to allow certificate validation 

788 server_hostname_rm_dot = server_hostname.rstrip(".") 

789 

790 sock_and_verified = _ssl_wrap_socket_and_match_hostname( 

791 sock=sock, 

792 cert_reqs=self.cert_reqs, 

793 ssl_version=self.ssl_version, 

794 ssl_minimum_version=self.ssl_minimum_version, 

795 ssl_maximum_version=self.ssl_maximum_version, 

796 ca_certs=self.ca_certs, 

797 ca_cert_dir=self.ca_cert_dir, 

798 ca_cert_data=self.ca_cert_data, 

799 cert_file=self.cert_file, 

800 key_file=self.key_file, 

801 key_password=self.key_password, 

802 server_hostname=server_hostname_rm_dot, 

803 ssl_context=self.ssl_context, 

804 tls_in_tls=tls_in_tls, 

805 assert_hostname=self.assert_hostname, 

806 assert_fingerprint=self.assert_fingerprint, 

807 ) 

808 self.sock = sock_and_verified.socket 

809 

810 # If an error occurs during connection/handshake we may need to release 

811 # our lock so another connection can probe the origin. 

812 except BaseException: 

813 if self._connect_callback is not None: 

814 self._connect_callback( 

815 "after connect failure", 

816 thread_id=threading.get_ident(), 

817 target_supports_http2=target_supports_http2, 

818 ) 

819 

820 if target_supports_http2 is None: 

821 http2_probe.set_and_release( 

822 host=probe_http2_host, port=probe_http2_port, supports_http2=None 

823 ) 

824 raise 

825 

826 # If this connection doesn't know if the origin supports HTTP/2 

827 # we report back to the HTTP/2 probe our result. 

828 if target_supports_http2 is None: 

829 supports_http2 = sock_and_verified.socket.selected_alpn_protocol() == "h2" 

830 http2_probe.set_and_release( 

831 host=probe_http2_host, 

832 port=probe_http2_port, 

833 supports_http2=supports_http2, 

834 ) 

835 

836 # Forwarding proxies can never have a verified target since 

837 # the proxy is the one doing the verification. Should instead 

838 # use a CONNECT tunnel in order to verify the target. 

839 # See: https://github.com/urllib3/urllib3/issues/3267. 

840 if self.proxy_is_forwarding: 

841 self.is_verified = False 

842 else: 

843 self.is_verified = sock_and_verified.is_verified 

844 

845 # If there's a proxy to be connected to, we are fully connected. 

846 # This is set twice (once above and here) due to forwarding proxies 

847 # not using tunnelling. 

848 self._has_connected_to_proxy = bool(self.proxy) 

849 

850 # Set `self.proxy_is_verified` unless it's already set while 

851 # establishing a tunnel. 

852 if self._has_connected_to_proxy and self.proxy_is_verified is None: 

853 self.proxy_is_verified = sock_and_verified.is_verified 

854 

855 def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket: 

856 """ 

857 Establish a TLS connection to the proxy using the provided SSL context. 

858 """ 

859 # `_connect_tls_proxy` is called when self._tunnel_host is truthy. 

860 proxy_config = typing.cast(ProxyConfig, self.proxy_config) 

861 ssl_context = proxy_config.ssl_context 

862 sock_and_verified = _ssl_wrap_socket_and_match_hostname( 

863 sock, 

864 cert_reqs=self.cert_reqs, 

865 ssl_version=self.ssl_version, 

866 ssl_minimum_version=self.ssl_minimum_version, 

867 ssl_maximum_version=self.ssl_maximum_version, 

868 ca_certs=self.ca_certs, 

869 ca_cert_dir=self.ca_cert_dir, 

870 ca_cert_data=self.ca_cert_data, 

871 server_hostname=hostname, 

872 ssl_context=ssl_context, 

873 assert_hostname=proxy_config.assert_hostname, 

874 assert_fingerprint=proxy_config.assert_fingerprint, 

875 # Features that aren't implemented for proxies yet: 

876 cert_file=None, 

877 key_file=None, 

878 key_password=None, 

879 tls_in_tls=False, 

880 ) 

881 self.proxy_is_verified = sock_and_verified.is_verified 

882 return sock_and_verified.socket # type: ignore[return-value] 

883 

884 
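# --- Illustrative sketch (editorial addition, not part of connection.py) ---
# A hedged example of verified HTTPS via the class above. The CA bundle path
# is an assumption for illustration; with no TLS arguments, cert_reqs resolves
# to ssl.CERT_REQUIRED and the system trust store is loaded.
#
#     conn = HTTPSConnection(
#         "example.com",
#         443,
#         ca_certs="/etc/ssl/certs/ca-certificates.crt",  # hypothetical path
#         ssl_minimum_version=ssl.TLSVersion.TLSv1_2,
#     )
#     conn.request("GET", "/")
#     resp = conn.getresponse()
#     assert conn.is_verified   # True once the handshake verified the chain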

885class _WrappedAndVerifiedSocket(typing.NamedTuple): 

886 """ 

887 Wrapped socket and whether the connection is 

888 verified after the TLS handshake 

889 """ 

890 

891 socket: ssl.SSLSocket | SSLTransport 

892 is_verified: bool 

893 

894 

895def _ssl_wrap_socket_and_match_hostname( 

896 sock: socket.socket, 

897 *, 

898 cert_reqs: None | str | int, 

899 ssl_version: None | str | int, 

900 ssl_minimum_version: int | None, 

901 ssl_maximum_version: int | None, 

902 cert_file: str | None, 

903 key_file: str | None, 

904 key_password: str | None, 

905 ca_certs: str | None, 

906 ca_cert_dir: str | None, 

907 ca_cert_data: None | str | bytes, 

908 assert_hostname: None | str | typing.Literal[False], 

909 assert_fingerprint: str | None, 

910 server_hostname: str | None, 

911 ssl_context: ssl.SSLContext | None, 

912 tls_in_tls: bool = False, 

913) -> _WrappedAndVerifiedSocket: 

914 """Logic for constructing an SSLContext from all TLS parameters, passing 

915 that down into ssl_wrap_socket, and then doing certificate verification 

916 either via hostname or fingerprint. This function exists to guarantee 

917 that both proxies and targets have the same behavior when connecting via TLS. 

918 """ 

919 default_ssl_context = False 

920 if ssl_context is None: 

921 default_ssl_context = True 

922 context = create_urllib3_context( 

923 ssl_version=resolve_ssl_version(ssl_version), 

924 ssl_minimum_version=ssl_minimum_version, 

925 ssl_maximum_version=ssl_maximum_version, 

926 cert_reqs=resolve_cert_reqs(cert_reqs), 

927 ) 

928 else: 

929 context = ssl_context 

930 

931 context.verify_mode = resolve_cert_reqs(cert_reqs) 

932 

933 # In some cases, we want to verify hostnames ourselves 

934 if ( 

935 # `ssl` can't verify fingerprints or alternate hostnames 

936 assert_fingerprint 

937 or assert_hostname 

938 # assert_hostname can be set to False to disable hostname checking 

939 or assert_hostname is False 

940 # We still support OpenSSL 1.0.2, which prevents us from verifying 

941 # hostnames easily: https://github.com/pyca/pyopenssl/pull/933 

942 or ssl_.IS_PYOPENSSL 

943 or not ssl_.HAS_NEVER_CHECK_COMMON_NAME 

944 ): 

945 context.check_hostname = False 

946 

947 # Try to load OS default certs if none are given. We need to do the hasattr() check 

948 # for custom pyOpenSSL SSLContext objects because they don't support 

949 # load_default_certs(). 

950 if ( 

951 not ca_certs 

952 and not ca_cert_dir 

953 and not ca_cert_data 

954 and default_ssl_context 

955 and hasattr(context, "load_default_certs") 

956 ): 

957 context.load_default_certs() 

958 

959 # Ensure that IPv6 addresses are in the proper format and don't have a 

960 # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses 

961 # and interprets them as DNS hostnames. 

962 if server_hostname is not None: 

963 normalized = server_hostname.strip("[]") 

964 if "%" in normalized: 

965 normalized = normalized[: normalized.rfind("%")] 

966 if is_ipaddress(normalized): 

967 server_hostname = normalized 

968 

969 ssl_sock = ssl_wrap_socket( 

970 sock=sock, 

971 keyfile=key_file, 

972 certfile=cert_file, 

973 key_password=key_password, 

974 ca_certs=ca_certs, 

975 ca_cert_dir=ca_cert_dir, 

976 ca_cert_data=ca_cert_data, 

977 server_hostname=server_hostname, 

978 ssl_context=context, 

979 tls_in_tls=tls_in_tls, 

980 ) 

981 

982 try: 

983 if assert_fingerprint: 

984 _assert_fingerprint( 

985 ssl_sock.getpeercert(binary_form=True), assert_fingerprint 

986 ) 

987 elif ( 

988 context.verify_mode != ssl.CERT_NONE 

989 and not context.check_hostname 

990 and assert_hostname is not False 

991 ): 

992 cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert() # type: ignore[assignment] 

993 

994 # Need to signal to our match_hostname whether to use 'commonName' or not. 

995 # If we're using our own constructed SSLContext we explicitly set 'False' 

996 # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name. 

997 if default_ssl_context: 

998 hostname_checks_common_name = False 

999 else: 

1000 hostname_checks_common_name = ( 

1001 getattr(context, "hostname_checks_common_name", False) or False 

1002 ) 

1003 

1004 _match_hostname( 

1005 cert, 

1006 assert_hostname or server_hostname, # type: ignore[arg-type] 

1007 hostname_checks_common_name, 

1008 ) 

1009 

1010 return _WrappedAndVerifiedSocket( 

1011 socket=ssl_sock, 

1012 is_verified=context.verify_mode == ssl.CERT_REQUIRED 

1013 or bool(assert_fingerprint), 

1014 ) 

1015 except BaseException: 

1016 ssl_sock.close() 

1017 raise 

1018 

1019 
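# --- Illustrative note (editorial addition, not part of connection.py) ---
# The helper above reports is_verified as
#     (context.verify_mode == ssl.CERT_REQUIRED) or bool(assert_fingerprint)
# so a pinned fingerprint counts as verification even when verify_mode is
# CERT_NONE. The IPv6 normalization it applies before the handshake:
#
#     >>> s = "[fe80::1%eth0]".strip("[]")
#     >>> s[: s.rfind("%")]
#     'fe80::1'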

1020def _match_hostname( 

1021 cert: _TYPE_PEER_CERT_RET_DICT | None, 

1022 asserted_hostname: str, 

1023 hostname_checks_common_name: bool = False, 

1024) -> None: 

1025 # Our upstream implementation of ssl.match_hostname() 

1026 # only applies this normalization to IP addresses so it doesn't 

1027 # match DNS SANs, so we do the same thing! 

1028 stripped_hostname = asserted_hostname.strip("[]") 

1029 if is_ipaddress(stripped_hostname): 

1030 asserted_hostname = stripped_hostname 

1031 

1032 try: 

1033 match_hostname(cert, asserted_hostname, hostname_checks_common_name) 

1034 except CertificateError as e: 

1035 log.warning( 

1036 "Certificate did not match expected hostname: %s. Certificate: %s", 

1037 asserted_hostname, 

1038 cert, 

1039 ) 

1040 # Add cert to exception and reraise so client code can inspect 

1041 # the cert when catching the exception, if they want to 

1042 e._peer_cert = cert # type: ignore[attr-defined] 

1043 raise 

1044 

1045 

1046def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError: 

1047 # Look for the phrase 'wrong version number'; if found, 

1048 # then we should warn the user that we're very sure that 

1049 # this proxy is HTTP-only and they have a configuration issue. 

1050 error_normalized = " ".join(re.split("[^a-z]", str(err).lower())) 

1051 is_likely_http_proxy = ( 

1052 "wrong version number" in error_normalized 

1053 or "unknown protocol" in error_normalized 

1054 or "record layer failure" in error_normalized 

1055 ) 

1056 http_proxy_warning = ( 

1057 ". Your proxy appears to only use HTTP and not HTTPS, " 

1058 "try changing your proxy URL to be HTTP. See: " 

1059 "https://urllib3.readthedocs.io/en/latest/advanced-usage.html" 

1060 "#https-proxy-error-http-proxy" 

1061 ) 

1062 new_err = ProxyError( 

1063 f"Unable to connect to proxy" 

1064 f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}", 

1065 err, 

1066 ) 

1067 new_err.__cause__ = err 

1068 return new_err 

1069 

1070 

1071def _get_default_user_agent() -> str: 

1072 return f"python-urllib3/{__version__}" 

1073 

1074 

1075class DummyConnection: 

1076 """Used to detect a failed ConnectionCls import.""" 

1077 

1078 

1079if not ssl: 

1080 HTTPSConnection = DummyConnection # type: ignore[misc, assignment] # noqa: F811 

1081 

1082 

1083VerifiedHTTPSConnection = HTTPSConnection 

1084 

1085 

1086def _url_from_connection( 

1087 conn: HTTPConnection | HTTPSConnection, path: str | None = None 

1088) -> str: 

1089 """Returns the URL from a given connection. This is mainly used for testing and logging.""" 

1090 

1091 scheme = "https" if isinstance(conn, HTTPSConnection) else "http" 

1092 

1093 return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url
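# --- Illustrative sketch (editorial addition, not part of connection.py) ---
# A hedged example of the helper above:
#
#     >>> _url_from_connection(HTTPConnection("example.com", 8080), "/status")
#     'http://example.com:8080/status'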