Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/connection.py: 30%
324 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:05 +0000
1from __future__ import annotations
3import datetime
4import logging
5import os
6import re
7import socket
8import sys
9import typing
10import warnings
11from http.client import HTTPConnection as _HTTPConnection
12from http.client import HTTPException as HTTPException # noqa: F401
13from http.client import ResponseNotReady
14from socket import timeout as SocketTimeout
16if typing.TYPE_CHECKING:
17 from typing import Literal
19 from .response import HTTPResponse
20 from .util.ssl_ import _TYPE_PEER_CERT_RET_DICT
21 from .util.ssltransport import SSLTransport
23from ._collections import HTTPHeaderDict
24from .util.response import assert_header_parsing
25from .util.timeout import _DEFAULT_TIMEOUT, _TYPE_TIMEOUT, Timeout
26from .util.util import to_str
27from .util.wait import wait_for_read
29try: # Compiled with SSL?
30 import ssl
32 BaseSSLError = ssl.SSLError
33except (ImportError, AttributeError):
34 ssl = None # type: ignore[assignment]
36 class BaseSSLError(BaseException): # type: ignore[no-redef]
37 pass
40from ._base_connection import _TYPE_BODY
41from ._base_connection import ProxyConfig as ProxyConfig
42from ._base_connection import _ResponseOptions as _ResponseOptions
43from ._version import __version__
44from .exceptions import (
45 ConnectTimeoutError,
46 HeaderParsingError,
47 NameResolutionError,
48 NewConnectionError,
49 ProxyError,
50 SystemTimeWarning,
51)
52from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection, ssl_
53from .util.request import body_to_chunks
54from .util.ssl_ import assert_fingerprint as _assert_fingerprint
55from .util.ssl_ import (
56 create_urllib3_context,
57 is_ipaddress,
58 resolve_cert_reqs,
59 resolve_ssl_version,
60 ssl_wrap_socket,
61)
62from .util.ssl_match_hostname import CertificateError, match_hostname
63from .util.url import Url
# Not a no-op, we're adding this to the namespace so it can be imported.
ConnectionError = ConnectionError
BrokenPipeError = BrokenPipeError


log = logging.getLogger(__name__)

# Default ports used when a URL doesn't carry an explicit port.
port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (ie test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2022, 1, 1)

# Matches any character that is NOT a valid HTTP token character (RFC 9110
# tchar); used to reject control characters in request methods.
_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")

