Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3import random
4import socket
5import sys
6import traceback
7import warnings
8from collections import OrderedDict, defaultdict, deque
9from contextlib import suppress
10from http import HTTPStatus
11from itertools import chain, cycle, islice
12from time import monotonic
13from types import TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 DefaultDict,
20 Deque,
21 Dict,
22 Iterator,
23 List,
24 Literal,
25 Optional,
26 Sequence,
27 Set,
28 Tuple,
29 Type,
30 Union,
31 cast,
32)
34import aiohappyeyeballs
35from aiohappyeyeballs import AddrInfoType, SocketFactoryType
37from . import hdrs, helpers
38from .abc import AbstractResolver, ResolveResult
39from .client_exceptions import (
40 ClientConnectionError,
41 ClientConnectorCertificateError,
42 ClientConnectorDNSError,
43 ClientConnectorError,
44 ClientConnectorSSLError,
45 ClientHttpProxyError,
46 ClientProxyConnectionError,
47 ServerFingerprintMismatch,
48 UnixClientConnectorError,
49 cert_errors,
50 ssl_errors,
51)
52from .client_proto import ResponseHandler
53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
54from .helpers import (
55 _SENTINEL,
56 ceil_timeout,
57 is_ip_address,
58 noop,
59 sentinel,
60 set_exception,
61 set_result,
62)
63from .log import client_logger
64from .resolver import DefaultResolver
if sys.version_info >= (3, 12):
    from collections.abc import Buffer
else:
    # collections.abc.Buffer only exists on 3.12+ (PEP 688); on older
    # interpreters fall back to a plain Union alias covering the common
    # bytes-like types.
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
# Import ssl lazily/defensively: type checkers always see the real module,
# while at runtime a build without ssl support degrades to ssl=None and a
# placeholder SSLContext so the rest of the module can still import.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl

        SSLContext = ssl.SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]
# URL-scheme sets used to validate what a given connector may handle.
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# True on interpreter versions where abandoned SSL transports are not
# reliably torn down by asyncio and need the manual "cleanup closed" pass.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1
# Public API of this module.
__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)
110if TYPE_CHECKING:
111 from .client import ClientTimeout
112 from .client_reqrep import ConnectionKey
113 from .tracing import Trace
class _DeprecationWaiter:
    """Awaitable wrapper that warns when the result is never awaited.

    ``Connector.close()`` historically returned synchronously; this wrapper
    lets it return an awaitable while still emitting a DeprecationWarning
    if the caller forgets the ``await``.
    """

    __slots__ = ("_awaitable", "_awaited")

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaitable = awaitable
        self._awaited = False

    def __await__(self) -> Any:
        # Record that the caller did await us so __del__ stays silent.
        self._awaited = True
        return self._awaitable.__await__()

    def __del__(self) -> None:
        if not self._awaited:
            warnings.warn(
                "Connector.close() is a coroutine, "
                "please use await connector.close()",
                DeprecationWarning,
            )
async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
    """Wait for all waiters to finish closing.

    Exceptions are collected (``return_exceptions=True``) rather than
    propagated, and only logged at debug level: failures while tearing
    down connections must not abort the overall close.
    """
    results = await asyncio.gather(*waiters, return_exceptions=True)
    for res in results:
        if isinstance(res, Exception):
            client_logger.debug("Error while closing connector: %r", res)
class Connection:
    """Handle tying one acquired protocol to its slot in a BaseConnector.

    ``release()`` hands the protocol back for reuse; ``close()`` hands it
    back with ``should_close=True`` so the transport is torn down.
    """

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._connector = connector
        self._key = key
        self._loop = loop
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        # In asyncio debug mode remember where this connection was created
        # so leaks can be traced back to their origin.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return "Connection<{}>".format(self._key)

    def __del__(self, _warnings: Any = warnings) -> None:
        # A live protocol at GC time means the caller leaked the connection.
        if self._protocol is None:
            return
        _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, source=self)
        if self._loop.is_closed():
            return

        # Force-close the leaked protocol and report via the loop's
        # exception handler so the leak is visible in debug output.
        self._connector._release(self._key, self._protocol, should_close=True)

        context = {"client_connection": self, "message": "Unclosed connection"}
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        proto = self._protocol
        return None if proto is None else proto.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        # None callbacks are silently ignored.
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap in a fresh list first so callbacks registered during
        # notification are kept for the next release cycle.
        pending, self._callbacks = self._callbacks, []

        for callback in pending:
            with suppress(Exception):
                callback()

    def close(self) -> None:
        """Return the protocol to the connector, forcing transport close."""
        self._notify_release()

        if self._protocol is None:
            return
        self._connector._release(self._key, self._protocol, should_close=True)
        self._protocol = None

    def release(self) -> None:
        """Return the protocol to the connector's pool for reuse."""
        self._notify_release()

        if self._protocol is None:
            return
        self._connector._release(self._key, self._protocol)
        self._protocol = None

    @property
    def closed(self) -> bool:
        proto = self._protocol
        return proto is None or not proto.is_connected()
