from __future__ import annotations

import datetime
import logging
import os
import re
import socket
import sys
import typing
import warnings
from http.client import HTTPConnection as _HTTPConnection
from http.client import HTTPException as HTTPException  # noqa: F401
from http.client import ResponseNotReady
from socket import timeout as SocketTimeout

if typing.TYPE_CHECKING:
    from .response import HTTPResponse
    from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
    from .util.ssltransport import SSLTransport

from ._collections import HTTPHeaderDict
from .util.response import assert_header_parsing
from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
from .util.util import to_str
from .util.wait import wait_for_read

try:  # Compiled with SSL?
    import ssl

    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
    ssl = None  # type: ignore[assignment]

    class BaseSSLError(BaseException):  # type: ignore[no-redef]
        pass


from ._base_connection import _TYPE_BODY
from ._base_connection import ProxyConfig as ProxyConfig
from ._base_connection import _ResponseOptions as _ResponseOptions
from ._version import __version__
from .exceptions import (
    ConnectTimeoutError,
    HeaderParsingError,
    NameResolutionError,
    NewConnectionError,
    ProxyError,
    SystemTimeWarning,
)
from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
from .util.request import body_to_chunks
from .util.ssl_ import assert_fingerprint as _assert_fingerprint
from .util.ssl_ import (
    create_urllib3_context,
    is_ipaddress,
    resolve_cert_reqs,
    resolve_ssl_version,
    ssl_wrap_socket,
)
from .util.ssl_match_hostname import CertificateError, match_hostname
from .util.url import Url

# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (i.e. test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2023, 6, 1)

_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

_HAS_SYS_AUDIT = hasattr(sys, "audit")


class HTTPConnection(_HTTPConnection):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to, the value will be ``None``.
    proxy_is_verified: bool | None = None

    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        super().__init__(
            host=host,
            port=port,
            timeout=Timeout.resolve_default_timeout(timeout),
            source_address=source_address,
            blocksize=blocksize,
        )
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None

    @property
    def host(self) -> str:
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")
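
    # Illustrative sketch (comments only, not executed): the setter below stores
    # the raw value on ``_dns_host`` while the getter above strips a trailing dot.
    #
    #   conn = HTTPConnection("example.com.")
    #   conn.host       # "example.com"  (used for TLS validation and headers)
    #   conn._dns_host  # "example.com." (used for the DNS lookup)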

    @host.setter
    def host(self, value: str) -> None:
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        try:
            sock = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e

        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return sock
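
    # Error-mapping sketch (illustrative, not executed): callers see urllib3
    # exception types rather than raw socket errors.
    #
    #   try:
    #       conn.connect()
    #   except NameResolutionError:   # DNS lookup failed (socket.gaierror)
    #       ...
    #   except ConnectTimeoutError:   # connect timed out (socket.timeout)
    #       ...
    #   except NewConnectionError:    # any other OSError while connecting
    #       ...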

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        if scheme not in ("http", "https"):
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme
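
    # Tunneling sketch (illustrative, proxy host/port are placeholders): connect
    # to the proxy first, then tunnel through it to the real origin.
    #
    #   conn = HTTPConnection("proxy.internal", 3128)
    #   conn.set_tunnel("example.com", 80)
    #   conn.connect()  # connects to the proxy, then issues "CONNECT example.com:80"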

    def connect(self) -> None:
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # If a proxy is configured then at this point we are fully connected to it.
        # This is set twice (once above and here) because forwarding proxies
        # don't use tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        if self._has_connected_to_proxy:
            self.proxy_is_verified = False

    @property
    def is_closed(self) -> bool:
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        if self.sock is None:
            return False
        return not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        return self._has_connected_to_proxy

    @property
    def proxy_is_forwarding(self) -> bool:
        """
        Return True if a forwarding proxy is configured, else return False.
        """
        return bool(self.proxy) and self._tunnel_host is None

    def close(self) -> None:
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._response_options = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:  # type: ignore[override]
        """"""
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            super().putheader(header, *values)
        elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
            skippable_headers = "', '".join(
                [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
            )
            raise ValueError(
                f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
            )
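
    # SKIP_HEADER sketch (illustrative, not executed): passing the sentinel
    # urllib3.util.SKIP_HEADER as a header value suppresses a header urllib3
    # would otherwise add automatically; only the headers in SKIPPABLE_HEADERS
    # accept it, anything else raises ValueError as shown above.
    #
    #   from urllib3.util import SKIP_HEADER
    #   conn.request("GET", "/", headers={"User-Agent": SKIP_HEADER})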

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # We have to store these before we call .request()
        # because sometimes we can still salvage a response
        # off the wire even if we aren't able to completely
        # send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        if headers is None:
            headers = {}
        header_keys = frozenset(to_str(k.lower()) for k in headers)
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length

        # When chunked is explicitly set to 'True' we respect that.
        if chunked:
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        else:
            # Detect whether a framing mechanism is already in use. If so
            # we respect that value, otherwise we pick chunked vs content-length
            # depending on the type of 'body'.
            if "content-length" in header_keys:
                chunked = False
            elif "transfer-encoding" in header_keys:
                chunked = True

            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            else:
                chunked = False
                if content_length is None:
                    if chunks is not None:
                        chunked = True
                        self.putheader("Transfer-Encoding", "chunked")
                else:
                    self.putheader("Content-Length", str(content_length))

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                if isinstance(chunk, str):
                    chunk = chunk.encode("utf-8")
                if chunked:
                    self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
                else:
                    self.send(chunk)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of
        HTTPResponse or of whatever object is returned by the response_class variable.

        If a request has not been sent or if a previous response has not been handled,
        ResponseNotReady is raised. If the HTTP response indicates that the connection
        should be closed, then it will be closed before the response is returned. When
        the connection is closed, the underlying socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        if self._response_options is None:
            raise ResponseNotReady()

        # Reset this attribute for being used again.
        resp_options = self._response_options
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        httplib_response = super().getresponse()

        try:
            assert_header_parsing(httplib_response.msg)
        except (HeaderParsingError, TypeError) as hpe:
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                hpe,
                exc_info=True,
            )

        headers = HTTPHeaderDict(httplib_response.msg.items())

        response = HTTPResponse(
            body=httplib_response,
            headers=headers,
            status=httplib_response.status,
            version=httplib_response.version,
            version_string=getattr(self, "_http_vsn_str", "HTTP/?"),
            reason=httplib_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=httplib_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )
        return response
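
    # End-to-end sketch (illustrative, host is a placeholder): request() must be
    # called before getresponse(), otherwise ResponseNotReady is raised.
    #
    #   conn = HTTPConnection("example.com", 80, timeout=5.0)
    #   conn.request("GET", "/")
    #   resp = conn.getresponse()  # urllib3.response.HTTPResponse
    #   print(resp.status, resp.headers.get("Content-Type"))
    #   conn.close()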


class HTTPSConnection(HTTPConnection):
    """
    Many of the parameters to this constructor are passed to the underlying SSL
    socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)
        self.cert_reqs = cert_reqs
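
    # Construction sketch (illustrative, not executed; the host and CA bundle
    # path are placeholders, and ``ssl`` must be available):
    #
    #   conn = HTTPSConnection(
    #       "example.com",
    #       443,
    #       ca_certs="/etc/ssl/certs/ca-certificates.crt",
    #       ssl_minimum_version=ssl.TLSVersion.TLSv1_2,
    #   )
    #   conn.request("GET", "/")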

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | typing.Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        sock: socket.socket | ssl.SSLSocket
        self.sock = sock = self._new_conn()
        server_hostname: str = self.host
        tls_in_tls = False

        # Do we need to establish a tunnel?
        if self._tunnel_host is not None:
            # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
            if self._tunnel_scheme == "https":
                # _connect_tls_proxy will verify and assign proxy_is_verified
                self.sock = sock = self._connect_tls_proxy(self.host, sock)
                tls_in_tls = True
            elif self._tunnel_scheme == "http":
                self.proxy_is_verified = False

            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # Override the host with the one we're requesting data from.
            server_hostname = self._tunnel_host

        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            warnings.warn(
                (
                    f"System time is way off (before {RECENT_DATE}). This will probably "
                    "lead to SSL verification errors"
                ),
                SystemTimeWarning,
            )

        # Remove trailing '.' from fqdn hostnames to allow certificate validation
        server_hostname_rm_dot = server_hostname.rstrip(".")

        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            server_hostname=server_hostname_rm_dot,
            ssl_context=self.ssl_context,
            tls_in_tls=tls_in_tls,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = sock_and_verified.socket

        # Forwarding proxies can never have a verified target since
        # the proxy is the one doing the verification. Should instead
        # use a CONNECT tunnel in order to verify the target.
        # See: https://github.com/urllib3/urllib3/issues/3267.
        if self.proxy_is_forwarding:
            self.is_verified = False
        else:
            self.is_verified = sock_and_verified.is_verified

        # If a proxy is configured then at this point we are fully connected to it.
        # This is set twice (once above and here) because forwarding proxies
        # don't use tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

        # Set `self.proxy_is_verified` unless it's already set while
        # establishing a tunnel.
        if self._has_connected_to_proxy and self.proxy_is_verified is None:
            self.proxy_is_verified = sock_and_verified.is_verified
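
    # Proxy TLS sketch (conceptual comment, not executed): when the tunnel scheme
    # set via set_tunnel() is "https", connect() above first wraps the proxy
    # socket in TLS (_connect_tls_proxy below), then issues CONNECT and wraps the
    # tunneled stream in a second TLS layer for the origin (tls_in_tls=True).
    # Pool and ProxyManager code normally drives this rather than user code.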

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        ssl_context = proxy_config.ssl_context
        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = sock_and_verified.is_verified
        return sock_and_verified.socket  # type: ignore[return-value]


class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake
    """

    socket: ssl.SSLSocket | SSLTransport
    is_verified: bool


def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | typing.Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.
    """
    default_ssl_context = False
    if ssl_context is None:
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        ssl_sock.close()
        raise


def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses so it doesn't
    # match DNS SANs, so we do the same thing!
    stripped_hostname = asserted_hostname.strip("[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname

    try:
        match_hostname(cert, asserted_hostname, hostname_checks_common_name)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert  # type: ignore[attr-defined]
        raise


def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    # Look for the phrase 'wrong version number'. If it's found, warn the user
    # that we're very sure this proxy is HTTP-only and they have a
    # configuration issue.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    is_likely_http_proxy = (
        "wrong version number" in error_normalized
        or "unknown protocol" in error_normalized
        or "record layer failure" in error_normalized
    )
    http_proxy_warning = (
        ". Your proxy appears to only use HTTP and not HTTPS, "
        "try changing your proxy URL to be HTTP. See: "
        "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
        "#https-proxy-error-http-proxy"
    )
    new_err = ProxyError(
        f"Unable to connect to proxy"
        f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
        err,
    )
    new_err.__cause__ = err
    return new_err
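
# Heuristic sketch (illustrative, not executed): a TLS handshake against a
# plaintext HTTP proxy typically fails with "wrong version number", which this
# helper translates into a ProxyError carrying the documentation hint.
#
#   err = ssl.SSLError("[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1000)")
#   proxy_err = _wrap_proxy_error(err, "https")
#   # str(proxy_err) now includes the "#https-proxy-error-http-proxy" docs link.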


def _get_default_user_agent() -> str:
    return f"python-urllib3/{__version__}"


class DummyConnection:
    """Used to detect a failed ConnectionCls import."""


if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811


VerifiedHTTPSConnection = HTTPSConnection


def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging."""

    scheme = "https" if isinstance(conn, HTTPSConnection) else "http"

    return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url
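
# Usage sketch (illustrative, not executed; host and path are placeholders):
#
#   conn = HTTPSConnection("example.com", 443)
#   _url_from_connection(conn, "/index.html")  # "https://example.com:443/index.html"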