# sys.audit() only exists on Python 3.8+.
_HAS_SYS_AUDIT = hasattr(sys, "audit")
class HTTPConnection(_HTTPConnection):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """

    default_port: typing.ClassVar[int] = port_by_scheme["http"]  # type: ignore[misc]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options: typing.ClassVar[connection._TYPE_SOCKET_OPTIONS] = [
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    ]

    #: Whether this connection verifies the host's certificate.
    is_verified: bool = False

    #: Whether this proxy connection verified the proxy host's certificate.
    # If no proxy is currently connected to the value will be ``None``.
    proxy_is_verified: bool | None = None

    # Instance attribute annotations; values are assigned in __init__()
    # (blocksize/source_address via super().__init__()).
    blocksize: int
    source_address: tuple[str, int] | None
    socket_options: connection._TYPE_SOCKET_OPTIONS | None

    _has_connected_to_proxy: bool
    _response_options: _ResponseOptions | None
    _tunnel_host: str | None
    _tunnel_port: int | None
    _tunnel_scheme: str | None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
    ) -> None:
        super().__init__(
            host=host,
            port=port,
            # _DEFAULT_TIMEOUT sentinel is translated to the real default here.
            timeout=Timeout.resolve_default_timeout(timeout),
            source_address=source_address,
            blocksize=blocksize,
        )
        self.socket_options = socket_options
        self.proxy = proxy
        self.proxy_config = proxy_config

        self._has_connected_to_proxy = False
        self._response_options = None
        self._tunnel_host: str | None = None
        self._tunnel_port: int | None = None
        self._tunnel_scheme: str | None = None

    # https://github.com/python/mypy/issues/4125
    # Mypy treats this as LSP violation, which is considered a bug.
    # If `host` is made a property it violates LSP, because a writeable attribute is overridden with a read-only one.
    # However, there is also a `host` setter so LSP is not violated.
    # Potentially, a `@host.deleter` might be needed depending on how this issue will be fixed.
    @property
    def host(self) -> str:
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")

    @host.setter
    def host(self, value: str) -> None:
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self) -> socket.socket:
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        :raises NameResolutionError: if DNS resolution fails (socket.gaierror).
        :raises ConnectTimeoutError: if the connect times out.
        :raises NewConnectionError: for any other OSError during connect.
        """
        try:
            # Note: uses self._dns_host (may carry a trailing FQDN dot),
            # not the normalized self.host -- see the `host` property.
            sock = connection.create_connection(
                (self._dns_host, self.port),
                self.timeout,
                source_address=self.source_address,
                socket_options=self.socket_options,
            )
        except socket.gaierror as e:
            raise NameResolutionError(self.host, self, e) from e
        except SocketTimeout as e:
            raise ConnectTimeoutError(
                self,
                f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
            ) from e

        except OSError as e:
            raise NewConnectionError(
                self, f"Failed to establish a new connection: {e}"
            ) from e

        # Audit hooks are only available in Python 3.8+
        if _HAS_SYS_AUDIT:
            sys.audit("http.client.connect", self, self.host, self.port)

        return sock

    def set_tunnel(
        self,
        host: str,
        port: int | None = None,
        headers: typing.Mapping[str, str] | None = None,
        scheme: str = "http",
    ) -> None:
        """Set up a CONNECT tunnel through an HTTP(S) proxy.

        Extends the http.client implementation with a ``scheme`` parameter
        that records whether the proxy itself speaks "http" or "https".
        """
        if scheme not in ("http", "https"):
            raise ValueError(
                f"Invalid proxy scheme for tunneling: {scheme!r}, must be either 'http' or 'https'"
            )
        super().set_tunnel(host, port=port, headers=headers)
        self._tunnel_scheme = scheme

    def connect(self) -> None:
        self.sock = self._new_conn()
        if self._tunnel_host:
            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()  # type: ignore[attr-defined]

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

    @property
    def is_closed(self) -> bool:
        # No socket means closed; connect() assigns self.sock, close() clears it.
        return self.sock is None

    @property
    def is_connected(self) -> bool:
        if self.sock is None:
            return False
        # A readable idle socket means EOF/hangup from the peer,
        # so "nothing to read" implies the connection is still alive.
        return not wait_for_read(self.sock, timeout=0.0)

    @property
    def has_connected_to_proxy(self) -> bool:
        return self._has_connected_to_proxy

    def close(self) -> None:
        try:
            super().close()
        finally:
            # Reset all stateful properties so connection
            # can be re-used without leaking prior configs.
            self.sock = None
            self.is_verified = False
            self.proxy_is_verified = None
            self._has_connected_to_proxy = False
            self._response_options = None
            self._tunnel_host = None
            self._tunnel_port = None
            self._tunnel_scheme = None

    def putrequest(
        self,
        method: str,
        url: str,
        skip_host: bool = False,
        skip_accept_encoding: bool = False,
    ) -> None:
        """"""
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                f"Method cannot contain non-token characters {method!r} (found at least {match.group()!r})"
            )

        return super().putrequest(
            method, url, skip_host=skip_host, skip_accept_encoding=skip_accept_encoding
        )

    def putheader(self, header: str, *values: str) -> None:
        """"""
        # Empty docstring for the same reason as putrequest() above.
        # A value of SKIP_HEADER suppresses emitting the header entirely,
        # but only for the headers listed in SKIPPABLE_HEADERS.
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            super().putheader(header, *values)
        elif to_str(header.lower()) not in SKIPPABLE_HEADERS:
            skippable_headers = "', '".join(
                [str.title(header) for header in sorted(SKIPPABLE_HEADERS)]
            )
            raise ValueError(
                f"urllib3.util.SKIP_HEADER only supports '{skippable_headers}'"
            )

    # `request` method's signature intentionally violates LSP.
    # urllib3's API is different from `http.client.HTTPConnection` and the subclassing is only incidental.
    def request(  # type: ignore[override]
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
        *,
        chunked: bool = False,
        preload_content: bool = True,
        decode_content: bool = True,
        enforce_content_length: bool = True,
    ) -> None:
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if self.sock is not None:
            self.sock.settimeout(self.timeout)

        # Store these values to be fed into the HTTPResponse
        # object later. TODO: Remove this in favor of a real
        # HTTP lifecycle mechanism.

        # We have to store these before we call .request()
        # because sometimes we can still salvage a response
        # off the wire even if we aren't able to completely
        # send the request body.
        self._response_options = _ResponseOptions(
            request_method=method,
            request_url=url,
            preload_content=preload_content,
            decode_content=decode_content,
            enforce_content_length=enforce_content_length,
        )

        if headers is None:
            headers = {}
        # Lower-cased header names for case-insensitive membership checks.
        header_keys = frozenset(to_str(k.lower()) for k in headers)
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )

        # Transform the body into an iterable of sendall()-able chunks
        # and detect if an explicit Content-Length is doable.
        chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
        chunks = chunks_and_cl.chunks
        content_length = chunks_and_cl.content_length

        # When chunked is explicit set to 'True' we respect that.
        if chunked:
            if "transfer-encoding" not in header_keys:
                self.putheader("Transfer-Encoding", "chunked")
        else:
            # Detect whether a framing mechanism is already in use. If so
            # we respect that value, otherwise we pick chunked vs content-length
            # depending on the type of 'body'.
            if "content-length" in header_keys:
                chunked = False
            elif "transfer-encoding" in header_keys:
                chunked = True

            # Otherwise we go off the recommendation of 'body_to_chunks()'.
            else:
                chunked = False
                if content_length is None:
                    if chunks is not None:
                        chunked = True
                        self.putheader("Transfer-Encoding", "chunked")
                else:
                    self.putheader("Content-Length", str(content_length))

        # Now that framing headers are out of the way we send all the other headers.
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        self.endheaders()

        # If we're given a body we start sending that in chunks.
        if chunks is not None:
            for chunk in chunks:
                # Sending empty chunks isn't allowed for TE: chunked
                # as it indicates the end of the body.
                if not chunk:
                    continue
                if isinstance(chunk, str):
                    chunk = chunk.encode("utf-8")
                if chunked:
                    # Chunked framing: "<hex length>\r\n<data>\r\n"
                    self.send(b"%x\r\n%b\r\n" % (len(chunk), chunk))
                else:
                    self.send(chunk)

        # Regardless of whether we have a body or not, if we're in
        # chunked mode we want to send an explicit empty chunk.
        if chunked:
            self.send(b"0\r\n\r\n")

    def request_chunked(
        self,
        method: str,
        url: str,
        body: _TYPE_BODY | None = None,
        headers: typing.Mapping[str, str] | None = None,
    ) -> None:
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        warnings.warn(
            "HTTPConnection.request_chunked() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead use HTTPConnection.request(..., chunked=True).",
            category=DeprecationWarning,
            stacklevel=2,
        )
        self.request(method, url, body=body, headers=headers, chunked=True)

    def getresponse(  # type: ignore[override]
        self,
    ) -> HTTPResponse:
        """
        Get the response from the server.

        If the HTTPConnection is in the correct state, returns an instance of HTTPResponse or of whatever object is returned by the response_class variable.

        If a request has not been sent or if a previous response has not be handled, ResponseNotReady is raised. If the HTTP response indicates that the connection should be closed, then it will be closed before the response is returned. When the connection is closed, the underlying socket is closed.
        """
        # Raise the same error as http.client.HTTPConnection
        if self._response_options is None:
            raise ResponseNotReady()

        # Reset this attribute for being used again.
        resp_options = self._response_options
        self._response_options = None

        # Since the connection's timeout value may have been updated
        # we need to set the timeout on the socket.
        self.sock.settimeout(self.timeout)

        # This is needed here to avoid circular import errors
        from .response import HTTPResponse

        # Get the response from http.client.HTTPConnection
        httplib_response = super().getresponse()

        try:
            assert_header_parsing(httplib_response.msg)
        except (HeaderParsingError, TypeError) as hpe:
            # Header parsing problems are logged but never fatal.
            log.warning(
                "Failed to parse headers (url=%s): %s",
                _url_from_connection(self, resp_options.request_url),
                hpe,
                exc_info=True,
            )

        headers = HTTPHeaderDict(httplib_response.msg.items())

        response = HTTPResponse(
            body=httplib_response,
            headers=headers,
            status=httplib_response.status,
            version=httplib_response.version,
            reason=httplib_response.reason,
            preload_content=resp_options.preload_content,
            decode_content=resp_options.decode_content,
            original_response=httplib_response,
            enforce_content_length=resp_options.enforce_content_length,
            request_method=resp_options.request_method,
            request_url=resp_options.request_url,
        )
        return response