class _ConnectTunnelConnection(Connection):
    """Special connection wrapper for CONNECT tunnels that must never be pooled.

    This connection wraps the proxy connection that will be upgraded with TLS.
    It must never be released to the pool because:
    1. Its 'closed' future will never complete, causing session.close() to hang
    2. It represents an intermediate state, not a reusable connection
    3. The real connection (with TLS) will be created separately
    """

    def release(self) -> None:
        """Do nothing - don't pool or close the connection.

        These connections are an intermediate state during the CONNECT tunnel
        setup and will be cleaned up naturally after the TLS upgrade. If they
        were to be pooled, they would never be properly closed, causing
        session.close() to wait forever for their 'closed' future.
        """
        # Intentionally empty: overrides Connection.release() with a no-op.
class _TransportPlaceholder:
    """placeholder for BaseConnector.connect function"""

    # Mimics the small slice of the ResponseHandler interface that the
    # connector touches while a real connection is being established
    # (see BaseConnector.connect), so it can sit in the acquired sets.
    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
        """Initialize a placeholder for a transport."""
        # An already-completed future is shared by the connector so that
        # close() never waits on a placeholder.
        self.closed = closed_future
        self.transport = None

    def close(self) -> None:
        """Close the placeholder (does nothing)."""

    def abort(self) -> None:
        """Abort the placeholder (does nothing)."""
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # force_close and keepalive_timeout are mutually exclusive; the
        # sentinel lets us distinguish "not passed" from an explicit None.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # Pre-completed future shared by every _TransportPlaceholder so
        # close() never blocks on a placeholder's `closed` attribute.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        # Warn (and force-close) only if the connector was left open with
        # pooled connections still attached.
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            # Rebuild the pool keeping only connections that are still
            # connected and within the keep-alive window.
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        # Reschedule only while there is something left to expire.
        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        # Synchronous part of close(): tears everything down and returns
        # the protocols' `closed` futures for the caller to await.
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # State is cleared even on early return / exception so the
            # connector can never be half-closed.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    def _update_proxy_auth_header_and_build_proxy_req(
        self, req: ClientRequest
    ) -> ClientRequest:
        """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
        url = req.proxy
        assert url is not None
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )
        # Plain HTTP proxying sends the credentials on the main request;
        # tunneled (SSL) proxying sends them on the CONNECT request instead.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        return proxy_req

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            if req.proxy:
                self._update_proxy_auth_header_and_build_proxy_req(req)
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    if req.proxy:
                        self._update_proxy_auth_header_and_build_proxy_req(req)
                    return conn

            # Reserve the slot with a placeholder so limit accounting holds
            # while the real connection is being established.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that the
                # that the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will we reuse the connection if its connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        # A slot just freed up; wake one queued waiter if any.
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        # Healthy connection: return it to the pool with its release time
        # so _cleanup()/_get() can enforce the keep-alive timeout.
        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        # Subclasses implement the actual transport establishment.
        raise NotImplementedError()
class _DNSCacheTable:
    """Round-robin DNS cache with optional TTL-based expiry."""

    def __init__(self, ttl: Optional[float] = None) -> None:
        # Maps (host, port) -> (endless cycle over resolved addrs, addr count).
        self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        self._addrs_rr.pop(key, None)

        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
        # Return all addresses starting from the current rotation point.
        # Raises KeyError if the key was never added or already removed.
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        # Consume one more element to shift internal state of `cycle`
        next(loop)
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        # With no TTL configured entries never expire.
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()
def _make_ssl_context(verified: bool) -> SSLContext:
    """Create SSL context.

    This method is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        # NOTE(review): returns None despite the SSLContext annotation —
        # callers apparently must tolerate None when ssl is unavailable.
        return None
    if verified:
        # Verified context: platform defaults (cert verification on).
        sslcontext = ssl.create_default_context()
    else:
        # Unverified context: TLS-only, but hostname/cert checks disabled.
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.options |= ssl.OP_NO_SSLv3
        sslcontext.check_hostname = False
        sslcontext.verify_mode = ssl.CERT_NONE
    sslcontext.options |= ssl.OP_NO_COMPRESSION
    sslcontext.set_default_verify_paths()
    sslcontext.set_alpn_protocols(("http/1.1",))
    return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
