Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 19%
import asyncio
import functools
import random
import socket
import sys
import traceback
import warnings
from collections import OrderedDict, defaultdict, deque
from collections.abc import Awaitable, Callable, Iterator, Sequence
from contextlib import suppress
from http import HTTPStatus
from itertools import chain, cycle, islice
from time import monotonic
from types import TracebackType
from typing import TYPE_CHECKING, Any, Literal, cast

import aiohappyeyeballs
from aiohappyeyeballs import AddrInfoType, SocketFactoryType
from multidict import CIMultiDict

from . import hdrs, helpers
from .abc import AbstractResolver, ResolveResult
from .client_exceptions import (
    ClientConnectionError,
    ClientConnectorCertificateError,
    ClientConnectorDNSError,
    ClientConnectorError,
    ClientConnectorSSLError,
    ClientHttpProxyError,
    ClientProxyConnectionError,
    ServerFingerprintMismatch,
    UnixClientConnectorError,
    cert_errors,
    ssl_errors,
)
from .client_proto import ResponseHandler
from .client_reqrep import (
    SSL_ALLOWED_TYPES,
    ClientRequest,
    ClientRequestBase,
    Fingerprint,
)
from .helpers import (
    _SENTINEL,
    ceil_timeout,
    is_ip_address,
    sentinel,
    set_exception,
    set_result,
)
from .log import client_logger
from .resolver import DefaultResolver

if sys.version_info >= (3, 12):
    from collections.abc import Buffer
else:
    Buffer = "bytes | bytearray | memoryview[int] | memoryview[bytes]"

try:
    import ssl

    SSLContext = ssl.SSLContext
except ImportError:  # pragma: no cover
    ssl = None  # type: ignore[assignment]
    SSLContext = object  # type: ignore[misc,assignment]

EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 8)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.8 and 3.13.1


__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)


if TYPE_CHECKING:
    from .client import ClientTimeout
    from .client_reqrep import ConnectionKey
    from .tracing import Trace


class Connection:
    """Represents a single connection."""

    __slots__ = (
        "_key",
        "_connector",
        "_loop",
        "_protocol",
        "_callbacks",
        "_source_traceback",
    )

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol: ResponseHandler | None = protocol
        self._callbacks: list[Callable[[], None]] = []
        self._source_traceback = (
            traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None
        )

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._protocol is not None:
            _warnings.warn(
                f"Unclosed connection {self!r}", ResourceWarning, source=self
            )
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def transport(self) -> asyncio.Transport | None:
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> ResponseHandler | None:
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        return self._protocol is None or not self._protocol.is_connected()


class _ConnectTunnelConnection(Connection):
    """Special connection wrapper for CONNECT tunnels that must never be pooled.

    This connection wraps the proxy connection that will be upgraded with TLS.
    It must never be released to the pool because:
    1. Its 'closed' future will never complete, causing session.close() to hang
    2. It represents an intermediate state, not a reusable connection
    3. The real connection (with TLS) will be created separately
    """

    def release(self) -> None:
        """Do nothing - don't pool or close the connection.

        These connections are an intermediate state during the CONNECT tunnel
        setup and will be cleaned up naturally after the TLS upgrade. If they
        were to be pooled, they would never be properly closed, causing
        session.close() to wait forever for their 'closed' future.
        """


class _TransportPlaceholder:
    """Placeholder for the BaseConnector.connect function."""

    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Exception | None]) -> None:
        """Initialize a placeholder for a transport."""
        self.closed = closed_future
        self.transport = None

    def close(self) -> None:
        """Close the placeholder (does nothing)."""

    def abort(self) -> None:
        """Abort the placeholder (does nothing)."""


class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up of closed ssl transports.
        Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
        it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if the constructor failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: _SENTINEL | None | float = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        self._timeout_ceil_threshold = timeout_ceil_threshold

        loop = asyncio.get_running_loop()

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: defaultdict[
            ConnectionKey, deque[tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: set[ResponseHandler] = set()
        self._acquired_per_host: defaultdict[ConnectionKey, set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: defaultdict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)
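        # Illustrative only - the OrderedDict is used as an ordered set:
        #     waiters[fut] = None                   # append a waiter (FIFO)
        #     waiters.popitem(last=False)           # pop the oldest waiter
        #     waiters.move_to_end(fut, last=False)  # requeue at the front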

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: asyncio.TimerHandle | None = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: asyncio.TimerHandle | None = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: list[asyncio.Transport | None] = []

        self._placeholder_future: asyncio.Future[Exception | None] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close_immediately()

        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        exc_traceback: TracebackType | None = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number of simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will
            # never recreate it!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: deque[tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave a socket open without a proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def close(self, *, abort_ssl: bool = False) -> None:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        waiters = self._close_immediately(abort_ssl=abort_ssl)
        if waiters:
            results = await asyncio.gather(*waiters, return_exceptions=True)
            for res in results:
                if isinstance(res, Exception):
                    err_msg = "Error while closing connector: " + repr(res)
                    client_logger.debug(err_msg)

    def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
        waiters: list[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return the number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        A return value of less than 1 means that no connections are
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain
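
    # Worked example (illustrative): with limit=10, limit_per_host=2,
    # 9 connections acquired in total and 1 acquired for this key,
    # total_remain == 1 and host_remain == 1, so exactly one more
    # connection may be opened for this key.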

    def _update_proxy_auth_header_and_build_proxy_req(
        self, req: ClientRequest
    ) -> ClientRequestBase:
        """Set the Proxy-Authorization header for non-SSL proxy requests and build the proxy request for SSL proxy requests."""
        url = req.proxy
        assert url is not None
        headers = req.proxy_headers or CIMultiDict[str]()
        headers[hdrs.HOST] = req.headers[hdrs.HOST]
        proxy_req = ClientRequestBase(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        return proxy_req

    async def connect(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get a connection from the pool or create a new one."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            if req.proxy:
                self._update_proxy_auth_header_and_build_proxy_req(req)
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    if req.proxy:
                        self._update_proxy_auth_header_and_build_proxy_req(req)
                    return conn

            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that the
                # placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: list["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if it's still
                # there and was not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: list["Trace"]
    ) -> Connection | None:
        """Get the next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it's connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterate over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the keys so we do not iterate in the
        # same order on every call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()
            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        raise NotImplementedError()


class _DNSCacheTable:
    def __init__(self, ttl: float | None = None, max_size: int = 1000) -> None:
        self._addrs_rr: OrderedDict[
            tuple[str, int], tuple[Iterator[ResolveResult], int]
        ] = OrderedDict()
        self._timestamps: dict[tuple[str, int], float] = {}
        self._ttl = ttl
        self._max_size = max_size

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: tuple[str, int], addrs: list[ResolveResult]) -> None:
        if key in self._addrs_rr:
            self._addrs_rr.move_to_end(key)

        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

        if len(self._addrs_rr) > self._max_size:
            oldest_key, _ = self._addrs_rr.popitem(last=False)
            self._timestamps.pop(oldest_key, None)

    def remove(self, key: tuple[str, int]) -> None:
        self._addrs_rr.pop(key, None)
        self._timestamps.pop(key, None)

    def clear(self) -> None:
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: tuple[str, int]) -> list[ResolveResult]:
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        # Consume one more element to shift internal state of `cycle`
        next(loop)
        self._addrs_rr.move_to_end(key)
        return addrs
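
    # Round-robin example (illustrative): after add(key, [a, b]), successive
    # next_addrs(key) calls return [a, b], then [b, a], then [a, b] again,
    # because one extra element is consumed from the cycle() iterator on
    # each call.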

    def expired(self, key: tuple[str, int]) -> bool:
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()


def _make_ssl_context(verified: bool) -> SSLContext:
    """Create SSL context.

    This method is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        return None  # type: ignore[unreachable]
    if verified:
        sslcontext = ssl.create_default_context()
    else:
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.options |= ssl.OP_NO_SSLv3
        sslcontext.check_hostname = False
        sslcontext.verify_mode = ssl.CERT_NONE
        sslcontext.options |= ssl.OP_NO_COMPRESSION
        sslcontext.set_default_verify_paths()
    sslcontext.set_alpn_protocols(("http/1.1",))
    return sslcontext
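
# Illustrative only: if another context were ever needed while the event
# loop is already running, the blocking construction could be pushed to a
# thread, e.g.
#     ctx = await asyncio.get_running_loop().run_in_executor(
#         None, _make_ssl_context, True
#     )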

# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)


class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check ssl certificates.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds to cache a DNS entry, None to cache forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up of closed ssl transports.
        Disabled by default.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
        as defined in RFC 8305. To disable
        the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    socket_factory - A SocketFactoryType function that, if supplied,
        will be used to create sockets given an
        AddrInfoType.
    ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
        Grace period for the SSL shutdown handshake on TLS
        connections. Default is 0 seconds (immediate abort).
        This parameter allowed for a clean SSL shutdown by
        notifying the remote peer of connection closure,
        while avoiding excessive delays during connector cleanup.
        Note: Only takes effect on Python 3.11+.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})

    def __init__(
        self,
        *,
        use_dns_cache: bool = True,
        ttl_dns_cache: int | None = 10,
        dns_cache_max_size: int = 1000,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl: bool | Fingerprint | SSLContext = True,
        local_addr: tuple[str, int] | None = None,
        resolver: AbstractResolver | None = None,
        keepalive_timeout: None | float | _SENTINEL = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: float | None = 0.25,
        interleave: int | None = None,
        socket_factory: SocketFactoryType | None = None,
        ssl_shutdown_timeout: _SENTINEL | None | float = sentinel,
    ):
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, Fingerprint, or bool, "
                f"got {ssl!r} instead."
            )
        self._ssl = ssl

        self._resolver: AbstractResolver
        if resolver is None:
            self._resolver = DefaultResolver()
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(
            ttl=ttl_dns_cache, max_size=dns_cache_max_size
        )
        self._throttle_dns_futures: dict[tuple[str, int], set[asyncio.Future[None]]] = (
            {}
        )
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        self._resolve_host_tasks: set[asyncio.Task[list[ResolveResult]]] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: float | None

        # Handle ssl_shutdown_timeout, warning on Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for the ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout

    async def close(self, *, abort_ssl: bool = False) -> None:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. If False (default),
            the behavior is determined by ssl_shutdown_timeout:
            - If ssl_shutdown_timeout=0: connections are aborted
            - If ssl_shutdown_timeout>0: graceful shutdown is performed
        """
        if self._resolver_owner:
            await self._resolver.close()
        # Use the abort_ssl param if explicitly set, otherwise fall back to
        # the ssl_shutdown_timeout default
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)

    def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
        for fut in chain.from_iterable(self._throttle_dns_futures.values()):
            fut.cancel()

        waiters = super()._close_immediately(abort_ssl=abort_ssl)

        for t in self._resolve_host_tasks:
            t.cancel()
            waiters.append(t)

        return waiters

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

    def clear_dns_cache(self, host: str | None = None, port: int | None = None) -> None:
        """Remove the specified host/port or clear the whole local DNS cache."""
        if host is not None and port is not None:
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            raise ValueError("either both host and port or none of them are allowed")
        else:
            self._cached_hosts.clear()

    async def _resolve_host(
        self, host: str, port: int, traces: Sequence["Trace"] | None = None
    ) -> list[ResolveResult]:
        """Resolve host and return a list of addresses."""
        if is_ip_address(host):
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: set[asyncio.Future[None]]
        #
        # If multiple requests are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
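        # Illustrative timeline for two tasks resolving the same key:
        #   task A: creates _throttle_dns_futures[key] and starts the lookup
        #   task B: finds the key, adds a future to the set, and awaits it
        #   task A: caches the addresses, then calls set_result() on each waiter
        #   task B: wakes up and serves addresses from the shared cache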
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update the dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation: cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12: start the task eagerly so the
            # resolution may complete without an extra loop iteration
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[list[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

    async def _resolve_host_with_throttle(
        self,
        key: tuple[str, int],
        host: str,
        port: int,
        futures: set[asyncio.Future[None]],
        traces: Sequence["Trace"] | None,
    ) -> list[ResolveResult]:
        """Resolve host and set the result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in a task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create connection.

        Has the same keyword arguments as BaseEventLoop.create_connection.
        """
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

    def _get_ssl_context(self, req: ClientRequestBase) -> SSLContext | None:
        """Logic to get the correct SSL context.

        0. if req.ssl is false, return None

        1. if ssl_context is specified in req, use it
        2. if _ssl_context is specified in self, use it
        3. otherwise:
            1. if verify_ssl is not specified in req, use self.ssl_context
               (will generate a default context according to self.verify_ssl)
            2. if verify_ssl is True in req, generate a default SSL context
            3. if verify_ssl is False in req, generate an SSL context that
               won't verify
        """
        if not req.is_ssl():
            return None

        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")
        sslcontext = req.ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        sslcontext = self._ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        return _SSL_CONTEXT_VERIFIED
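
    # Worked example (illustrative): with TCPConnector(ssl=my_ctx) and a
    # request made with ssl=True, req.ssl is True, so the per-connector
    # value is consulted next and my_ctx is returned. With ssl=False on
    # the request, _SSL_CONTEXT_UNVERIFIED is returned instead.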

    def _get_fingerprint(self, req: ClientRequestBase) -> "Fingerprint | None":
        ret = req.ssl
        if isinstance(ret, Fingerprint):
            return ret
        ret = self._ssl
        if isinstance(ret, Fingerprint):
            return ret
        return None

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: list[AddrInfoType],
        req: ClientRequestBase,
        timeout: "ClientTimeout",
        client_error: type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> tuple[asyncio.Transport, ResponseHandler]:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        if req.url.scheme != "https":
            return

        # TLS-in-TLS only applies when the proxy itself is HTTPS.
        # When the proxy is HTTP, start_tls upgrades a plain TCP connection,
        # which is standard TLS and works on all event loops and Python versions.
        if req.proxy is None or req.proxy.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio. This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # At least 3 of the calls in the stack originate
            # from the methods in this class.
            stacklevel=3,
        )

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: type[Exception] = ClientConnectorError,
    ) -> tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS."""
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.url.raw_host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.url.raw_host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
            if isinstance(tls_transport, asyncio.Transport):
                fingerprint = self._get_fingerprint(req)
                if fingerprint:
                    try:
                        fingerprint.check(tls_transport)
                    except ServerFingerprintMismatch:
                        tls_transport.close()
                        if not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(tls_transport)
                        raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.url.host!s}:{req.url.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

    def _convert_hosts_to_addr_infos(
        self, hosts: list[ResolveResult]
    ) -> list[AddrInfoType]:
        """Converts the list of hosts to a list of addr_infos.

        The list of hosts is the result of a DNS lookup. The list of
        addr_infos is the result of a call to `socket.getaddrinfo()`.
        """
        addr_infos: list[AddrInfoType] = []
        for hinfo in hosts:
            host = hinfo["host"]
            is_ipv6 = ":" in host
            family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
            if self._family and self._family != family:
                continue
            addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
            addr_infos.append(
                (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
            )
        return addr_infos
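
    # Example (illustrative): {"host": "::1", "port": 443, ...} maps to
    # (AF_INET6, SOCK_STREAM, IPPROTO_TCP, "", ("::1", 443, 0, 0)), while
    # {"host": "127.0.0.1", "port": 80, ...} maps to
    # (AF_INET, SOCK_STREAM, IPPROTO_TCP, "", ("127.0.0.1", 80)).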

    async def _create_direct_connection(
        self,
        req: ClientRequestBase,
        traces: list["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: type[Exception] = ClientConnectorError,
    ) -> tuple[asyncio.Transport, ResponseHandler]:
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.url.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of a proxy it is not ClientProxyConnectionError:
            # it is a problem of resolving the proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Exception | None = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        assert last_exc is not None
        raise last_exc

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> tuple[asyncio.BaseTransport, ResponseHandler]:
        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over an HTTP proxy
            # we must ask the proxy to tunnel the connection,
            # so we send a CONNECT command:
            #     CONNECT www.python.org:443 HTTP/1.1
            #     Host: www.python.org
            #
            # Next we must do a TLS handshake and so on.
            # To do this we must wrap the raw socket in a secure one;
            # asyncio handles this for us.
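            # A successful tunnel exchange looks like this (illustrative):
            #     >>> CONNECT www.python.org:443 HTTP/1.1
            #     >>> Host: www.python.org
            #     <<< HTTP/1.1 200 Connection established
            # after which start_tls() upgrades the raw socket.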
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req._send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed, allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=True,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto


class UnixConnector(BaseConnector):
    """Unix socket connector.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: _SENTINEL | float | None = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc

        return proto


class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: _SENTINEL | float | None = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        if not isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes are only available in the proactor event loop under Windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # A drain is required so that connection_made() is called
                # and the transport is set; otherwise it is not set before
                # the `assert conn.transport is not None` check in
                # client.py's _request method.
                await asyncio.sleep(0)
                # another option would be to set the transport manually, as in
                # `proto.transport = trans`
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)