class HTTPSConnection(HTTPConnection):
    """
    Many of the parameters to this constructor are passed to the underlying SSL
    socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]  # type: ignore[misc]

    # TLS configuration defaults; instance values are set in __init__()/set_cert().
    cert_reqs: int | str | None = None
    ca_certs: str | None = None
    ca_cert_dir: str | None = None
    ca_cert_data: None | str | bytes = None
    ssl_version: int | str | None = None
    ssl_minimum_version: int | None = None
    ssl_maximum_version: int | None = None
    assert_fingerprint: str | None = None

    def __init__(
        self,
        host: str,
        port: int | None = None,
        *,
        timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT,
        source_address: tuple[str, int] | None = None,
        blocksize: int = 16384,
        socket_options: None
        | (connection._TYPE_SOCKET_OPTIONS) = HTTPConnection.default_socket_options,
        proxy: Url | None = None,
        proxy_config: ProxyConfig | None = None,
        cert_reqs: int | str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        server_hostname: str | None = None,
        ssl_context: ssl.SSLContext | None = None,
        ca_certs: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
        ssl_minimum_version: int | None = None,
        ssl_maximum_version: int | None = None,
        ssl_version: int | str | None = None,  # Deprecated
        cert_file: str | None = None,
        key_file: str | None = None,
        key_password: str | None = None,
    ) -> None:
        super().__init__(
            host,
            port=port,
            timeout=timeout,
            source_address=source_address,
            blocksize=blocksize,
            socket_options=socket_options,
            proxy=proxy,
            proxy_config=proxy_config,
        )

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ssl_version = ssl_version
        self.ssl_minimum_version = ssl_minimum_version
        self.ssl_maximum_version = ssl_maximum_version
        # Expand "~" in user-supplied CA paths; both default to None.
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

        # cert_reqs depends on ssl_context so calculate last.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)
        self.cert_reqs = cert_reqs

    def set_cert(
        self,
        key_file: str | None = None,
        cert_file: str | None = None,
        cert_reqs: int | str | None = None,
        key_password: str | None = None,
        ca_certs: str | None = None,
        assert_hostname: None | str | Literal[False] = None,
        assert_fingerprint: str | None = None,
        ca_cert_dir: str | None = None,
        ca_cert_data: None | str | bytes = None,
    ) -> None:
        """
        This method should only be called once, before the connection is used.
        """
        warnings.warn(
            "HTTPSConnection.set_cert() is deprecated and will be removed "
            "in urllib3 v2.1.0. Instead provide the parameters to the "
            "HTTPSConnection constructor.",
            category=DeprecationWarning,
            stacklevel=2,
        )

        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

    def connect(self) -> None:
        # Open the TCP connection, optionally tunnel/TLS through a proxy,
        # then wrap the socket in TLS and verify the peer.
        sock: socket.socket | ssl.SSLSocket
        self.sock = sock = self._new_conn()
        server_hostname: str = self.host
        tls_in_tls = False

        # Do we need to establish a tunnel?
        if self._tunnel_host is not None:
            # We're tunneling to an HTTPS origin so need to do TLS-in-TLS.
            if self._tunnel_scheme == "https":
                self.sock = sock = self._connect_tls_proxy(self.host, sock)
                tls_in_tls = True

            # If we're tunneling it means we're connected to our proxy.
            self._has_connected_to_proxy = True

            self._tunnel()  # type: ignore[attr-defined]
            # Override the host with the one we're requesting data from.
            server_hostname = self._tunnel_host

        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            warnings.warn(
                (
                    f"System time is way off (before {RECENT_DATE}). This will probably "
                    "lead to SSL verification errors"
                ),
                SystemTimeWarning,
            )

        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock=sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            cert_file=self.cert_file,
            key_file=self.key_file,
            key_password=self.key_password,
            server_hostname=server_hostname,
            ssl_context=self.ssl_context,
            tls_in_tls=tls_in_tls,
            assert_hostname=self.assert_hostname,
            assert_fingerprint=self.assert_fingerprint,
        )
        self.sock = sock_and_verified.socket
        self.is_verified = sock_and_verified.is_verified

        # If there's a proxy to be connected to we are fully connected.
        # This is set twice (once above and here) due to forwarding proxies
        # not using tunnelling.
        self._has_connected_to_proxy = bool(self.proxy)

    def _connect_tls_proxy(self, hostname: str, sock: socket.socket) -> ssl.SSLSocket:
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        # `_connect_tls_proxy` is called when self._tunnel_host is truthy.
        proxy_config = typing.cast(ProxyConfig, self.proxy_config)
        ssl_context = proxy_config.ssl_context
        sock_and_verified = _ssl_wrap_socket_and_match_hostname(
            sock,
            cert_reqs=self.cert_reqs,
            ssl_version=self.ssl_version,
            ssl_minimum_version=self.ssl_minimum_version,
            ssl_maximum_version=self.ssl_maximum_version,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
            assert_hostname=proxy_config.assert_hostname,
            assert_fingerprint=proxy_config.assert_fingerprint,
            # Features that aren't implemented for proxies yet:
            cert_file=None,
            key_file=None,
            key_password=None,
            tls_in_tls=False,
        )
        self.proxy_is_verified = sock_and_verified.is_verified
        return sock_and_verified.socket  # type: ignore[return-value]
class _WrappedAndVerifiedSocket(typing.NamedTuple):
    """
    Wrapped socket and whether the connection is
    verified after the TLS handshake
    """

    # The TLS-wrapped socket (SSLTransport is used for TLS-in-TLS tunnels).
    socket: ssl.SSLSocket | SSLTransport
    # True when the peer certificate was required and verified (or a
    # fingerprint was asserted) -- see _ssl_wrap_socket_and_match_hostname().
    is_verified: bool
def _ssl_wrap_socket_and_match_hostname(
    sock: socket.socket,
    *,
    cert_reqs: None | str | int,
    ssl_version: None | str | int,
    ssl_minimum_version: int | None,
    ssl_maximum_version: int | None,
    cert_file: str | None,
    key_file: str | None,
    key_password: str | None,
    ca_certs: str | None,
    ca_cert_dir: str | None,
    ca_cert_data: None | str | bytes,
    assert_hostname: None | str | Literal[False],
    assert_fingerprint: str | None,
    server_hostname: str | None,
    ssl_context: ssl.SSLContext | None,
    tls_in_tls: bool = False,
) -> _WrappedAndVerifiedSocket:
    """Logic for constructing an SSLContext from all TLS parameters, passing
    that down into ssl_wrap_socket, and then doing certificate verification
    either via hostname or fingerprint. This function exists to guarantee
    that both proxies and targets have the same behavior when connecting via TLS.
    """
    default_ssl_context = False
    if ssl_context is None:
        # No caller-supplied context: build one from the individual TLS knobs.
        default_ssl_context = True
        context = create_urllib3_context(
            ssl_version=resolve_ssl_version(ssl_version),
            ssl_minimum_version=ssl_minimum_version,
            ssl_maximum_version=ssl_maximum_version,
            cert_reqs=resolve_cert_reqs(cert_reqs),
        )
    else:
        context = ssl_context

    context.verify_mode = resolve_cert_reqs(cert_reqs)

    # In some cases, we want to verify hostnames ourselves
    if (
        # `ssl` can't verify fingerprints or alternate hostnames
        assert_fingerprint
        or assert_hostname
        # assert_hostname can be set to False to disable hostname checking
        or assert_hostname is False
        # We still support OpenSSL 1.0.2, which prevents us from verifying
        # hostnames easily: https://github.com/pyca/pyopenssl/pull/933
        or ssl_.IS_PYOPENSSL
        or not ssl_.HAS_NEVER_CHECK_COMMON_NAME
    ):
        context.check_hostname = False

    # Try to load OS default certs if none are given. We need to do the hasattr() check
    # for custom pyOpenSSL SSLContext objects because they don't support
    # load_default_certs().
    if (
        not ca_certs
        and not ca_cert_dir
        and not ca_cert_data
        and default_ssl_context
        and hasattr(context, "load_default_certs")
    ):
        context.load_default_certs()

    # Ensure that IPv6 addresses are in the proper format and don't have a
    # scope ID. Python's SSL module fails to recognize scoped IPv6 addresses
    # and interprets them as DNS hostnames.
    if server_hostname is not None:
        normalized = server_hostname.strip("[]")
        if "%" in normalized:
            normalized = normalized[: normalized.rfind("%")]
        if is_ipaddress(normalized):
            server_hostname = normalized

    ssl_sock = ssl_wrap_socket(
        sock=sock,
        keyfile=key_file,
        certfile=cert_file,
        key_password=key_password,
        ca_certs=ca_certs,
        ca_cert_dir=ca_cert_dir,
        ca_cert_data=ca_cert_data,
        server_hostname=server_hostname,
        ssl_context=context,
        tls_in_tls=tls_in_tls,
    )

    try:
        if assert_fingerprint:
            # Fingerprint pinning replaces hostname verification entirely.
            _assert_fingerprint(
                ssl_sock.getpeercert(binary_form=True), assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not context.check_hostname
            and assert_hostname is not False
        ):
            # `ssl` didn't check the hostname for us (check_hostname is off),
            # so do it manually against the peer certificate.
            cert: _TYPE_PEER_CERT_RET_DICT = ssl_sock.getpeercert()  # type: ignore[assignment]

            # Need to signal to our match_hostname whether to use 'commonName' or not.
            # If we're using our own constructed SSLContext we explicitly set 'False'
            # because PyPy hard-codes 'True' from SSLContext.hostname_checks_common_name.
            if default_ssl_context:
                hostname_checks_common_name = False
            else:
                hostname_checks_common_name = (
                    getattr(context, "hostname_checks_common_name", False) or False
                )

            _match_hostname(
                cert,
                assert_hostname or server_hostname,  # type: ignore[arg-type]
                hostname_checks_common_name,
            )

        return _WrappedAndVerifiedSocket(
            socket=ssl_sock,
            is_verified=context.verify_mode == ssl.CERT_REQUIRED
            or bool(assert_fingerprint),
        )
    except BaseException:
        # Never leak the wrapped socket if verification failed.
        ssl_sock.close()
        raise
def _match_hostname(
    cert: _TYPE_PEER_CERT_RET_DICT | None,
    asserted_hostname: str,
    hostname_checks_common_name: bool = False,
) -> None:
    """Check *cert* against *asserted_hostname*, logging mismatches.

    On failure the :exc:`CertificateError` is re-raised with the peer
    certificate attached as ``_peer_cert`` so callers can inspect it.
    """
    # Mirror ssl.match_hostname(): bracket stripping only applies when the
    # result is an IP address, so DNS SANs still match the raw value.
    without_brackets = asserted_hostname.strip("[]")
    if is_ipaddress(without_brackets):
        asserted_hostname = without_brackets

    try:
        match_hostname(cert, asserted_hostname, hostname_checks_common_name)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert  # type: ignore[attr-defined]
        raise
def _wrap_proxy_error(err: Exception, proxy_scheme: str | None) -> ProxyError:
    """Wrap *err* in a :class:`ProxyError` with a helpful message.

    If the underlying TLS error looks like the classic symptom of speaking
    HTTPS to an HTTP-only proxy, a hint pointing at the documentation is
    appended to the message.
    """
    # Look for the phrase 'wrong version number', if found
    # then we should warn the user that we're very sure that
    # this proxy is HTTP-only and they have a configuration issue.
    error_normalized = " ".join(re.split("[^a-z]", str(err).lower()))
    is_likely_http_proxy = (
        "wrong version number" in error_normalized
        or "unknown protocol" in error_normalized
    )
    http_proxy_warning = (
        ". Your proxy appears to only use HTTP and not HTTPS, "
        "try changing your proxy URL to be HTTP. See: "
        "https://urllib3.readthedocs.io/en/latest/advanced-usage.html"
        "#https-proxy-error-http-proxy"
    )
    # Fix: the first fragment had an f-prefix with no placeholders (F541).
    new_err = ProxyError(
        "Unable to connect to proxy"
        f"{http_proxy_warning if is_likely_http_proxy and proxy_scheme == 'https' else ''}",
        err,
    )
    new_err.__cause__ = err
    return new_err
def _get_default_user_agent() -> str:
    """Return the default ``User-Agent`` value sent by urllib3 requests."""
    return "python-urllib3/" + __version__
class DummyConnection:
    """Used to detect a failed ConnectionCls import."""
# If the ssl module failed to import, replace HTTPSConnection with the
# sentinel class so pools detect the missing-TLS configuration.
if not ssl:
    HTTPSConnection = DummyConnection  # type: ignore[misc, assignment] # noqa: F811

# Backwards-compatible alias for the urllib3 1.x name.
VerifiedHTTPSConnection = HTTPSConnection
def _url_from_connection(
    conn: HTTPConnection | HTTPSConnection, path: str | None = None
) -> str:
    """Returns the URL from a given connection. This is mainly used for testing and logging."""
    if isinstance(conn, HTTPSConnection):
        scheme = "https"
    else:
        scheme = "http"

    return Url(scheme=scheme, host=conn.host, port=conn.port, path=path).url