Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3import random
4import socket
5import sys
6import traceback
7import warnings
8from collections import OrderedDict, defaultdict, deque
9from collections.abc import Awaitable, Callable, Iterator, Sequence
10from contextlib import suppress
11from http import HTTPStatus
12from itertools import chain, cycle, islice
13from time import monotonic
14from types import TracebackType
15from typing import TYPE_CHECKING, Any, Literal, cast
17import aiohappyeyeballs
18from aiohappyeyeballs import AddrInfoType, SocketFactoryType
19from multidict import CIMultiDict
21from . import hdrs, helpers
22from .abc import AbstractResolver, ResolveResult
23from .client_exceptions import (
24 ClientConnectionError,
25 ClientConnectorCertificateError,
26 ClientConnectorDNSError,
27 ClientConnectorError,
28 ClientConnectorSSLError,
29 ClientHttpProxyError,
30 ClientProxyConnectionError,
31 InvalidUrlClientError,
32 ServerFingerprintMismatch,
33 UnixClientConnectorError,
34 cert_errors,
35 ssl_errors,
36)
37from .client_proto import ResponseHandler
38from .client_reqrep import (
39 SSL_ALLOWED_TYPES,
40 ClientRequest,
41 ClientRequestBase,
42 Fingerprint,
43)
44from .helpers import (
45 _SENTINEL,
46 ceil_timeout,
47 is_canonical_ipv4_address,
48 is_ip_address,
49 sentinel,
50 set_exception,
51 set_result,
52)
53from .log import client_logger
54from .resolver import DefaultResolver
56if sys.version_info >= (3, 12):
57 from collections.abc import Buffer
58else:
59 Buffer = "bytes | bytearray | memoryview[int] | memoryview[bytes]"
61try:
62 import ssl
64 SSLContext = ssl.SSLContext
65except ImportError: # pragma: no cover
66 ssl = None # type: ignore[assignment]
67 SSLContext = object # type: ignore[misc,assignment]
69EMPTY_SCHEMA_SET = frozenset({""})
70HTTP_SCHEMA_SET = frozenset({"http", "https"})
71WS_SCHEMA_SET = frozenset({"ws", "wss"})
73HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
74HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET
76NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
77 3,
78 13,
79 1,
80) or sys.version_info < (3, 12, 8)
81# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
82# which first appeared in Python 3.12.8 and 3.13.1
85__all__ = (
86 "BaseConnector",
87 "TCPConnector",
88 "UnixConnector",
89 "NamedPipeConnector",
90 "AddrInfoType",
91 "SocketFactoryType",
92)
95if TYPE_CHECKING:
96 from .client import ClientTimeout
97 from .client_reqrep import ConnectionKey
98 from .tracing import Trace
101class Connection:
102 """Represents a single connection."""
104 __slots__ = (
105 "_key",
106 "_connector",
107 "_loop",
108 "_protocol",
109 "_callbacks",
110 "_source_traceback",
111 )
113 def __init__(
114 self,
115 connector: "BaseConnector",
116 key: "ConnectionKey",
117 protocol: ResponseHandler,
118 loop: asyncio.AbstractEventLoop,
119 ) -> None:
120 self._key = key
121 self._connector = connector
122 self._loop = loop
123 self._protocol: ResponseHandler | None = protocol
124 self._callbacks: list[Callable[[], None]] = []
125 self._source_traceback = (
126 traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None
127 )
129 def __repr__(self) -> str:
130 return f"Connection<{self._key}>"
132 def __del__(self, _warnings: Any = warnings) -> None:
133 if self._protocol is not None:
134 _warnings.warn(
135 f"Unclosed connection {self!r}", ResourceWarning, source=self
136 )
137 if self._loop.is_closed():
138 return
140 self._connector._release(self._key, self._protocol, should_close=True)
142 context = {"client_connection": self, "message": "Unclosed connection"}
143 if self._source_traceback is not None:
144 context["source_traceback"] = self._source_traceback
145 self._loop.call_exception_handler(context)
147 def __bool__(self) -> Literal[True]:
148 """Force subclasses to not be falsy, to make checks simpler."""
149 return True
151 @property
152 def transport(self) -> asyncio.Transport | None:
153 if self._protocol is None:
154 return None
155 return self._protocol.transport
157 @property
158 def protocol(self) -> ResponseHandler | None:
159 return self._protocol
161 def add_callback(self, callback: Callable[[], None]) -> None:
162 if callback is not None:
163 self._callbacks.append(callback)
165 def _notify_release(self) -> None:
166 callbacks, self._callbacks = self._callbacks[:], []
168 for cb in callbacks:
169 with suppress(Exception):
170 cb()
172 def close(self) -> None:
173 self._notify_release()
175 if self._protocol is not None:
176 self._connector._release(self._key, self._protocol, should_close=True)
177 self._protocol = None
179 def release(self) -> None:
180 self._notify_release()
182 if self._protocol is not None:
183 self._connector._release(self._key, self._protocol)
184 self._protocol = None
186 @property
187 def closed(self) -> bool:
188 return self._protocol is None or not self._protocol.is_connected()
191class _ConnectTunnelConnection(Connection):
192 """Special connection wrapper for CONNECT tunnels that must never be pooled.
194 This connection wraps the proxy connection that will be upgraded with TLS.
195 It must never be released to the pool because:
196 1. Its 'closed' future will never complete, causing session.close() to hang
197 2. It represents an intermediate state, not a reusable connection
198 3. The real connection (with TLS) will be created separately
199 """
201 def release(self) -> None:
202 """Do nothing - don't pool or close the connection.
204 These connections are an intermediate state during the CONNECT tunnel
205 setup and will be cleaned up naturally after the TLS upgrade. If they
206 were to be pooled, they would never be properly closed, causing
207 session.close() to wait forever for their 'closed' future.
208 """
211class _TransportPlaceholder:
212 """placeholder for BaseConnector.connect function"""
214 __slots__ = ("closed", "transport")
216 def __init__(self, closed_future: asyncio.Future[Exception | None]) -> None:
217 """Initialize a placeholder for a transport."""
218 self.closed = closed_future
219 self.transport = None
221 def close(self) -> None:
222 """Close the placeholder."""
224 def abort(self) -> None:
225 """Abort the placeholder (does nothing)."""
228class BaseConnector:
229 """Base connector class.
231 keepalive_timeout - (optional) Keep-alive timeout.
232 force_close - Set to True to force close and do reconnect
233 after each request (and between redirects).
234 limit - The total number of simultaneous connections.
235 limit_per_host - Number of simultaneous connections to one host.
236 enable_cleanup_closed - Enables clean-up closed ssl transports.
237 Disabled by default.
238 timeout_ceil_threshold - Trigger ceiling of timeout values when
239 it's above timeout_ceil_threshold.
240 loop - Optional event loop.
241 """
243 _closed = True # prevent AttributeError in __del__ if ctor was failed
244 _source_traceback = None
246 # abort transport after 2 seconds (cleanup broken connections)
247 _cleanup_closed_period = 2.0
249 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET
251 def __init__(
252 self,
253 *,
254 keepalive_timeout: _SENTINEL | None | float = sentinel,
255 force_close: bool = False,
256 limit: int = 100,
257 limit_per_host: int = 0,
258 enable_cleanup_closed: bool = False,
259 timeout_ceil_threshold: float = 5,
260 ) -> None:
261 if force_close:
262 if keepalive_timeout is not None and keepalive_timeout is not sentinel:
263 raise ValueError(
264 "keepalive_timeout cannot be set if force_close is True"
265 )
266 else:
267 if keepalive_timeout is sentinel:
268 keepalive_timeout = 15.0
270 self._timeout_ceil_threshold = timeout_ceil_threshold
272 loop = asyncio.get_running_loop()
274 self._closed = False
275 if loop.get_debug():
276 self._source_traceback = traceback.extract_stack(sys._getframe(1))
278 # Connection pool of reusable connections.
279 # We use a deque to store connections because it has O(1) popleft()
280 # and O(1) append() operations to implement a FIFO queue.
281 self._conns: defaultdict[
282 ConnectionKey, deque[tuple[ResponseHandler, float]]
283 ] = defaultdict(deque)
284 self._limit = limit
285 self._limit_per_host = limit_per_host
286 self._acquired: set[ResponseHandler] = set()
287 self._acquired_per_host: defaultdict[ConnectionKey, set[ResponseHandler]] = (
288 defaultdict(set)
289 )
290 self._keepalive_timeout = cast(float, keepalive_timeout)
291 self._force_close = force_close
293 # {host_key: FIFO list of waiters}
294 # The FIFO is implemented with an OrderedDict with None keys because
295 # python does not have an ordered set.
296 self._waiters: defaultdict[
297 ConnectionKey, OrderedDict[asyncio.Future[None], None]
298 ] = defaultdict(OrderedDict)
300 self._loop = loop
301 self._factory = functools.partial(ResponseHandler, loop=loop)
303 # start keep-alive connection cleanup task
304 self._cleanup_handle: asyncio.TimerHandle | None = None
306 # start cleanup closed transports task
307 self._cleanup_closed_handle: asyncio.TimerHandle | None = None
309 if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
310 warnings.warn(
311 "enable_cleanup_closed ignored because "
312 "https://github.com/python/cpython/pull/118960 is fixed "
313 f"in Python version {sys.version_info}",
314 DeprecationWarning,
315 stacklevel=2,
316 )
317 enable_cleanup_closed = False
319 self._cleanup_closed_disabled = not enable_cleanup_closed
320 self._cleanup_closed_transports: list[asyncio.Transport | None] = []
322 self._placeholder_future: asyncio.Future[Exception | None] = (
323 loop.create_future()
324 )
325 self._placeholder_future.set_result(None)
326 self._cleanup_closed()
328 def __del__(self, _warnings: Any = warnings) -> None:
329 if self._closed:
330 return
331 if not self._conns:
332 return
334 conns = [repr(c) for c in self._conns.values()]
336 self._close_immediately()
338 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
339 context = {
340 "connector": self,
341 "connections": conns,
342 "message": "Unclosed connector",
343 }
344 if self._source_traceback is not None:
345 context["source_traceback"] = self._source_traceback
346 self._loop.call_exception_handler(context)
348 async def __aenter__(self) -> "BaseConnector":
349 return self
351 async def __aexit__(
352 self,
353 exc_type: type[BaseException] | None = None,
354 exc_value: BaseException | None = None,
355 exc_traceback: TracebackType | None = None,
356 ) -> None:
357 await self.close()
359 @property
360 def force_close(self) -> bool:
361 """Ultimately close connection on releasing if True."""
362 return self._force_close
364 @property
365 def limit(self) -> int:
366 """The total number for simultaneous connections.
368 If limit is 0 the connector has no limit.
369 The default limit size is 100.
370 """
371 return self._limit
373 @property
374 def limit_per_host(self) -> int:
375 """The limit for simultaneous connections to the same endpoint.
377 Endpoints are the same if they are have equal
378 (host, port, is_ssl) triple.
379 """
380 return self._limit_per_host
382 def _cleanup(self) -> None:
383 """Cleanup unused transports."""
384 if self._cleanup_handle:
385 self._cleanup_handle.cancel()
386 # _cleanup_handle should be unset, otherwise _release() will not
387 # recreate it ever!
388 self._cleanup_handle = None
390 now = monotonic()
391 timeout = self._keepalive_timeout
393 if self._conns:
394 connections = defaultdict(deque)
395 deadline = now - timeout
396 for key, conns in self._conns.items():
397 alive: deque[tuple[ResponseHandler, float]] = deque()
398 for proto, use_time in conns:
399 if proto.is_connected() and use_time - deadline >= 0:
400 alive.append((proto, use_time))
401 continue
402 transport = proto.transport
403 proto.close()
404 if not self._cleanup_closed_disabled and key.is_ssl:
405 self._cleanup_closed_transports.append(transport)
407 if alive:
408 connections[key] = alive
410 self._conns = connections
412 if self._conns:
413 self._cleanup_handle = helpers.weakref_handle(
414 self,
415 "_cleanup",
416 timeout,
417 self._loop,
418 timeout_ceil_threshold=self._timeout_ceil_threshold,
419 )
421 def _cleanup_closed(self) -> None:
422 """Double confirmation for transport close.
424 Some broken ssl servers may leave socket open without proper close.
425 """
426 if self._cleanup_closed_handle:
427 self._cleanup_closed_handle.cancel()
429 for transport in self._cleanup_closed_transports:
430 if transport is not None:
431 transport.abort()
433 self._cleanup_closed_transports = []
435 if not self._cleanup_closed_disabled:
436 self._cleanup_closed_handle = helpers.weakref_handle(
437 self,
438 "_cleanup_closed",
439 self._cleanup_closed_period,
440 self._loop,
441 timeout_ceil_threshold=self._timeout_ceil_threshold,
442 )
444 async def close(self, *, abort_ssl: bool = False) -> None:
445 """Close all opened transports.
447 :param abort_ssl: If True, SSL connections will be aborted immediately
448 without performing the shutdown handshake. This provides
449 faster cleanup at the cost of less graceful disconnection.
450 """
451 waiters = self._close_immediately(abort_ssl=abort_ssl)
452 if waiters:
453 results = await asyncio.gather(*waiters, return_exceptions=True)
454 for res in results:
455 if isinstance(res, Exception):
456 err_msg = "Error while closing connector: " + repr(res)
457 client_logger.debug(err_msg)
459 def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
460 waiters: list[Awaitable[object]] = []
462 if self._closed:
463 return waiters
465 self._closed = True
467 try:
468 if self._loop.is_closed():
469 return waiters
471 # cancel cleanup task
472 if self._cleanup_handle:
473 self._cleanup_handle.cancel()
475 # cancel cleanup close task
476 if self._cleanup_closed_handle:
477 self._cleanup_closed_handle.cancel()
479 for data in self._conns.values():
480 for proto, _ in data:
481 if (
482 abort_ssl
483 and proto.transport
484 and proto.transport.get_extra_info("sslcontext") is not None
485 ):
486 proto.abort()
487 else:
488 proto.close()
489 if closed := proto.closed:
490 waiters.append(closed)
492 for proto in self._acquired:
493 if (
494 abort_ssl
495 and proto.transport
496 and proto.transport.get_extra_info("sslcontext") is not None
497 ):
498 proto.abort()
499 else:
500 proto.close()
501 if closed := proto.closed:
502 waiters.append(closed)
504 # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
505 for transport in self._cleanup_closed_transports:
506 if transport is not None:
507 transport.abort()
509 return waiters
511 finally:
512 self._conns.clear()
513 self._acquired.clear()
514 for keyed_waiters in self._waiters.values():
515 for keyed_waiter in keyed_waiters:
516 keyed_waiter.cancel()
517 self._waiters.clear()
518 self._cleanup_handle = None
519 self._cleanup_closed_transports.clear()
520 self._cleanup_closed_handle = None
522 @property
523 def closed(self) -> bool:
524 """Is connector closed.
526 A readonly property.
527 """
528 return self._closed
530 def _available_connections(self, key: "ConnectionKey") -> int:
531 """
532 Return number of available connections.
534 The limit, limit_per_host and the connection key are taken into account.
536 If it returns less than 1 means that there are no connections
537 available.
538 """
539 # check total available connections
540 # If there are no limits, this will always return 1
541 total_remain = 1
543 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
544 return total_remain
546 # check limit per host
547 if host_remain := self._limit_per_host:
548 if acquired := self._acquired_per_host.get(key):
549 host_remain -= len(acquired)
550 if total_remain > host_remain:
551 return host_remain
553 return total_remain
555 def _update_proxy_auth_header_and_build_proxy_req(
556 self, req: ClientRequest
557 ) -> ClientRequestBase:
558 """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
559 url = req.proxy
560 assert url is not None
561 headers = req.proxy_headers or CIMultiDict[str]()
562 headers[hdrs.HOST] = req.headers[hdrs.HOST]
563 proxy_req = ClientRequestBase(
564 hdrs.METH_GET,
565 url,
566 headers=headers,
567 loop=self._loop,
568 ssl=req.ssl,
569 )
570 if not req.is_ssl():
571 # For non-SSL proxies the request goes directly through the proxy,
572 # so any Proxy-Authorization belongs on the request itself, not on
573 # the synthetic proxy request used for SSL CONNECT.
574 proxy_auth = proxy_req.headers.pop(hdrs.PROXY_AUTHORIZATION, None)
575 if proxy_auth is not None:
576 req.headers[hdrs.PROXY_AUTHORIZATION] = proxy_auth
577 return proxy_req
579 async def connect(
580 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
581 ) -> Connection:
582 """Get from pool or create new connection."""
583 key = req.connection_key
584 if (conn := await self._get(key, traces)) is not None:
585 # If we do not have to wait and we can get a connection from the pool
586 # we can avoid the timeout ceil logic and directly return the connection
587 if req.proxy:
588 self._update_proxy_auth_header_and_build_proxy_req(req)
589 return conn
591 async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
592 if self._available_connections(key) <= 0:
593 await self._wait_for_available_connection(key, traces)
594 if (conn := await self._get(key, traces)) is not None:
595 if req.proxy:
596 self._update_proxy_auth_header_and_build_proxy_req(req)
597 return conn
599 placeholder = cast(
600 ResponseHandler, _TransportPlaceholder(self._placeholder_future)
601 )
602 self._acquired.add(placeholder)
603 if self._limit_per_host:
604 self._acquired_per_host[key].add(placeholder)
606 try:
607 # Traces are done inside the try block to ensure that the
608 # that the placeholder is still cleaned up if an exception
609 # is raised.
610 if traces:
611 for trace in traces:
612 await trace.send_connection_create_start()
613 proto = await self._create_connection(req, traces, timeout)
614 if traces:
615 for trace in traces:
616 await trace.send_connection_create_end()
617 except BaseException:
618 self._release_acquired(key, placeholder)
619 raise
620 else:
621 if self._closed:
622 proto.close()
623 raise ClientConnectionError("Connector is closed.")
625 # The connection was successfully created, drop the placeholder
626 # and add the real connection to the acquired set. There should
627 # be no awaits after the proto is added to the acquired set
628 # to ensure that the connection is not left in the acquired set
629 # on cancellation.
630 self._acquired.remove(placeholder)
631 self._acquired.add(proto)
632 if self._limit_per_host:
633 acquired_per_host = self._acquired_per_host[key]
634 acquired_per_host.remove(placeholder)
635 acquired_per_host.add(proto)
636 return Connection(self, key, proto, self._loop)
638 async def _wait_for_available_connection(
639 self, key: "ConnectionKey", traces: list["Trace"]
640 ) -> None:
641 """Wait for an available connection slot."""
642 # We loop here because there is a race between
643 # the connection limit check and the connection
644 # being acquired. If the connection is acquired
645 # between the check and the await statement, we
646 # need to loop again to check if the connection
647 # slot is still available.
648 attempts = 0
649 while True:
650 fut: asyncio.Future[None] = self._loop.create_future()
651 keyed_waiters = self._waiters[key]
652 keyed_waiters[fut] = None
653 if attempts:
654 # If we have waited before, we need to move the waiter
655 # to the front of the queue as otherwise we might get
656 # starved and hit the timeout.
657 keyed_waiters.move_to_end(fut, last=False)
659 try:
660 # Traces happen in the try block to ensure that the
661 # the waiter is still cleaned up if an exception is raised.
662 if traces:
663 for trace in traces:
664 await trace.send_connection_queued_start()
665 await fut
666 if traces:
667 for trace in traces:
668 await trace.send_connection_queued_end()
669 finally:
670 # pop the waiter from the queue if its still
671 # there and not already removed by _release_waiter
672 keyed_waiters.pop(fut, None)
673 if not self._waiters.get(key, True):
674 del self._waiters[key]
676 if self._available_connections(key) > 0:
677 break
678 attempts += 1
680 async def _get(
681 self, key: "ConnectionKey", traces: list["Trace"]
682 ) -> Connection | None:
683 """Get next reusable connection for the key or None.
685 The connection will be marked as acquired.
686 """
687 if (conns := self._conns.get(key)) is None:
688 return None
690 t1 = monotonic()
691 while conns:
692 proto, t0 = conns.popleft()
693 # We will we reuse the connection if its connected and
694 # the keepalive timeout has not been exceeded
695 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
696 if not conns:
697 # The very last connection was reclaimed: drop the key
698 del self._conns[key]
699 self._acquired.add(proto)
700 if self._limit_per_host:
701 self._acquired_per_host[key].add(proto)
702 if traces:
703 for trace in traces:
704 try:
705 await trace.send_connection_reuseconn()
706 except BaseException:
707 self._release_acquired(key, proto)
708 raise
709 return Connection(self, key, proto, self._loop)
711 # Connection cannot be reused, close it
712 transport = proto.transport
713 proto.close()
714 # only for SSL transports
715 if not self._cleanup_closed_disabled and key.is_ssl:
716 self._cleanup_closed_transports.append(transport)
718 # No more connections: drop the key
719 del self._conns[key]
720 return None
722 def _release_waiter(self) -> None:
723 """
724 Iterates over all waiters until one to be released is found.
726 The one to be released is not finished and
727 belongs to a host that has available connections.
728 """
729 if not self._waiters:
730 return
732 # Having the dict keys ordered this avoids to iterate
733 # at the same order at each call.
734 queues = list(self._waiters)
735 random.shuffle(queues)
737 for key in queues:
738 if self._available_connections(key) < 1:
739 continue
741 waiters = self._waiters[key]
742 while waiters:
743 waiter, _ = waiters.popitem(last=False)
744 if not waiter.done():
745 waiter.set_result(None)
746 return
748 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
749 """Release acquired connection."""
750 if self._closed:
751 # acquired connection is already released on connector closing
752 return
754 self._acquired.discard(proto)
755 if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
756 conns.discard(proto)
757 if not conns:
758 del self._acquired_per_host[key]
759 self._release_waiter()
761 def _release(
762 self,
763 key: "ConnectionKey",
764 protocol: ResponseHandler,
765 *,
766 should_close: bool = False,
767 ) -> None:
768 if self._closed:
769 # acquired connection is already released on connector closing
770 return
772 self._release_acquired(key, protocol)
774 if self._force_close or should_close or protocol.should_close:
775 transport = protocol.transport
776 protocol.close()
777 if key.is_ssl and not self._cleanup_closed_disabled:
778 self._cleanup_closed_transports.append(transport)
779 return
781 self._conns[key].append((protocol, monotonic()))
783 if self._cleanup_handle is None:
784 self._cleanup_handle = helpers.weakref_handle(
785 self,
786 "_cleanup",
787 self._keepalive_timeout,
788 self._loop,
789 timeout_ceil_threshold=self._timeout_ceil_threshold,
790 )
792 async def _create_connection(
793 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
794 ) -> ResponseHandler:
795 raise NotImplementedError()
798class _DNSCacheTable:
799 def __init__(self, ttl: float | None = None, max_size: int = 1000) -> None:
800 self._addrs_rr: OrderedDict[
801 tuple[str, int], tuple[Iterator[ResolveResult], int]
802 ] = OrderedDict()
803 self._timestamps: dict[tuple[str, int], float] = {}
804 self._ttl = ttl
805 self._max_size = max_size
807 def __contains__(self, host: object) -> bool:
808 return host in self._addrs_rr
810 def add(self, key: tuple[str, int], addrs: list[ResolveResult]) -> None:
811 if key in self._addrs_rr:
812 self._addrs_rr.move_to_end(key)
814 self._addrs_rr[key] = (cycle(addrs), len(addrs))
816 if self._ttl is not None:
817 self._timestamps[key] = monotonic()
819 if len(self._addrs_rr) > self._max_size:
820 oldest_key, _ = self._addrs_rr.popitem(last=False)
821 self._timestamps.pop(oldest_key, None)
823 def remove(self, key: tuple[str, int]) -> None:
824 self._addrs_rr.pop(key, None)
825 self._timestamps.pop(key, None)
827 def clear(self) -> None:
828 self._addrs_rr.clear()
829 self._timestamps.clear()
831 def next_addrs(self, key: tuple[str, int]) -> list[ResolveResult]:
832 loop, length = self._addrs_rr[key]
833 addrs = list(islice(loop, length))
834 # Consume one more element to shift internal state of `cycle`
835 next(loop)
836 self._addrs_rr.move_to_end(key)
837 return addrs
839 def expired(self, key: tuple[str, int]) -> bool:
840 if self._ttl is None:
841 return False
843 return self._timestamps[key] + self._ttl < monotonic()
846def _make_ssl_context(verified: bool) -> SSLContext:
847 """Create SSL context.
849 This method is not async-friendly and should be called from a thread
850 because it will load certificates from disk and do other blocking I/O.
851 """
852 if ssl is None:
853 # No ssl support
854 return None # type: ignore[unreachable]
855 if verified:
856 sslcontext = ssl.create_default_context()
857 else:
858 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
859 sslcontext.options |= ssl.OP_NO_SSLv2
860 sslcontext.options |= ssl.OP_NO_SSLv3
861 sslcontext.check_hostname = False
862 sslcontext.verify_mode = ssl.CERT_NONE
863 sslcontext.options |= ssl.OP_NO_COMPRESSION
864 sslcontext.set_default_verify_paths()
865 sslcontext.set_alpn_protocols(("http/1.1",))
866 return sslcontext
869# The default SSLContext objects are created at import time
870# since they do blocking I/O to load certificates from disk,
871# and imports should always be done before the event loop starts
872# or in a thread.
873_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
874_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
877class TCPConnector(BaseConnector):
878 """TCP connector.
880 verify_ssl - Set to True to check ssl certifications.
881 fingerprint - Pass the binary sha256
882 digest of the expected certificate in DER format to verify
883 that the certificate the server presents matches. See also
884 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
885 resolver - Enable DNS lookups and use this
886 resolver
887 use_dns_cache - Use memory cache for DNS lookups.
888 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
889 family - socket address family
890 local_addr - local tuple of (host, port) to bind socket to
892 keepalive_timeout - (optional) Keep-alive timeout.
893 force_close - Set to True to force close and do reconnect
894 after each request (and between redirects).
895 limit - The total number of simultaneous connections.
896 limit_per_host - Number of simultaneous connections to one host.
897 enable_cleanup_closed - Enables clean-up closed ssl transports.
898 Disabled by default.
899 happy_eyeballs_delay - This is the “Connection Attempt Delay”
900 as defined in RFC 8305. To disable
901 the happy eyeballs algorithm, set to None.
902 interleave - “First Address Family Count” as defined in RFC 8305
903 loop - Optional event loop.
904 socket_factory - A SocketFactoryType function that, if supplied,
905 will be used to create sockets given an
906 AddrInfoType.
907 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
908 Grace period for SSL shutdown handshake on TLS
909 connections. Default is 0 seconds (immediate abort).
910 This parameter allowed for a clean SSL shutdown by
911 notifying the remote peer of connection closure,
912 while avoiding excessive delays during connector cleanup.
913 Note: Only takes effect on Python 3.11+.
914 """
916 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
918 def __init__(
919 self,
920 *,
921 use_dns_cache: bool = True,
922 ttl_dns_cache: int | None = 10,
923 dns_cache_max_size: int = 1000,
924 family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
925 ssl: bool | Fingerprint | SSLContext = True,
926 local_addr: tuple[str, int] | None = None,
927 resolver: AbstractResolver | None = None,
928 keepalive_timeout: None | float | _SENTINEL = sentinel,
929 force_close: bool = False,
930 limit: int = 100,
931 limit_per_host: int = 0,
932 enable_cleanup_closed: bool = False,
933 timeout_ceil_threshold: float = 5,
934 happy_eyeballs_delay: float | None = 0.25,
935 interleave: int | None = None,
936 socket_factory: SocketFactoryType | None = None,
937 ssl_shutdown_timeout: _SENTINEL | None | float = sentinel,
938 ):
939 super().__init__(
940 keepalive_timeout=keepalive_timeout,
941 force_close=force_close,
942 limit=limit,
943 limit_per_host=limit_per_host,
944 enable_cleanup_closed=enable_cleanup_closed,
945 timeout_ceil_threshold=timeout_ceil_threshold,
946 )
948 if not isinstance(ssl, SSL_ALLOWED_TYPES):
949 raise TypeError(
950 "ssl should be SSLContext, Fingerprint, or bool, "
951 f"got {ssl!r} instead."
952 )
953 self._ssl = ssl
955 self._resolver: AbstractResolver
956 if resolver is None:
957 self._resolver = DefaultResolver()
958 self._resolver_owner = True
959 else:
960 self._resolver = resolver
961 self._resolver_owner = False
963 self._use_dns_cache = use_dns_cache
964 self._cached_hosts = _DNSCacheTable(
965 ttl=ttl_dns_cache, max_size=dns_cache_max_size
966 )
967 self._throttle_dns_futures: dict[tuple[str, int], set[asyncio.Future[None]]] = (
968 {}
969 )
970 self._family = family
971 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
972 self._happy_eyeballs_delay = happy_eyeballs_delay
973 self._interleave = interleave
974 self._resolve_host_tasks: set[asyncio.Task[list[ResolveResult]]] = set()
975 self._socket_factory = socket_factory
976 self._ssl_shutdown_timeout: float | None
978 # Handle ssl_shutdown_timeout with warning for Python < 3.11
979 if ssl_shutdown_timeout is sentinel:
980 self._ssl_shutdown_timeout = 0
981 else:
982 # Deprecation warning for ssl_shutdown_timeout parameter
983 warnings.warn(
984 "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
985 DeprecationWarning,
986 stacklevel=2,
987 )
988 if (
989 sys.version_info < (3, 11)
990 and ssl_shutdown_timeout is not None
991 and ssl_shutdown_timeout != 0
992 ):
993 warnings.warn(
994 f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
995 "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
996 RuntimeWarning,
997 stacklevel=2,
998 )
999 self._ssl_shutdown_timeout = ssl_shutdown_timeout
1001 async def close(self, *, abort_ssl: bool = False) -> None:
1002 """Close all opened transports.
1004 :param abort_ssl: If True, SSL connections will be aborted immediately
1005 without performing the shutdown handshake. If False (default),
1006 the behavior is determined by ssl_shutdown_timeout:
1007 - If ssl_shutdown_timeout=0: connections are aborted
1008 - If ssl_shutdown_timeout>0: graceful shutdown is performed
1009 """
1010 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
1011 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)
1012 if self._resolver_owner:
1013 await self._resolver.close()
1015 def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
1016 for fut in chain.from_iterable(self._throttle_dns_futures.values()):
1017 fut.cancel()
1019 waiters = super()._close_immediately(abort_ssl=abort_ssl)
1021 for t in self._resolve_host_tasks:
1022 t.cancel()
1023 waiters.append(t)
1025 return waiters
1027 @property
1028 def family(self) -> int:
1029 """Socket family like AF_INET."""
1030 return self._family
1032 @property
1033 def use_dns_cache(self) -> bool:
1034 """True if local DNS caching is enabled."""
1035 return self._use_dns_cache
1037 def clear_dns_cache(self, host: str | None = None, port: int | None = None) -> None:
1038 """Remove specified host/port or clear all dns local cache."""
1039 if host is not None and port is not None:
1040 self._cached_hosts.remove((host, port))
1041 elif host is not None or port is not None:
1042 raise ValueError("either both host and port or none of them are allowed")
1043 else:
1044 self._cached_hosts.clear()
1046 async def _resolve_host(
1047 self, host: str, port: int, traces: Sequence["Trace"] | None = None
1048 ) -> list[ResolveResult]:
1049 """Resolve host and return list of addresses."""
1050 if is_ip_address(host):
1051 # Reject legacy numeric IPv4 forms (e.g. 2130706433, 127.1) that
1052 # socket would map onto an address, slipping past a connector-level
1053 # policy that only sees the raw host.
1054 if ":" not in host and not is_canonical_ipv4_address(host):
1055 raise InvalidUrlClientError(host, "is not a canonical IPv4 address")
1056 return [
1057 {
1058 "hostname": host,
1059 "host": host,
1060 "port": port,
1061 "family": self._family,
1062 "proto": 0,
1063 "flags": 0,
1064 }
1065 ]
1067 if not self._use_dns_cache:
1068 if traces:
1069 for trace in traces:
1070 await trace.send_dns_resolvehost_start(host)
1072 if self._closed:
1073 raise ClientConnectionError("Connector is closed")
1075 res = await self._resolver.resolve(host, port, family=self._family)
1077 if traces:
1078 for trace in traces:
1079 await trace.send_dns_resolvehost_end(host)
1081 return res
1083 key = (host, port)
1084 if key in self._cached_hosts and not self._cached_hosts.expired(key):
1085 # get result early, before any await (#4014)
1086 result = self._cached_hosts.next_addrs(key)
1088 if traces:
1089 for trace in traces:
1090 await trace.send_dns_cache_hit(host)
1091 return result
1093 futures: set[asyncio.Future[None]]
1094 #
1095 # If multiple connectors are resolving the same host, we wait
1096 # for the first one to resolve and then use the result for all of them.
1097 # We use a throttle to ensure that we only resolve the host once
1098 # and then use the result for all the waiters.
1099 #
1100 if key in self._throttle_dns_futures:
1101 # get futures early, before any await (#4014)
1102 futures = self._throttle_dns_futures[key]
1103 future: asyncio.Future[None] = self._loop.create_future()
1104 futures.add(future)
1105 if traces:
1106 for trace in traces:
1107 await trace.send_dns_cache_hit(host)
1108 try:
1109 await future
1110 finally:
1111 futures.discard(future)
1112 return self._cached_hosts.next_addrs(key)
1114 # update dict early, before any await (#4014)
1115 self._throttle_dns_futures[key] = futures = set()
1116 # In this case we need to create a task to ensure that we can shield
1117 # the task from cancellation as cancelling this lookup should not cancel
1118 # the underlying lookup or else the cancel event will get broadcast to
1119 # all the waiters across all connections.
1120 #
1121 coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
1122 loop = asyncio.get_running_loop()
1123 if sys.version_info >= (3, 12):
1124 # Optimization for Python 3.12, try to send immediately
1125 resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
1126 else:
1127 resolved_host_task = loop.create_task(coro)
1129 if not resolved_host_task.done():
1130 self._resolve_host_tasks.add(resolved_host_task)
1131 resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)
1133 try:
1134 return await asyncio.shield(resolved_host_task)
1135 except asyncio.CancelledError:
1137 def drop_exception(fut: "asyncio.Future[list[ResolveResult]]") -> None:
1138 with suppress(Exception, asyncio.CancelledError):
1139 fut.result()
1141 resolved_host_task.add_done_callback(drop_exception)
1142 raise
1144 async def _resolve_host_with_throttle(
1145 self,
1146 key: tuple[str, int],
1147 host: str,
1148 port: int,
1149 futures: set[asyncio.Future[None]],
1150 traces: Sequence["Trace"] | None,
1151 ) -> list[ResolveResult]:
1152 """Resolve host and set result for all waiters.
1154 This method must be run in a task and shielded from cancellation
1155 to avoid cancelling the underlying lookup.
1156 """
1157 try:
1158 if traces:
1159 for trace in traces:
1160 await trace.send_dns_cache_miss(host)
1162 for trace in traces:
1163 await trace.send_dns_resolvehost_start(host)
1165 addrs = await self._resolver.resolve(host, port, family=self._family)
1166 if traces:
1167 for trace in traces:
1168 await trace.send_dns_resolvehost_end(host)
1170 self._cached_hosts.add(key, addrs)
1171 for fut in futures:
1172 set_result(fut, None)
1173 except BaseException as e:
1174 # any DNS exception is set for the waiters to raise the same exception.
1175 # This coro is always run in task that is shielded from cancellation so
1176 # we should never be propagating cancellation here.
1177 for fut in futures:
1178 set_exception(fut, e)
1179 raise
1180 finally:
1181 self._throttle_dns_futures.pop(key)
1183 return self._cached_hosts.next_addrs(key)
1185 async def _create_connection(
1186 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
1187 ) -> ResponseHandler:
1188 """Create connection.
1190 Has same keyword arguments as BaseEventLoop.create_connection.
1191 """
1192 if req.proxy:
1193 _, proto = await self._create_proxy_connection(req, traces, timeout)
1194 else:
1195 _, proto = await self._create_direct_connection(req, traces, timeout)
1197 return proto
1199 def _get_ssl_context(self, req: ClientRequestBase) -> SSLContext | None:
1200 """Logic to get the correct SSL context
1202 0. if req.ssl is false, return None
1204 1. if ssl_context is specified in req, use it
1205 2. if _ssl_context is specified in self, use it
1206 3. otherwise:
1207 1. if verify_ssl is not specified in req, use self.ssl_context
1208 (will generate a default context according to self.verify_ssl)
1209 2. if verify_ssl is True in req, generate a default SSL context
1210 3. if verify_ssl is False in req, generate a SSL context that
1211 won't verify
1212 """
1213 if not req.is_ssl():
1214 return None
1216 if ssl is None: # pragma: no cover
1217 raise RuntimeError("SSL is not supported.")
1218 sslcontext = req.ssl
1219 if isinstance(sslcontext, ssl.SSLContext):
1220 return sslcontext
1221 if sslcontext is not True:
1222 # not verified or fingerprinted
1223 return _SSL_CONTEXT_UNVERIFIED
1224 sslcontext = self._ssl
1225 if isinstance(sslcontext, ssl.SSLContext):
1226 return sslcontext
1227 if sslcontext is not True:
1228 # not verified or fingerprinted
1229 return _SSL_CONTEXT_UNVERIFIED
1230 return _SSL_CONTEXT_VERIFIED
1232 def _get_fingerprint(self, req: ClientRequestBase) -> "Fingerprint | None":
1233 ret = req.ssl
1234 if isinstance(ret, Fingerprint):
1235 return ret
1236 ret = self._ssl
1237 if isinstance(ret, Fingerprint):
1238 return ret
1239 return None
1241 async def _wrap_create_connection(
1242 self,
1243 *args: Any,
1244 addr_infos: list[AddrInfoType],
1245 req: ClientRequestBase,
1246 timeout: "ClientTimeout",
1247 client_error: type[Exception] = ClientConnectorError,
1248 **kwargs: Any,
1249 ) -> tuple[asyncio.Transport, ResponseHandler]:
1250 try:
1251 async with ceil_timeout(
1252 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1253 ):
1254 sock = await aiohappyeyeballs.start_connection(
1255 addr_infos=addr_infos,
1256 local_addr_infos=self._local_addr_infos,
1257 happy_eyeballs_delay=self._happy_eyeballs_delay,
1258 interleave=self._interleave,
1259 loop=self._loop,
1260 socket_factory=self._socket_factory,
1261 )
1262 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
1263 if (
1264 kwargs.get("ssl")
1265 and self._ssl_shutdown_timeout
1266 and sys.version_info >= (3, 11)
1267 ):
1268 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
1269 return await self._loop.create_connection(*args, **kwargs, sock=sock)
1270 except cert_errors as exc:
1271 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1272 except ssl_errors as exc:
1273 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1274 except OSError as exc:
1275 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1276 raise
1277 raise client_error(req.connection_key, exc) from exc
1279 def _warn_about_tls_in_tls(
1280 self,
1281 underlying_transport: asyncio.Transport,
1282 req: ClientRequest,
1283 ) -> None:
1284 """Issue a warning if the requested URL has HTTPS scheme."""
1285 if req.url.scheme != "https":
1286 return
1288 # TLS-in-TLS only applies when the proxy itself is HTTPS.
1289 # When the proxy is HTTP, start_tls upgrades a plain TCP connection,
1290 # which is standard TLS and works on all event loops and Python versions.
1291 if req.proxy is None or req.proxy.scheme != "https":
1292 return
1294 # Check if uvloop is being used, which supports TLS in TLS,
1295 # otherwise assume that asyncio's native transport is being used.
1296 if type(underlying_transport).__module__.startswith("uvloop"):
1297 return
1299 # Support in asyncio was added in Python 3.11 (bpo-44011)
1300 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
1301 underlying_transport,
1302 "_start_tls_compatible",
1303 False,
1304 )
1306 if asyncio_supports_tls_in_tls:
1307 return
1309 warnings.warn(
1310 "An HTTPS request is being sent through an HTTPS proxy. "
1311 "This support for TLS in TLS is known to be disabled "
1312 "in the stdlib asyncio. This is why you'll probably see "
1313 "an error in the log below.\n\n"
1314 "It is possible to enable it via monkeypatching. "
1315 "For more details, see:\n"
1316 "* https://bugs.python.org/issue37179\n"
1317 "* https://github.com/python/cpython/pull/28073\n\n"
1318 "You can temporarily patch this as follows:\n"
1319 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1320 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1321 RuntimeWarning,
1322 source=self,
1323 # Why `4`? At least 3 of the calls in the stack originate
1324 # from the methods in this class.
1325 stacklevel=3,
1326 )
1328 async def _start_tls_connection(
1329 self,
1330 underlying_transport: asyncio.Transport,
1331 req: ClientRequest,
1332 timeout: "ClientTimeout",
1333 client_error: type[Exception] = ClientConnectorError,
1334 ) -> tuple[asyncio.BaseTransport, ResponseHandler]:
1335 """Wrap the raw TCP transport with TLS."""
1336 tls_proto = self._factory() # Create a brand new proto for TLS
1337 sslcontext = self._get_ssl_context(req)
1338 if TYPE_CHECKING:
1339 # _start_tls_connection is unreachable in the current code path
1340 # if sslcontext is None.
1341 assert sslcontext is not None
1343 try:
1344 async with ceil_timeout(
1345 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1346 ):
1347 try:
1348 # ssl_shutdown_timeout is only available in Python 3.11+
1349 if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
1350 tls_transport = await self._loop.start_tls(
1351 underlying_transport,
1352 tls_proto,
1353 sslcontext,
1354 server_hostname=req.server_hostname or req.url.raw_host,
1355 ssl_handshake_timeout=timeout.total,
1356 ssl_shutdown_timeout=self._ssl_shutdown_timeout,
1357 )
1358 else:
1359 tls_transport = await self._loop.start_tls(
1360 underlying_transport,
1361 tls_proto,
1362 sslcontext,
1363 server_hostname=req.server_hostname or req.url.raw_host,
1364 ssl_handshake_timeout=timeout.total,
1365 )
1366 except BaseException:
1367 # We need to close the underlying transport since
1368 # `start_tls()` probably failed before it had a
1369 # chance to do this:
1370 if self._ssl_shutdown_timeout == 0:
1371 underlying_transport.abort()
1372 else:
1373 underlying_transport.close()
1374 raise
1375 if isinstance(tls_transport, asyncio.Transport):
1376 fingerprint = self._get_fingerprint(req)
1377 if fingerprint:
1378 try:
1379 fingerprint.check(tls_transport)
1380 except ServerFingerprintMismatch:
1381 tls_transport.close()
1382 if not self._cleanup_closed_disabled:
1383 self._cleanup_closed_transports.append(tls_transport)
1384 raise
1385 except cert_errors as exc:
1386 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1387 except ssl_errors as exc:
1388 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1389 except OSError as exc:
1390 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1391 raise
1392 raise client_error(req.connection_key, exc) from exc
1393 except TypeError as type_err:
1394 # Example cause looks like this:
1395 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1396 # object at 0x7f760615e460> is not supported by start_tls()
1398 raise ClientConnectionError(
1399 "Cannot initialize a TLS-in-TLS connection to host "
1400 f"{req.url.host!s}:{req.url.port:d} through an underlying connection "
1401 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1402 f"[{type_err!s}]"
1403 ) from type_err
1404 else:
1405 if tls_transport is None:
1406 msg = "Failed to start TLS (possibly caused by closing transport)"
1407 raise client_error(req.connection_key, OSError(msg))
1408 tls_proto.connection_made(
1409 tls_transport
1410 ) # Kick the state machine of the new TLS protocol
1412 return tls_transport, tls_proto
1414 def _convert_hosts_to_addr_infos(
1415 self, hosts: list[ResolveResult]
1416 ) -> list[AddrInfoType]:
1417 """Converts the list of hosts to a list of addr_infos.
1419 The list of hosts is the result of a DNS lookup. The list of
1420 addr_infos is the result of a call to `socket.getaddrinfo()`.
1421 """
1422 addr_infos: list[AddrInfoType] = []
1423 for hinfo in hosts:
1424 host = hinfo["host"]
1425 is_ipv6 = ":" in host
1426 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1427 if self._family and self._family != family:
1428 continue
1429 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1430 addr_infos.append(
1431 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1432 )
1433 return addr_infos
1435 async def _create_direct_connection(
1436 self,
1437 req: ClientRequestBase,
1438 traces: list["Trace"],
1439 timeout: "ClientTimeout",
1440 *,
1441 client_error: type[Exception] = ClientConnectorError,
1442 ) -> tuple[asyncio.Transport, ResponseHandler]:
1443 sslcontext = self._get_ssl_context(req)
1444 fingerprint = self._get_fingerprint(req)
1446 host = req.url.raw_host
1447 assert host is not None
1448 # Replace multiple trailing dots with a single one.
1449 # A trailing dot is only present for fully-qualified domain names.
1450 # See https://github.com/aio-libs/aiohttp/pull/7364.
1451 if host.endswith(".."):
1452 host = host.rstrip(".") + "."
1453 port = req.url.port
1454 assert port is not None
1455 try:
1456 # Cancelling this lookup should not cancel the underlying lookup
1457 # or else the cancel event will get broadcast to all the waiters
1458 # across all connections.
1459 hosts = await self._resolve_host(host, port, traces=traces)
1460 except OSError as exc:
1461 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1462 raise
1463 # in case of proxy it is not ClientProxyConnectionError
1464 # it is problem of resolving proxy ip itself
1465 raise ClientConnectorDNSError(req.connection_key, exc) from exc
1467 last_exc: Exception | None = None
1468 addr_infos = self._convert_hosts_to_addr_infos(hosts)
1469 while addr_infos:
1470 # Strip trailing dots, certificates contain FQDN without dots.
1471 # See https://github.com/aio-libs/aiohttp/issues/3636
1472 server_hostname = (
1473 (req.server_hostname or host).rstrip(".") if sslcontext else None
1474 )
1476 try:
1477 transp, proto = await self._wrap_create_connection(
1478 self._factory,
1479 timeout=timeout,
1480 ssl=sslcontext,
1481 addr_infos=addr_infos,
1482 server_hostname=server_hostname,
1483 req=req,
1484 client_error=client_error,
1485 )
1486 except (ClientConnectorError, asyncio.TimeoutError) as exc:
1487 last_exc = exc
1488 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
1489 continue
1491 if req.is_ssl() and fingerprint:
1492 try:
1493 fingerprint.check(transp)
1494 except ServerFingerprintMismatch as exc:
1495 transp.close()
1496 if not self._cleanup_closed_disabled:
1497 self._cleanup_closed_transports.append(transp)
1498 last_exc = exc
1499 # Remove the bad peer from the list of addr_infos
1500 sock: socket.socket = transp.get_extra_info("socket")
1501 bad_peer = sock.getpeername()
1502 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
1503 continue
1505 return transp, proto
1506 assert last_exc is not None
1507 raise last_exc
1509 async def _create_proxy_connection(
1510 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
1511 ) -> tuple[asyncio.BaseTransport, ResponseHandler]:
1512 proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)
1514 # create connection to proxy server
1515 transport, proto = await self._create_direct_connection(
1516 proxy_req, [], timeout, client_error=ClientProxyConnectionError
1517 )
1519 if req.is_ssl():
1520 self._warn_about_tls_in_tls(transport, req)
1522 # For HTTPS requests over HTTP proxy
1523 # we must notify proxy to tunnel connection
1524 # so we send CONNECT command:
1525 # CONNECT www.python.org:443 HTTP/1.1
1526 # Host: www.python.org
1527 #
1528 # next we must do TLS handshake and so on
1529 # to do this we must wrap raw socket into secure one
1530 # asyncio handles this perfectly
1531 proxy_req.method = hdrs.METH_CONNECT
1532 proxy_req.url = req.url
1533 key = req.connection_key._replace(proxy=None, proxy_headers_hash=None)
1534 conn = _ConnectTunnelConnection(self, key, proto, self._loop)
1535 proxy_resp = await proxy_req._send(conn)
1536 try:
1537 protocol = conn._protocol
1538 assert protocol is not None
1540 # read_until_eof=True will ensure the connection isn't closed
1541 # once the response is received and processed allowing
1542 # START_TLS to work on the connection below.
1543 protocol.set_response_params(
1544 read_until_eof=True,
1545 timeout_ceil_threshold=self._timeout_ceil_threshold,
1546 )
1547 resp = await proxy_resp.start(conn)
1548 except BaseException:
1549 proxy_resp.close()
1550 conn.close()
1551 raise
1552 else:
1553 conn._protocol = None
1554 try:
1555 if resp.status != 200:
1556 message = resp.reason
1557 if message is None:
1558 message = HTTPStatus(resp.status).phrase
1559 raise ClientHttpProxyError(
1560 proxy_resp.request_info,
1561 resp.history,
1562 status=resp.status,
1563 message=message,
1564 headers=resp.headers,
1565 )
1566 except BaseException:
1567 # It shouldn't be closed in `finally` because it's fed to
1568 # `loop.start_tls()` and the docs say not to touch it after
1569 # passing there.
1570 transport.close()
1571 raise
1573 return await self._start_tls_connection(
1574 # Access the old transport for the last time before it's
1575 # closed and forgotten forever:
1576 transport,
1577 req=req,
1578 timeout=timeout,
1579 )
1580 finally:
1581 proxy_resp.close()
1583 return transport, proto
1586class UnixConnector(BaseConnector):
1587 """Unix socket connector.
1589 path - Unix socket path.
1590 keepalive_timeout - (optional) Keep-alive timeout.
1591 force_close - Set to True to force close and do reconnect
1592 after each request (and between redirects).
1593 limit - The total number of simultaneous connections.
1594 limit_per_host - Number of simultaneous connections to one host.
1595 loop - Optional event loop.
1596 """
1598 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})
1600 def __init__(
1601 self,
1602 path: str,
1603 force_close: bool = False,
1604 keepalive_timeout: _SENTINEL | float | None = sentinel,
1605 limit: int = 100,
1606 limit_per_host: int = 0,
1607 ) -> None:
1608 super().__init__(
1609 force_close=force_close,
1610 keepalive_timeout=keepalive_timeout,
1611 limit=limit,
1612 limit_per_host=limit_per_host,
1613 )
1614 self._path = path
1616 @property
1617 def path(self) -> str:
1618 """Path to unix socket."""
1619 return self._path
1621 async def _create_connection(
1622 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
1623 ) -> ResponseHandler:
1624 try:
1625 async with ceil_timeout(
1626 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1627 ):
1628 _, proto = await self._loop.create_unix_connection(
1629 self._factory, self._path
1630 )
1631 except OSError as exc:
1632 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1633 raise
1634 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1636 return proto
1639class NamedPipeConnector(BaseConnector):
1640 """Named pipe connector.
1642 Only supported by the proactor event loop.
1643 See also: https://docs.python.org/3/library/asyncio-eventloop.html
1645 path - Windows named pipe path.
1646 keepalive_timeout - (optional) Keep-alive timeout.
1647 force_close - Set to True to force close and do reconnect
1648 after each request (and between redirects).
1649 limit - The total number of simultaneous connections.
1650 limit_per_host - Number of simultaneous connections to one host.
1651 loop - Optional event loop.
1652 """
1654 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})
1656 def __init__(
1657 self,
1658 path: str,
1659 force_close: bool = False,
1660 keepalive_timeout: _SENTINEL | float | None = sentinel,
1661 limit: int = 100,
1662 limit_per_host: int = 0,
1663 ) -> None:
1664 super().__init__(
1665 force_close=force_close,
1666 keepalive_timeout=keepalive_timeout,
1667 limit=limit,
1668 limit_per_host=limit_per_host,
1669 )
1670 if not isinstance(
1671 self._loop,
1672 asyncio.ProactorEventLoop, # type: ignore[attr-defined]
1673 ):
1674 raise RuntimeError(
1675 "Named Pipes only available in proactor loop under windows"
1676 )
1677 self._path = path
1679 @property
1680 def path(self) -> str:
1681 """Path to the named pipe."""
1682 return self._path
1684 async def _create_connection(
1685 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
1686 ) -> ResponseHandler:
1687 try:
1688 async with ceil_timeout(
1689 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1690 ):
1691 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
1692 self._factory, self._path
1693 )
1694 # the drain is required so that the connection_made is called
1695 # and transport is set otherwise it is not set before the
1696 # `assert conn.transport is not None`
1697 # in client.py's _request method
1698 await asyncio.sleep(0)
1699 # other option is to manually set transport like
1700 # `proto.transport = trans`
1701 except OSError as exc:
1702 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1703 raise
1704 raise ClientConnectorError(req.connection_key, exc) from exc
1706 return cast(ResponseHandler, proto)