Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3import random
4import socket
5import sys
6import traceback
7import warnings
8from collections import OrderedDict, defaultdict, deque
9from contextlib import suppress
10from http import HTTPStatus
11from itertools import chain, cycle, islice
12from time import monotonic
13from types import TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 DefaultDict,
20 Deque,
21 Dict,
22 Iterator,
23 List,
24 Literal,
25 Optional,
26 Sequence,
27 Set,
28 Tuple,
29 Type,
30 Union,
31 cast,
32)
34import aiohappyeyeballs
35from aiohappyeyeballs import AddrInfoType, SocketFactoryType
37from . import hdrs, helpers
38from .abc import AbstractResolver, ResolveResult
39from .client_exceptions import (
40 ClientConnectionError,
41 ClientConnectorCertificateError,
42 ClientConnectorDNSError,
43 ClientConnectorError,
44 ClientConnectorSSLError,
45 ClientHttpProxyError,
46 ClientProxyConnectionError,
47 ServerFingerprintMismatch,
48 UnixClientConnectorError,
49 cert_errors,
50 ssl_errors,
51)
52from .client_proto import ResponseHandler
53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
54from .helpers import (
55 _SENTINEL,
56 ceil_timeout,
57 is_ip_address,
58 noop,
59 sentinel,
60 set_exception,
61 set_result,
62)
63from .log import client_logger
64from .resolver import DefaultResolver
if sys.version_info >= (3, 12):
    # PEP 688 buffer protocol type is only available on Python 3.12+.
    from collections.abc import Buffer
else:
    # Fallback alias covering the common buffer-protocol types on older Pythons.
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
# ssl support is optional at runtime (some Python builds lack the module),
# but type checkers always see the real SSLContext type.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl

        SSLContext = ssl.SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]
# URL-scheme sets used to validate which schemes a connector may handle.
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# True on interpreter versions that still need the cleanup-closed workaround.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 8)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.8 and 3.13.1
# Public API of this module.
__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)
110if TYPE_CHECKING:
111 from .client import ClientTimeout
112 from .client_reqrep import ConnectionKey
113 from .tracing import Trace
116class _DeprecationWaiter:
117 __slots__ = ("_awaitable", "_awaited")
119 def __init__(self, awaitable: Awaitable[Any]) -> None:
120 self._awaitable = awaitable
121 self._awaited = False
123 def __await__(self) -> Any:
124 self._awaited = True
125 return self._awaitable.__await__()
127 def __del__(self) -> None:
128 if not self._awaited:
129 warnings.warn(
130 "Connector.close() is a coroutine, "
131 "please use await connector.close()",
132 DeprecationWarning,
133 )
136async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
137 """Wait for all waiters to finish closing."""
138 results = await asyncio.gather(*waiters, return_exceptions=True)
139 for res in results:
140 if isinstance(res, Exception):
141 client_logger.debug("Error while closing connector: %r", res)
class Connection:
    """A single pooled connection handed out by a :class:`BaseConnector`.

    Wraps a ``ResponseHandler`` protocol together with the pool key so that
    ``release()``/``close()`` can hand the protocol back to the connector.
    """

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        # Set to None once the connection is released/closed.
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # Warn and force-close if the connection was garbage-collected
        # without being released back to the connector.
        if self._protocol is not None:
            kwargs = {"source": self}
            _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Deprecated accessor for the event loop this connection runs on."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Underlying transport, or None once released/closed."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """Underlying protocol, or None once released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register a callback to run when the connection is released/closed."""
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap out the callback list first so re-entrant registration is safe.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Hand the protocol back to the connector, forcing it closed."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Hand the protocol back to the connector for possible reuse."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True once released/closed or the transport dropped."""
        return self._protocol is None or not self._protocol.is_connected()
class _ConnectTunnelConnection(Connection):
    """Connection wrapper for CONNECT tunnels that must never be pooled.

    This wraps the proxy connection that will be upgraded with TLS.
    It must never be released to the pool because:
    1. Its 'closed' future will never complete, causing session.close() to hang
    2. It represents an intermediate state, not a reusable connection
    3. The real connection (with TLS) will be created separately
    """

    def release(self) -> None:
        """Intentionally a no-op: neither pool nor close the connection.

        These connections exist only while the CONNECT tunnel is being set
        up and are cleaned up naturally after the TLS upgrade. Pooling one
        would leave it permanently open, making session.close() wait
        forever on its 'closed' future.
        """
252class _TransportPlaceholder:
253 """placeholder for BaseConnector.connect function"""
255 __slots__ = ("closed", "transport")
257 def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
258 """Initialize a placeholder for a transport."""
259 self.closed = closed_future
260 self.transport = None
262 def close(self) -> None:
263 """Close the placeholder."""
265 def abort(self) -> None:
266 """Abort the placeholder (does nothing)."""
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
        Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
        it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:

        # keepalive and force_close are mutually exclusive; default the
        # keepalive timeout to 15s only when keep-alive is in effect.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # Pre-completed future shared by every _TransportPlaceholder so
        # close() never waits on a placeholder.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        """Warn about (and best-effort close) a connector that was never closed."""
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        """Deprecated sync context manager entry; use ``async with`` instead."""
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        """Deprecated sync context manager exit; closes without awaiting."""
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        """Async context manager entry; returns self."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        """Async context manager exit; awaits a full close."""
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    # Keep connections that are still open and within keepalive.
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # Re-arm the cleanup timer only while there is something to expire.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Synchronously close everything; return futures to await on teardown."""
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Always drop all bookkeeping, even if a close above raised.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    def _update_proxy_auth_header_and_build_proxy_req(
        self, req: ClientRequest
    ) -> ClientRequest:
        """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
        url = req.proxy
        assert url is not None
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )
        # Move any Authorization produced from proxy_auth to the right header.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        return proxy_req

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            if req.proxy:
                self._update_proxy_auth_header_and_build_proxy_req(req)
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    if req.proxy:
                        self._update_proxy_auth_header_and_build_proxy_req(req)
                    return conn

            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that the
                # placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it is connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the host keys so no single host's waiters are favoured
        # by always being visited first.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return *protocol* to the pool, or close it if reuse is not allowed."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a new connection for *req*; implemented by subclasses."""
        raise NotImplementedError()
