from __future__ import annotations

import datetime
import logging
import os
import re
import socket
import sys
import typing
import warnings
from http.client import HTTPConnection as _HTTPConnection
from http.client import HTTPException as HTTPException  # noqa: F401
from http.client import ResponseNotReady
from socket import timeout as SocketTimeout

if typing.TYPE_CHECKING:
    from .response import HTTPResponse
    from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
    from .util.ssltransport import SSLTransport

from ._collections import HTTPHeaderDict
from .util.response import assert_header_parsing
from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
from .util.util import to_str
from .util.wait import wait_for_read

try:  # Compiled with SSL?
    import ssl

    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    ssl = None  # type: ignore[assignment]

    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        pass


from ._base_connection import _TYPE_BODY
from ._base_connection import ProxyConfig as ProxyConfig
from ._base_connection import _ResponseOptions as _ResponseOptions
from ._version import __version__
from .exceptions import (
    ConnectTimeoutError,
    HeaderParsingError,
    NameResolutionError,
    NewConnectionError,
    ProxyError,
    SystemTimeWarning,
)
from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
from .util.request import body_to_chunks
from .util.ssl_ import assert_fingerprint as _assert_fingerprint
from .util.ssl_ import (
    create_urllib3_context,
    is_ipaddress,
    resolve_cert_reqs,
    resolve_ssl_version,
    ssl_wrap_socket,
)
from .util.ssl_match_hostname import CertificateError, match_hostname
from .util.url import Url

# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

port_by_scheme = {"http": 80, "https": 443}
# When it comes time to update this value as a part of regular maintenance
# (i.e., test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2023, 6, 1)

_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

_HAS_SYS_AUDIT = hasattr(sys, "audit")


class HTTPConnection(_HTTPConnection):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

    For example, if you wish to enable TCP Keep Alive in addition to the defaults,
    you might pass:

    .. code-block:: python

        HTTPConnection.default_socket_options + [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        ]

    Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """
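
    # Illustrative sketch only (not executed at import time): the options list from
    # the docstring above would typically be passed via the ``socket_options``
    # keyword argument, e.g.
    #
    #     HTTPConnection(
    #         "example.com",
    #         socket_options=HTTPConnection.default_socket_options
    #         + [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)],
    #     )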

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to, the value will be ``None``.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        super().__init__(
            host=host,
            port=port,
            timeout=Timeout.resolve_default_timeout(timeout),
            source_address=source_address,
            blocksize=blocksize,
        )
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None

    @property
    def host(self) -> str:
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")

    @host.setter
    def host(self, value: str) -> None:
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value
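
    # Illustrative sketch of the trailing-dot behavior described above
    # (hypothetical host, not executed here):
    #
    #     conn = HTTPConnection("example.com.")
    #     conn.host       # "example.com"  (dot stripped for TLS/Host purposes)
    #     conn._dns_host  # "example.com." (kept for the actual DNS lookup)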

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        try:
            sock = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e

        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return sock
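
    # A minimal sketch of how calling code might handle the errors mapped above
    # (assumed caller, not part of this module's behavior):
    #
    #     try:
    #         conn.connect()
    #     except (NameResolutionError, ConnectTimeoutError, NewConnectionError):
    #         ...  # retry, fall back, or surface the failure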

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        if scheme not in ("http", "https"):
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme
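
    # Illustrative sketch (hypothetical hosts, not executed here): tunneling through
    # a plain-HTTP proxy to an origin via CONNECT might look like
    #
    #     conn = HTTPSConnection("proxy.example.com", 8080)
    #     conn.set_tunnel("origin.example.com", 443, scheme="http")
    #     conn.connect()  # connects to the proxy, then CONNECTs to the origin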

    def connect(self) -> None:
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        if self._has_connected_to_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        if self.sock is None:
            return False
        return not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False
        """
        return bool(self.proxy) and self._tunnel_host is None

    def close(self) -> None:
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._response_options = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            super().putheader(header, *values)
        elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
            skippable_headers = "', '".join(
                [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
            )
            raise ValueError(
                f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
            )
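
    # Illustrative sketch (assumed calling code): a default header can be suppressed
    # by passing ``urllib3.util.SKIP_HEADER`` as its value, e.g.
    #
    #     conn.request("GET", "/", headers={"Accept-Encoding": SKIP_HEADER})
    #
    # Only the headers listed in SKIPPABLE_HEADERS may be skipped this way.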

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # We have to store these before we call .request()
        # because sometimes we can still salvage a response
        # off the wire even if we aren't able to completely
        # send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        if headers is None:
            headers = {}
        header_keys = frozenset(to_str(k.lower()) for k in headers)
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length
        # When chunked is explicitly set to 'True' we respect that.
        if chunked:
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        else:
            # Detect whether a framing mechanism is already in use. If so
            # we respect that value, otherwise we pick chunked vs content-length
            # depending on the type of 'body'.
            if "content-length" in header_keys:
                chunked = False
            elif "transfer-encoding" in header_keys:
                chunked = True

            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            else:
                chunked = False
                if content_length is None:
                    if chunks is not None:
                        chunked = True
                        self.putheader("Transfer-Encoding", "chunked")
                else:
                    self.putheader("Content-Length", str(content_length))

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                if isinstance(chunk, str):
                    chunk = chunk.encode("utf-8")
                if chunked:
                    self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
                else:
                    self.send(chunk)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")
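
    # Illustrative sketch of the request flow above (hypothetical host and body,
    # not executed here):
    #
    #     conn = HTTPConnection("example.com", 80)
    #     conn.request("POST", "/upload", body=open("data.bin", "rb"), chunked=True)
    #     response = conn.getresponse()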

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of
        HTTPResponse or of whatever object is returned by the response_class
        variable.

        If a request has not been sent or if a previous response has not been
        handled, ResponseNotReady is raised. If the HTTP response indicates that
        the connection should be closed, then it will be closed before the
        response is returned. When the connection is closed, the underlying
        socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        if self._response_options is None:
            raise ResponseNotReady()

        # Reset this attribute for being used again.
        resp_options = self._response_options
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        httplib_response = super().getresponse()

        try:
            assert_header_parsing(httplib_response.msg)
        except (HeaderParsingError, TypeError) as hpe:
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                hpe,
                exc_info=True,
            )

        headers = HTTPHeaderDict(httplib_response.msg.items())

        response = HTTPResponse(
            body=httplib_response,
            headers=headers,
            status=httplib_response.status,
            version=httplib_response.version,
            version_string=getattr(self, "_http_vsn_str", "HTTP/?"),
            reason=httplib_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=httplib_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )
        return response
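
    # Illustrative sketch (assumed calling code): the returned urllib3 HTTPResponse
    # exposes the parsed status, headers, and body, e.g.
    #
    #     response = conn.getresponse()
    #     response.status   # e.g. 200
    #     response.headers  # HTTPHeaderDict of parsed headers
    #     data = response.read()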


class HTTPSConnection(HTTPConnection):
    """
    Many of the parameters to this constructor are passed to the underlying SSL
    socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)
        self.cert_reqs = cert_reqs
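
    # Illustrative sketch (hypothetical host and CA bundle path): a verified TLS
    # connection might be constructed as
    #
    #     conn = HTTPSConnection(
    #         "example.com",
    #         443,
    #         cert_reqs="CERT_REQUIRED",
    #         ca_certs="/path/to/ca-bundle.pem",
    #     )
    #     conn.connect()
    #     conn.is_verified  # True when certificate and hostname checks passed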

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        sock: socket.socket | ssl.SSLSocket
        self.sock = sock = self._new_conn()
        server_hostname: str = self.host
        tls_in_tls = False

        # Do we need to establish a tunnel?
        if self._tunnel_host is not None:
            # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
            if self._tunnel_scheme == "https":
                # _connect_tls_proxy will verify and assign proxy_is_verified
                self.sock = sock = self._connect_tls_proxy(self.host, sock)
                tls_in_tls = True
            elif self._tunnel_scheme == "http":
                self.proxy_is_verified = False

            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # Override the host with the one we're requesting data from.
            server_hostname = self._tunnel_host

        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            warnings.warn(
                (
                    f"System time is way off (before {RECENT_DATE}). This will probably "
                    "lead to SSL verification errors"
                ),
                SystemTimeWarning,
            )

        # Remove trailing '.' from fqdn hostnames to allow certificate validation
        server_hostname_rm_dot = server_hostname.rstrip(".")

        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            server_hostname=server_hostname_rm_dot,
            ssl_context=self.ssl_context,
            tls_in_tls=tls_in_tls,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = sock_and_verified.socket

        # Forwarding proxies can never have a verified target since
        # the proxy is the one doing the verification. Should instead
        # use a CONNECT tunnel in order to verify the target.
        # See: https://github.com/urllib3/urllib3/issues/3267.
        if self.proxy_is_forwarding:
            self.is_verified = False
        else:
            self.is_verified = sock_and_verified.is_verified

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        # Set `self.proxy_is_verified` unless it's already set while
        # establishing a tunnel.
        if self._has_connected_to_proxy and self.proxy_is_verified is None:
            self.proxy_is_verified = sock_and_verified.is_verified

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        ssl_context = proxy_config.ssl_context
        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = sock_and_verified.is_verified
        return sock_and_verified.socket  # type: ignore[return-value]


class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake
    """

    socket: ssl.SSLSocket | SSLTransport
    is_verified: bool


def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | typing.Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.
    """
    default_ssl_context = False
    if ssl_context is None:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        ssl_sock.close()
        raise


def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    # Our upstream implementation of ssl.match_hostname() only applies this
    # normalization to IP addresses so that it doesn't match DNS SANs,
    # so we do the same thing!
    stripped_hostname = asserted_hostname.strip("[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname

    try:
        match_hostname(cert, asserted_hostname, hostname_checks_common_name)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert  # type: ignore[attr-defined]
        raise
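
# Illustrative sketch (assumed calling code): the re-raised CertificateError carries
# the peer certificate for inspection, e.g.
#
#     try:
#         conn.connect()
#     except CertificateError as e:
#         mismatched_cert = getattr(e, "_peer_cert", None)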


def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    # Look for the phrase 'wrong version number', if found
    # then we should warn the user that we're very sure that
    # this proxy is HTTP-only and they have a configuration issue.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    is_likely_http_proxy = (
        "wrong version number" in error_normalized
        or "unknown protocol" in error_normalized
        or "record layer failure" in error_normalized
    )
    http_proxy_warning = (
        ". Your proxy appears to only use HTTP and not HTTPS, "
        "try changing your proxy URL to be HTTP. See: "
        "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
        "#https-proxy-error-http-proxy"
    )
    new_err = ProxyError(
        f"Unable to connect to proxy"
        f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
        err,
    )
    new_err.__cause__ = err
    return new_err


def _get_default_user_agent() -> str:
    return f"python-urllib3/{__version__}"


class DummyConnection:
    """Used to detect a failed ConnectionCls import."""


if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


VerifiedHTTPSConnection = HTTPSConnection


def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging."""

    scheme = "https" if isinstance(conn, HTTPSConnection) else "http"

    return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url