928class TCPConnector(BaseConnector):
929 """TCP connector.
931 verify_ssl - Set to True to check ssl certifications.
932 fingerprint - Pass the binary sha256
933 digest of the expected certificate in DER format to verify
934 that the certificate the server presents matches. See also
935 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
936 resolver - Enable DNS lookups and use this
937 resolver
938 use_dns_cache - Use memory cache for DNS lookups.
939 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
940 family - socket address family
941 local_addr - local tuple of (host, port) to bind socket to
943 keepalive_timeout - (optional) Keep-alive timeout.
944 force_close - Set to True to force close and do reconnect
945 after each request (and between redirects).
946 limit - The total number of simultaneous connections.
947 limit_per_host - Number of simultaneous connections to one host.
948 enable_cleanup_closed - Enables clean-up closed ssl transports.
949 Disabled by default.
950 happy_eyeballs_delay - This is the “Connection Attempt Delay”
951 as defined in RFC 8305. To disable
952 the happy eyeballs algorithm, set to None.
953 interleave - “First Address Family Count” as defined in RFC 8305
954 loop - Optional event loop.
955 socket_factory - A SocketFactoryType function that, if supplied,
956 will be used to create sockets given an
957 AddrInfoType.
958 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
959 Grace period for SSL shutdown handshake on TLS
960 connections. Default is 0 seconds (immediate abort).
961 This parameter allowed for a clean SSL shutdown by
962 notifying the remote peer of connection closure,
963 while avoiding excessive delays during connector cleanup.
964 Note: Only takes effect on Python 3.11+.
965 """
967 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ):
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the legacy verify_ssl/ssl_context/fingerprint params and
        # the modern `ssl` param into a single setting.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        # Track whether we created the resolver (and thus own its lifecycle).
        self._resolver: AbstractResolver
        if resolver is None:
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # In-flight DNS lookups, used to coalesce concurrent resolutions
        # of the same (host, port).
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout
1048 def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
1049 """Close all ongoing DNS calls."""
1050 for fut in chain.from_iterable(self._throttle_dns_futures.values()):
1051 fut.cancel()
1053 waiters = super()._close(abort_ssl=abort_ssl)
1055 for t in self._resolve_host_tasks:
1056 t.cancel()
1057 waiters.append(t)
1059 return waiters
1061 async def close(self, *, abort_ssl: bool = False) -> None:
1062 """
1063 Close all opened transports.
1065 :param abort_ssl: If True, SSL connections will be aborted immediately
1066 without performing the shutdown handshake. If False (default),
1067 the behavior is determined by ssl_shutdown_timeout:
1068 - If ssl_shutdown_timeout=0: connections are aborted
1069 - If ssl_shutdown_timeout>0: graceful shutdown is performed
1070 """
1071 if self._resolver_owner:
1072 await self._resolver.close()
1073 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
1074 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)
    @property
    def family(self) -> int:
        """Socket family like AF_INET.

        A falsy value (0) means unspecified: address-info conversion
        accepts both IPv4 and IPv6 results in that case.
        """
        return self._family
    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        # Read-only view of the constructor's ``use_dns_cache`` argument.
        return self._use_dns_cache