858class _DNSCacheTable:
859 def __init__(self, ttl: Optional[float] = None, max_size: int = 1000) -> None:
860 self._addrs_rr: OrderedDict[
861 Tuple[str, int], Tuple[Iterator[ResolveResult], int]
862 ] = OrderedDict()
863 self._timestamps: Dict[Tuple[str, int], float] = {}
864 self._ttl = ttl
865 self._max_size = max_size
867 def __contains__(self, host: object) -> bool:
868 return host in self._addrs_rr
870 def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
871 if key in self._addrs_rr:
872 self._addrs_rr.move_to_end(key)
874 self._addrs_rr[key] = (cycle(addrs), len(addrs))
876 if self._ttl is not None:
877 self._timestamps[key] = monotonic()
879 if len(self._addrs_rr) > self._max_size:
880 oldest_key, _ = self._addrs_rr.popitem(last=False)
881 self._timestamps.pop(oldest_key, None)
883 def remove(self, key: Tuple[str, int]) -> None:
884 self._addrs_rr.pop(key, None)
885 self._timestamps.pop(key, None)
887 def clear(self) -> None:
888 self._addrs_rr.clear()
889 self._timestamps.clear()
891 def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
892 loop, length = self._addrs_rr[key]
893 addrs = list(islice(loop, length))
894 # Consume one more element to shift internal state of `cycle`
895 next(loop)
896 self._addrs_rr.move_to_end(key)
897 return addrs
899 def expired(self, key: Tuple[str, int]) -> bool:
900 if self._ttl is None:
901 return False
903 return self._timestamps[key] + self._ttl < monotonic()
906def _make_ssl_context(verified: bool) -> SSLContext:
907 """Create SSL context.
909 This method is not async-friendly and should be called from a thread
910 because it will load certificates from disk and do other blocking I/O.
911 """
912 if ssl is None:
913 # No ssl support
914 return None
915 if verified:
916 sslcontext = ssl.create_default_context()
917 else:
918 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
919 sslcontext.options |= ssl.OP_NO_SSLv2
920 sslcontext.options |= ssl.OP_NO_SSLv3
921 sslcontext.check_hostname = False
922 sslcontext.verify_mode = ssl.CERT_NONE
923 sslcontext.options |= ssl.OP_NO_COMPRESSION
924 sslcontext.set_default_verify_paths()
925 sslcontext.set_alpn_protocols(("http/1.1",))
926 return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
# _SSL_CONTEXT_VERIFIED checks certificates and hostnames;
# _SSL_CONTEXT_UNVERIFIED disables both checks.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
937class TCPConnector(BaseConnector):
938 """TCP connector.
940 verify_ssl - Set to True to check ssl certifications.
941 fingerprint - Pass the binary sha256
942 digest of the expected certificate in DER format to verify
943 that the certificate the server presents matches. See also
944 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
945 resolver - Enable DNS lookups and use this
946 resolver
947 use_dns_cache - Use memory cache for DNS lookups.
948 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
949 family - socket address family
950 local_addr - local tuple of (host, port) to bind socket to
952 keepalive_timeout - (optional) Keep-alive timeout.
953 force_close - Set to True to force close and do reconnect
954 after each request (and between redirects).
955 limit - The total number of simultaneous connections.
956 limit_per_host - Number of simultaneous connections to one host.
957 enable_cleanup_closed - Enables clean-up closed ssl transports.
958 Disabled by default.
959 happy_eyeballs_delay - This is the “Connection Attempt Delay”
960 as defined in RFC 8305. To disable
961 the happy eyeballs algorithm, set to None.
962 interleave - “First Address Family Count” as defined in RFC 8305
963 loop - Optional event loop.
964 socket_factory - A SocketFactoryType function that, if supplied,
965 will be used to create sockets given an
966 AddrInfoType.
967 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
968 Grace period for SSL shutdown handshake on TLS
969 connections. Default is 0 seconds (immediate abort).
970 This parameter allowed for a clean SSL shutdown by
971 notifying the remote peer of connection closure,
972 while avoiding excessive delays during connector cleanup.
973 Note: Only takes effect on Python 3.11+.
974 """
976 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        dns_cache_max_size: int = 1000,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ):
        """Initialize a TCP connector; parameters are documented on the class."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the legacy verify_ssl/ssl_context/fingerprint arguments
        # into the single ssl parameter.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        # Track whether this connector owns (and must close) the resolver.
        self._resolver: AbstractResolver
        if resolver is None:
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(
            ttl=ttl_dns_cache, max_size=dns_cache_max_size
        )
        # In-flight DNS lookups, used to coalesce duplicate resolutions.
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout
1060 def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
1061 """Close all ongoing DNS calls."""
1062 for fut in chain.from_iterable(self._throttle_dns_futures.values()):
1063 fut.cancel()
1065 waiters = super()._close(abort_ssl=abort_ssl)
1067 for t in self._resolve_host_tasks:
1068 t.cancel()
1069 waiters.append(t)
1071 return waiters
1073 async def close(self, *, abort_ssl: bool = False) -> None:
1074 """
1075 Close all opened transports.
1077 :param abort_ssl: If True, SSL connections will be aborted immediately
1078 without performing the shutdown handshake. If False (default),
1079 the behavior is determined by ssl_shutdown_timeout:
1080 - If ssl_shutdown_timeout=0: connections are aborted
1081 - If ssl_shutdown_timeout>0: graceful shutdown is performed
1082 """
1083 if self._resolver_owner:
1084 await self._resolver.close()
1085 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
1086 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)
    @property
    def family(self) -> int:
        """Socket family like AF_INET.

        Set at construction time; ``AF_UNSPEC`` (0) means resolved
        addresses are not filtered by address family.
        """
        return self._family
    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled.

        When False, every lookup goes straight to the resolver and the
        throttling/caching machinery is bypassed.
        """
        return self._use_dns_cache
1098 def clear_dns_cache(
1099 self, host: Optional[str] = None, port: Optional[int] = None
1100 ) -> None:
1101 """Remove specified host/port or clear all dns local cache."""
1102 if host is not None and port is not None:
1103 self._cached_hosts.remove((host, port))
1104 elif host is not None or port is not None:
1105 raise ValueError("either both host and port or none of them are allowed")
1106 else:
1107 self._cached_hosts.clear()
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        Literal IP addresses short-circuit without touching the resolver.
        Otherwise the result comes from, in order: a direct resolver call
        (cache disabled), the local DNS cache, an already in-flight
        throttled lookup for the same ``(host, port)``, or a fresh lookup
        run in a cancellation-shielded task.
        """
        if is_ip_address(host):
            # Already an IP literal: synthesize a single ResolveResult.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                # Woken (or failed) by _resolve_host_with_throttle.
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Track the task so _close() can cancel and await it.
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            # Consume the task's eventual exception so it is not reported
            # as an unhandled task exception after this caller went away.
            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise
    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.

        On success the addresses are stored in the DNS cache and every
        waiter future in ``futures`` is completed; on failure the same
        exception is propagated to every waiter.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            # Cache first so waiters woken below can read next_addrs(key).
            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # Always drop the throttle entry so later lookups for this key
            # start a fresh resolution instead of waiting forever.
            self._throttle_dns_futures.pop(key)

        # Read back through the cache — the same path the waiters use.
        return self._cached_hosts.next_addrs(key)
1241 async def _create_connection(
1242 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1243 ) -> ResponseHandler:
1244 """Create connection.
1246 Has same keyword arguments as BaseEventLoop.create_connection.
1247 """
1248 if req.proxy:
1249 _, proto = await self._create_proxy_connection(req, traces, timeout)
1250 else:
1251 _, proto = await self._create_direct_connection(req, traces, timeout)
1253 return proto
1255 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
1256 """Logic to get the correct SSL context
1258 0. if req.ssl is false, return None
1260 1. if ssl_context is specified in req, use it
1261 2. if _ssl_context is specified in self, use it
1262 3. otherwise:
1263 1. if verify_ssl is not specified in req, use self.ssl_context
1264 (will generate a default context according to self.verify_ssl)
1265 2. if verify_ssl is True in req, generate a default SSL context
1266 3. if verify_ssl is False in req, generate a SSL context that
1267 won't verify
1268 """
1269 if not req.is_ssl():
1270 return None
1272 if ssl is None: # pragma: no cover
1273 raise RuntimeError("SSL is not supported.")
1274 sslcontext = req.ssl
1275 if isinstance(sslcontext, ssl.SSLContext):
1276 return sslcontext
1277 if sslcontext is not True:
1278 # not verified or fingerprinted
1279 return _SSL_CONTEXT_UNVERIFIED
1280 sslcontext = self._ssl
1281 if isinstance(sslcontext, ssl.SSLContext):
1282 return sslcontext
1283 if sslcontext is not True:
1284 # not verified or fingerprinted
1285 return _SSL_CONTEXT_UNVERIFIED
1286 return _SSL_CONTEXT_VERIFIED
1288 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
1289 ret = req.ssl
1290 if isinstance(ret, Fingerprint):
1291 return ret
1292 ret = self._ssl
1293 if isinstance(ret, Fingerprint):
1294 return ret
1295 return None
1297 async def _wrap_create_connection(
1298 self,
1299 *args: Any,
1300 addr_infos: List[AddrInfoType],
1301 req: ClientRequest,
1302 timeout: "ClientTimeout",
1303 client_error: Type[Exception] = ClientConnectorError,
1304 **kwargs: Any,
1305 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1306 try:
1307 async with ceil_timeout(
1308 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1309 ):
1310 sock = await aiohappyeyeballs.start_connection(
1311 addr_infos=addr_infos,
1312 local_addr_infos=self._local_addr_infos,
1313 happy_eyeballs_delay=self._happy_eyeballs_delay,
1314 interleave=self._interleave,
1315 loop=self._loop,
1316 socket_factory=self._socket_factory,
1317 )
1318 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
1319 if (
1320 kwargs.get("ssl")
1321 and self._ssl_shutdown_timeout
1322 and sys.version_info >= (3, 11)
1323 ):
1324 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
1325 return await self._loop.create_connection(*args, **kwargs, sock=sock)
1326 except cert_errors as exc:
1327 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1328 except ssl_errors as exc:
1329 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1330 except OSError as exc:
1331 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1332 raise
1333 raise client_error(req.connection_key, exc) from exc
    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Wrap an already-connected socket (supplied via kwargs) in a transport.

        Unlike ``_wrap_create_connection`` no new socket is opened here;
        the caller provides one (e.g. a socket duplicated from a proxy
        transport).  Errors are mapped to aiohttp client exceptions.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                # Timeout from ceil_timeout: re-raise untouched.
                raise
            raise client_error(req.connection_key, exc) from exc
1357 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
1358 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
1360 It is necessary for TLS-in-TLS so that it is possible to
1361 send HTTPS queries through HTTPS proxies.
1363 This doesn't affect regular HTTP requests, though.
1364 """
1365 if not req.is_ssl():
1366 return
1368 proxy_url = req.proxy
1369 assert proxy_url is not None
1370 if proxy_url.scheme != "https":
1371 return
1373 self._check_loop_for_start_tls()
    def _check_loop_for_start_tls(self) -> None:
        """Raise ``RuntimeError`` if the event loop lacks ``start_tls()``.

        Required for TLS-in-TLS (HTTPS requests through an HTTPS proxy);
        per the error message below this needs Python 3.11+ support in
        the stdlib asyncio.
        """
        try:
            # Attribute probe only; raises AttributeError when missing.
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc
1392 def _loop_supports_start_tls(self) -> bool:
1393 try:
1394 self._check_loop_for_start_tls()
1395 except RuntimeError:
1396 return False
1397 else:
1398 return True
    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme.

        Emits a ``RuntimeWarning`` only when the runtime is known not to
        support TLS-in-TLS; uvloop transports and Python 3.11+ are exempt.
        """
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # At least 3 of the calls in the stack originate
            # from the methods in this class.
            stacklevel=3,
        )
    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS through an HTTPS proxy: a fresh protocol is
        created, the existing transport is upgraded via
        ``loop.start_tls()``, and the optional server fingerprint is
        verified on the new transport.  Errors are mapped to aiohttp
        client exception types.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total or None,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total or None,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    # abort when graceful SSL shutdown is disabled (timeout 0).
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Timeouts from ceil_timeout pass through unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto
1529 def _convert_hosts_to_addr_infos(
1530 self, hosts: List[ResolveResult]
1531 ) -> List[AddrInfoType]:
1532 """Converts the list of hosts to a list of addr_infos.
1534 The list of hosts is the result of a DNS lookup. The list of
1535 addr_infos is the result of a call to `socket.getaddrinfo()`.
1536 """
1537 addr_infos: List[AddrInfoType] = []
1538 for hinfo in hosts:
1539 host = hinfo["host"]
1540 is_ipv6 = ":" in host
1541 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1542 if self._family and self._family != family:
1543 continue
1544 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1545 addr_infos.append(
1546 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1547 )
1548 return addr_infos
    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target host and connect, trying each address in turn.

        Failed candidates are removed from the happy-eyeballs address list
        and the last error is raised once every address has been tried.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        # NOTE(review): assumes addr_infos is non-empty here; if the family
        # filter in _convert_hosts_to_addr_infos removed every address the
        # assert in the else-clause below would trip — TODO confirm upstream.
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                # Drop the failed candidates and retry with what is left.
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # Loop exhausted without a successful connection: re-raise the
            # most recent failure.
            assert last_exc is not None
            raise last_exc
    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through the configured proxy.

        Plain HTTP requests reuse the proxy connection directly.  HTTPS
        requests issue a CONNECT tunnel and then upgrade to TLS — via
        ``start_tls()`` when the loop supports it, otherwise by duplicating
        the raw socket and opening a fresh TLS connection over it.
        """
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()

        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            # CONNECT www.python.org:443 HTTP/1.1
            # Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            # The tunnel key must not carry proxy attributes or the pooled
            # connection would be keyed as a proxy connection.
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol so closing `conn` later cannot tear
                # down the transport we are about to upgrade.
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        # Plain HTTP through the proxy: hand back the proxy connection as-is.
        return transport, proto