1086 def clear_dns_cache(
1087 self, host: Optional[str] = None, port: Optional[int] = None
1088 ) -> None:
1089 """Remove specified host/port or clear all dns local cache."""
1090 if host is not None and port is not None:
1091 self._cached_hosts.remove((host, port))
1092 elif host is not None or port is not None:
1093 raise ValueError("either both host and port or none of them are allowed")
1094 else:
1095 self._cached_hosts.clear()
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        Fast paths: an IP-address literal is returned as-is, and when DNS
        caching is disabled the resolver is queried directly.  Otherwise
        fresh cache entries are served immediately, and concurrent lookups
        for the same (host, port) key are coalesced so the resolver is only
        queried once per key ("throttling").
        """
        if is_ip_address(host):
            # No DNS needed: synthesize a single ResolveResult for the literal.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                # Wait for the in-flight lookup started by another caller.
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Track the task so _close() can cancel and await it at shutdown.
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:
            # Detach from the shared lookup, but consume its eventual
            # exception (if any) so it is not reported as unhandled.
            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise
    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            # Populate the cache BEFORE waking the waiters: they read the
            # result via self._cached_hosts.next_addrs(key).
            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # Remove the throttle entry so the next caller starts a new lookup.
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)
1229 async def _create_connection(
1230 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1231 ) -> ResponseHandler:
1232 """Create connection.
1234 Has same keyword arguments as BaseEventLoop.create_connection.
1235 """
1236 if req.proxy:
1237 _, proto = await self._create_proxy_connection(req, traces, timeout)
1238 else:
1239 _, proto = await self._create_direct_connection(req, traces, timeout)
1241 return proto
1243 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
1244 """Logic to get the correct SSL context
1246 0. if req.ssl is false, return None
1248 1. if ssl_context is specified in req, use it
1249 2. if _ssl_context is specified in self, use it
1250 3. otherwise:
1251 1. if verify_ssl is not specified in req, use self.ssl_context
1252 (will generate a default context according to self.verify_ssl)
1253 2. if verify_ssl is True in req, generate a default SSL context
1254 3. if verify_ssl is False in req, generate a SSL context that
1255 won't verify
1256 """
1257 if not req.is_ssl():
1258 return None
1260 if ssl is None: # pragma: no cover
1261 raise RuntimeError("SSL is not supported.")
1262 sslcontext = req.ssl
1263 if isinstance(sslcontext, ssl.SSLContext):
1264 return sslcontext
1265 if sslcontext is not True:
1266 # not verified or fingerprinted
1267 return _SSL_CONTEXT_UNVERIFIED
1268 sslcontext = self._ssl
1269 if isinstance(sslcontext, ssl.SSLContext):
1270 return sslcontext
1271 if sslcontext is not True:
1272 # not verified or fingerprinted
1273 return _SSL_CONTEXT_UNVERIFIED
1274 return _SSL_CONTEXT_VERIFIED
1276 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
1277 ret = req.ssl
1278 if isinstance(ret, Fingerprint):
1279 return ret
1280 ret = self._ssl
1281 if isinstance(ret, Fingerprint):
1282 return ret
1283 return None
    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Open a socket via aiohappyeyeballs and wrap it in a transport.

        The socket is established under the ``sock_connect`` timeout and
        handed to ``loop.create_connection()``.  Certificate, SSL and OS
        failures are translated into the corresponding aiohttp connector
        exceptions (``client_error`` for plain OS errors).
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A pure timeout carries no errno; let it propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
1323 async def _wrap_existing_connection(
1324 self,
1325 *args: Any,
1326 req: ClientRequest,
1327 timeout: "ClientTimeout",
1328 client_error: Type[Exception] = ClientConnectorError,
1329 **kwargs: Any,
1330 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1331 try:
1332 async with ceil_timeout(
1333 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1334 ):
1335 return await self._loop.create_connection(*args, **kwargs)
1336 except cert_errors as exc:
1337 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1338 except ssl_errors as exc:
1339 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1340 except OSError as exc:
1341 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1342 raise
1343 raise client_error(req.connection_key, exc) from exc
1345 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
1346 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
1348 It is necessary for TLS-in-TLS so that it is possible to
1349 send HTTPS queries through HTTPS proxies.
1351 This doesn't affect regular HTTP requests, though.
1352 """
1353 if not req.is_ssl():
1354 return
1356 proxy_url = req.proxy
1357 assert proxy_url is not None
1358 if proxy_url.scheme != "https":
1359 return
1361 self._check_loop_for_start_tls()
1363 def _check_loop_for_start_tls(self) -> None:
1364 try:
1365 self._loop.start_tls
1366 except AttributeError as attr_exc:
1367 raise RuntimeError(
1368 "An HTTPS request is being sent through an HTTPS proxy. "
1369 "This needs support for TLS in TLS but it is not implemented "
1370 "in your runtime for the stdlib asyncio.\n\n"
1371 "Please upgrade to Python 3.11 or higher. For more details, "
1372 "please see:\n"
1373 "* https://bugs.python.org/issue37179\n"
1374 "* https://github.com/python/cpython/pull/28073\n"
1375 "* https://docs.aiohttp.org/en/stable/"
1376 "client_advanced.html#proxy-support\n"
1377 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1378 ) from attr_exc
1380 def _loop_supports_start_tls(self) -> bool:
1381 try:
1382 self._check_loop_for_start_tls()
1383 except RuntimeError:
1384 return False
1385 else:
1386 return True
1388 def _warn_about_tls_in_tls(
1389 self,
1390 underlying_transport: asyncio.Transport,
1391 req: ClientRequest,
1392 ) -> None:
1393 """Issue a warning if the requested URL has HTTPS scheme."""
1394 if req.request_info.url.scheme != "https":
1395 return
1397 # Check if uvloop is being used, which supports TLS in TLS,
1398 # otherwise assume that asyncio's native transport is being used.
1399 if type(underlying_transport).__module__.startswith("uvloop"):
1400 return
1402 # Support in asyncio was added in Python 3.11 (bpo-44011)
1403 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
1404 underlying_transport,
1405 "_start_tls_compatible",
1406 False,
1407 )
1409 if asyncio_supports_tls_in_tls:
1410 return
1412 warnings.warn(
1413 "An HTTPS request is being sent through an HTTPS proxy. "
1414 "This support for TLS in TLS is known to be disabled "
1415 "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
1416 "an error in the log below.\n\n"
1417 "It is possible to enable it via monkeypatching. "
1418 "For more details, see:\n"
1419 "* https://bugs.python.org/issue37179\n"
1420 "* https://github.com/python/cpython/pull/28073\n\n"
1421 "You can temporarily patch this as follows:\n"
1422 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1423 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1424 RuntimeWarning,
1425 source=self,
1426 # Why `4`? At least 3 of the calls in the stack originate
1427 # from the methods in this class.
1428 stacklevel=3,
1429 )
    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS through an HTTPS proxy: a fresh protocol is
        created, ``loop.start_tls()`` upgrades the existing transport, and
        any configured fingerprint is verified on the new TLS transport.
        Failures are translated into aiohttp connector exceptions.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        # Zero shutdown timeout means abort without handshake.
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A pure timeout carries no errno; let it propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto
1517 def _convert_hosts_to_addr_infos(
1518 self, hosts: List[ResolveResult]
1519 ) -> List[AddrInfoType]:
1520 """Converts the list of hosts to a list of addr_infos.
1522 The list of hosts is the result of a DNS lookup. The list of
1523 addr_infos is the result of a call to `socket.getaddrinfo()`.
1524 """
1525 addr_infos: List[AddrInfoType] = []
1526 for hinfo in hosts:
1527 host = hinfo["host"]
1528 is_ipv6 = ":" in host
1529 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1530 if self._family and self._family != family:
1531 continue
1532 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1533 addr_infos.append(
1534 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1535 )
1536 return addr_infos
    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target host and connect, retrying across addresses.

        DNS results are converted to addr_infos and tried via Happy
        Eyeballs; failed or fingerprint-mismatched peers are removed from
        the candidate list until a connection succeeds or the list is
        exhausted (in which case the last error is raised).
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                # Drop the batch of addresses that was just attempted and
                # retry with the remaining candidates.
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # The candidate list is exhausted; re-raise the most recent error.
            assert last_exc is not None
            raise last_exc
    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through the configured HTTP(S) proxy.

        For plain HTTP requests the connection to the proxy itself is
        returned.  For HTTPS requests a CONNECT tunnel is established first,
        then the socket is upgraded to TLS -- via ``loop.start_tls()`` when
        the loop supports it, otherwise by re-wrapping a duplicated raw
        socket in a new TLS connection.
        """
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()
        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            # CONNECT www.python.org:443 HTTP/1.1
            # Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol from the tunnel connection: it now
                # belongs to the TLS upgrade path below.
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto
class UnixConnector(BaseConnector):
    """Unix socket connector.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    # Accept "unix" URLs in addition to the regular high-level schemes.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection over the configured unix socket."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_exc:
            # A pure timeout carries no errno; let it propagate unchanged.
            if os_exc.errno is None and isinstance(os_exc, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(
                self.path, req.connection_key, os_exc
            ) from os_exc

        return proto
class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    # Accept "npipe" URLs in addition to the regular high-level schemes.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # Named pipes are only implemented by the Windows proactor loop.
        proactor_loop_cls = asyncio.ProactorEventLoop  # type: ignore[attr-defined]
        if not isinstance(self._loop, proactor_loop_cls):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection over the configured named pipe."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as os_exc:
            # A pure timeout carries no errno; let it propagate unchanged.
            if os_exc.errno is None and isinstance(os_exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, os_exc) from os_exc

        return cast(ResponseHandler, proto)