class UnixConnector(BaseConnector):
    """Connector that speaks HTTP over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Remember the socket path and forward pooling options to the base."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to ``self._path`` within the socket-connect timeout."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, protocol = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                # Timeouts from ceil_timeout are re-raised untouched.
                raise
            raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc

        return protocol
1785class NamedPipeConnector(BaseConnector):
1786 """Named pipe connector.
1788 Only supported by the proactor event loop.
1789 See also: https://docs.python.org/3/library/asyncio-eventloop.html
1791 path - Windows named pipe path.
1792 keepalive_timeout - (optional) Keep-alive timeout.
1793 force_close - Set to True to force close and do reconnect
1794 after each request (and between redirects).
1795 limit - The total number of simultaneous connections.
1796 limit_per_host - Number of simultaneous connections to one host.
1797 loop - Optional event loop.
1798 """
1800 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})
    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Validate the event loop type and store the pipe path.

        Raises ``RuntimeError`` unless the loop is a ``ProactorEventLoop``
        (Windows-only), as stated in the error message below.
        """
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        if not isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        # Windows named pipe path, e.g. r"\\.\pipe\name".
        self._path = path
    @property
    def path(self) -> str:
        """Path to the named pipe.

        The value passed to the constructor, unchanged.
        """
        return self._path
    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the named pipe, mapping OS errors to client errors."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                # Timeouts from ceil_timeout are re-raised untouched.
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)