Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3import random
4import socket
5import sys
6import traceback
7import warnings
8from collections import OrderedDict, defaultdict, deque
9from collections.abc import Awaitable, Callable, Iterator, Sequence
10from contextlib import suppress
11from http import HTTPStatus
12from itertools import chain, cycle, islice
13from time import monotonic
14from types import TracebackType
15from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast
17import aiohappyeyeballs
18from aiohappyeyeballs import AddrInfoType, SocketFactoryType
20from . import hdrs, helpers
21from .abc import AbstractResolver, ResolveResult
22from .client_exceptions import (
23 ClientConnectionError,
24 ClientConnectorCertificateError,
25 ClientConnectorDNSError,
26 ClientConnectorError,
27 ClientConnectorSSLError,
28 ClientHttpProxyError,
29 ClientProxyConnectionError,
30 InvalidUrlClientError,
31 ServerFingerprintMismatch,
32 UnixClientConnectorError,
33 cert_errors,
34 ssl_errors,
35)
36from .client_proto import ResponseHandler
37from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
38from .helpers import (
39 _SENTINEL,
40 ceil_timeout,
41 is_canonical_ipv4_address,
42 is_ip_address,
43 noop,
44 sentinel,
45 set_exception,
46 set_result,
47)
48from .log import client_logger
49from .resolver import DefaultResolver
51if sys.version_info >= (3, 12):
52 from collections.abc import Buffer
53else:
54 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
56if TYPE_CHECKING:
57 import ssl
59 SSLContext = ssl.SSLContext
60else:
61 try:
62 import ssl
64 SSLContext = ssl.SSLContext
65 except ImportError: # pragma: no cover
66 ssl = None # type: ignore[assignment]
67 SSLContext = object # type: ignore[misc,assignment]
# Basic groups of scheme strings.
# NOTE(review): the empty string presumably stands for a URL without a
# scheme -- confirm against the callers that consult these sets.
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

# Unions of the basic groups; HIGH_LEVEL_SCHEMA_SET is what the connector
# classes below expose as ``allowed_protocol_schema_set``.
HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# True on interpreters older than 3.12.8 and on 3.13.0 exactly; on those
# versions the ``enable_cleanup_closed`` connector option is honoured.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 8)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.8 and 3.13.1
# Public names of this module: the three concrete connectors, their common
# base class, and the two aiohappyeyeballs types imported at the top of the
# file and re-exported here.
__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)
95if TYPE_CHECKING:
96 from .client import ClientTimeout
97 from .client_reqrep import ConnectionKey
98 from .tracing import Trace
101class _DeprecationWaiter:
102 __slots__ = ("_awaitable", "_awaited")
104 def __init__(self, awaitable: Awaitable[Any]) -> None:
105 self._awaitable = awaitable
106 self._awaited = False
108 def __await__(self) -> Any:
109 self._awaited = True
110 return self._awaitable.__await__()
112 def __del__(self) -> None:
113 if not self._awaited:
114 warnings.warn(
115 "Connector.close() is a coroutine, "
116 "please use await connector.close()",
117 DeprecationWarning,
118 )
121async def _wait_for_close(waiters: list[Awaitable[object]]) -> None:
122 """Wait for all waiters to finish closing."""
123 results = await asyncio.gather(*waiters, return_exceptions=True)
124 for res in results:
125 if isinstance(res, Exception):
126 client_logger.debug("Error while closing connector: %r", res)
129class Connection:
131 _source_traceback = None
133 def __init__(
134 self,
135 connector: "BaseConnector",
136 key: "ConnectionKey",
137 protocol: ResponseHandler,
138 loop: asyncio.AbstractEventLoop,
139 ) -> None:
140 self._key = key
141 self._connector = connector
142 self._loop = loop
143 self._protocol: ResponseHandler | None = protocol
144 self._callbacks: list[Callable[[], None]] = []
146 if loop.get_debug():
147 self._source_traceback = traceback.extract_stack(sys._getframe(1))
149 def __repr__(self) -> str:
150 return f"Connection<{self._key}>"
152 def __del__(self, _warnings: Any = warnings) -> None:
153 if self._protocol is not None:
154 kwargs = {"source": self}
155 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
156 if self._loop.is_closed():
157 return
159 self._connector._release(self._key, self._protocol, should_close=True)
161 context = {"client_connection": self, "message": "Unclosed connection"}
162 if self._source_traceback is not None:
163 context["source_traceback"] = self._source_traceback
164 self._loop.call_exception_handler(context)
166 def __bool__(self) -> Literal[True]:
167 """Force subclasses to not be falsy, to make checks simpler."""
168 return True
170 @property
171 def loop(self) -> asyncio.AbstractEventLoop:
172 warnings.warn(
173 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
174 )
175 return self._loop
177 @property
178 def transport(self) -> asyncio.Transport | None:
179 if self._protocol is None:
180 return None
181 return self._protocol.transport
183 @property
184 def protocol(self) -> ResponseHandler | None:
185 return self._protocol
187 def add_callback(self, callback: Callable[[], None]) -> None:
188 if callback is not None:
189 self._callbacks.append(callback)
191 def _notify_release(self) -> None:
192 callbacks, self._callbacks = self._callbacks[:], []
194 for cb in callbacks:
195 with suppress(Exception):
196 cb()
198 def close(self) -> None:
199 self._notify_release()
201 if self._protocol is not None:
202 self._connector._release(self._key, self._protocol, should_close=True)
203 self._protocol = None
205 def release(self) -> None:
206 self._notify_release()
208 if self._protocol is not None:
209 self._connector._release(self._key, self._protocol)
210 self._protocol = None
212 @property
213 def closed(self) -> bool:
214 return self._protocol is None or not self._protocol.is_connected()
217class _ConnectTunnelConnection(Connection):
218 """Special connection wrapper for CONNECT tunnels that must never be pooled.
220 This connection wraps the proxy connection that will be upgraded with TLS.
221 It must never be released to the pool because:
222 1. Its 'closed' future will never complete, causing session.close() to hang
223 2. It represents an intermediate state, not a reusable connection
224 3. The real connection (with TLS) will be created separately
225 """
227 def release(self) -> None:
228 """Do nothing - don't pool or close the connection.
230 These connections are an intermediate state during the CONNECT tunnel
231 setup and will be cleaned up naturally after the TLS upgrade. If they
232 were to be pooled, they would never be properly closed, causing
233 session.close() to wait forever for their 'closed' future.
234 """
237class _TransportPlaceholder:
238 """placeholder for BaseConnector.connect function"""
240 __slots__ = ("closed", "transport")
242 def __init__(self, closed_future: asyncio.Future[Exception | None]) -> None:
243 """Initialize a placeholder for a transport."""
244 self.closed = closed_future
245 self.transport = None
247 def close(self) -> None:
248 """Close the placeholder."""
250 def abort(self) -> None:
251 """Abort the placeholder (does nothing)."""
254class BaseConnector:
255 """Base connector class.
257 keepalive_timeout - (optional) Keep-alive timeout.
258 force_close - Set to True to force close and do reconnect
259 after each request (and between redirects).
260 limit - The total number of simultaneous connections.
261 limit_per_host - Number of simultaneous connections to one host.
262 enable_cleanup_closed - Enables clean-up closed ssl transports.
263 Disabled by default.
264 timeout_ceil_threshold - Trigger ceiling of timeout values when
265 it's above timeout_ceil_threshold.
266 loop - Optional event loop.
267 """
269 _closed = True # prevent AttributeError in __del__ if ctor was failed
270 _source_traceback = None
272 # abort transport after 2 seconds (cleanup broken connections)
273 _cleanup_closed_period = 2.0
275 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET
def __init__(
    self,
    *,
    keepalive_timeout: object | None | float = sentinel,
    force_close: bool = False,
    limit: int = 100,
    limit_per_host: int = 0,
    enable_cleanup_closed: bool = False,
    loop: asyncio.AbstractEventLoop | None = None,
    timeout_ceil_threshold: float = 5,
) -> None:
    """Set up an empty connection pool and its bookkeeping.

    All arguments are keyword-only and are described in the class
    docstring. When ``keepalive_timeout`` is left at the sentinel and
    ``force_close`` is false, the keep-alive timeout becomes 15.0 seconds.
    When ``loop`` is falsy the currently running loop is used.

    Raises:
        ValueError: ``force_close`` is true and ``keepalive_timeout`` was
            given a value other than None.
    """

    if force_close:
        if keepalive_timeout is not None and keepalive_timeout is not sentinel:
            raise ValueError(
                "keepalive_timeout cannot be set if force_close is True"
            )
    else:
        if keepalive_timeout is sentinel:
            keepalive_timeout = 15.0

    loop = loop or asyncio.get_running_loop()
    self._timeout_ceil_threshold = timeout_ceil_threshold

    # From here on the instance counts as open; before this point the
    # class-level ``_closed = True`` makes __del__ return immediately.
    self._closed = False
    if loop.get_debug():
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    # Connection pool of reusable connections.
    # We use a deque to store connections because it has O(1) popleft()
    # and O(1) append() operations to implement a FIFO queue.
    self._conns: defaultdict[
        ConnectionKey, deque[tuple[ResponseHandler, float]]
    ] = defaultdict(deque)
    self._limit = limit
    self._limit_per_host = limit_per_host
    # Protocols currently handed out (including placeholders added by
    # connect()), overall and grouped by connection key.
    self._acquired: set[ResponseHandler] = set()
    self._acquired_per_host: defaultdict[ConnectionKey, set[ResponseHandler]] = (
        defaultdict(set)
    )
    self._keepalive_timeout = cast(float, keepalive_timeout)
    self._force_close = force_close

    # {host_key: FIFO list of waiters}
    # The FIFO is implemented with an OrderedDict with None keys because
    # python does not have an ordered set.
    self._waiters: defaultdict[
        ConnectionKey, OrderedDict[asyncio.Future[None], None]
    ] = defaultdict(OrderedDict)

    self._loop = loop
    self._factory = functools.partial(ResponseHandler, loop=loop)

    # Timer handle of the keep-alive cleanup; it is created lazily by
    # _release() and _cleanup(), so nothing is scheduled here.
    self._cleanup_handle: asyncio.TimerHandle | None = None

    # Timer handle of the closed-transport cleanup; scheduled by the
    # _cleanup_closed() call at the end of this constructor when enabled.
    self._cleanup_closed_handle: asyncio.TimerHandle | None = None

    # The option is ignored (with a warning) on interpreters that do not
    # need the workaround.
    if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
        warnings.warn(
            "enable_cleanup_closed ignored because "
            "https://github.com/python/cpython/pull/118960 is fixed "
            f"in Python version {sys.version_info}",
            DeprecationWarning,
            stacklevel=2,
        )
        enable_cleanup_closed = False

    self._cleanup_closed_disabled = not enable_cleanup_closed
    self._cleanup_closed_transports: list[asyncio.Transport | None] = []
    # One already-completed future shared by every _TransportPlaceholder
    # that connect() creates.
    self._placeholder_future: asyncio.Future[Exception | None] = (
        loop.create_future()
    )
    self._placeholder_future.set_result(None)
    self._cleanup_closed()
354 def __del__(self, _warnings: Any = warnings) -> None:
355 if self._closed:
356 return
357 if not self._conns:
358 return
360 conns = [repr(c) for c in self._conns.values()]
362 self._close()
364 kwargs = {"source": self}
365 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
366 context = {
367 "connector": self,
368 "connections": conns,
369 "message": "Unclosed connector",
370 }
371 if self._source_traceback is not None:
372 context["source_traceback"] = self._source_traceback
373 self._loop.call_exception_handler(context)
375 def __enter__(self) -> "BaseConnector":
376 warnings.warn(
377 '"with Connector():" is deprecated, '
378 'use "async with Connector():" instead',
379 DeprecationWarning,
380 )
381 return self
383 def __exit__(self, *exc: Any) -> None:
384 self._close()
386 async def __aenter__(self) -> "BaseConnector":
387 return self
389 async def __aexit__(
390 self,
391 exc_type: type[BaseException] | None = None,
392 exc_value: BaseException | None = None,
393 exc_traceback: TracebackType | None = None,
394 ) -> None:
395 await self.close()
@property
def force_close(self) -> bool:
    """Whether connections are closed, not pooled, when released."""
    flag = self._force_close
    return flag
@property
def limit(self) -> int:
    """Maximum number of simultaneous connections.

    A value of 0 means the connector has no limit.
    The constructor default is 100.
    """
    value = self._limit
    return value
@property
def limit_per_host(self) -> int:
    """Maximum number of simultaneous connections to one endpoint.

    Two endpoints are the same when their (host, port, is_ssl) triples
    are equal.
    """
    value = self._limit_per_host
    return value
420 def _cleanup(self) -> None:
421 """Cleanup unused transports."""
422 if self._cleanup_handle:
423 self._cleanup_handle.cancel()
424 # _cleanup_handle should be unset, otherwise _release() will not
425 # recreate it ever!
426 self._cleanup_handle = None
428 now = monotonic()
429 timeout = self._keepalive_timeout
431 if self._conns:
432 connections = defaultdict(deque)
433 deadline = now - timeout
434 for key, conns in self._conns.items():
435 alive: deque[tuple[ResponseHandler, float]] = deque()
436 for proto, use_time in conns:
437 if proto.is_connected() and use_time - deadline >= 0:
438 alive.append((proto, use_time))
439 continue
440 transport = proto.transport
441 proto.close()
442 if not self._cleanup_closed_disabled and key.is_ssl:
443 self._cleanup_closed_transports.append(transport)
445 if alive:
446 connections[key] = alive
448 self._conns = connections
450 if self._conns:
451 self._cleanup_handle = helpers.weakref_handle(
452 self,
453 "_cleanup",
454 timeout,
455 self._loop,
456 timeout_ceil_threshold=self._timeout_ceil_threshold,
457 )
459 def _cleanup_closed(self) -> None:
460 """Double confirmation for transport close.
462 Some broken ssl servers may leave socket open without proper close.
463 """
464 if self._cleanup_closed_handle:
465 self._cleanup_closed_handle.cancel()
467 for transport in self._cleanup_closed_transports:
468 if transport is not None:
469 transport.abort()
471 self._cleanup_closed_transports = []
473 if not self._cleanup_closed_disabled:
474 self._cleanup_closed_handle = helpers.weakref_handle(
475 self,
476 "_cleanup_closed",
477 self._cleanup_closed_period,
478 self._loop,
479 timeout_ceil_threshold=self._timeout_ceil_threshold,
480 )
def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
    """Close all opened transports.

    :param abort_ssl: If True, SSL connections will be aborted immediately
        without performing the shutdown handshake. This provides
        faster cleanup at the cost of less graceful disconnection.

    The returned awaitable warns with ``DeprecationWarning`` if it is
    garbage collected without having been awaited.
    """
    waiters = self._close(abort_ssl=abort_ssl)
    if not waiters:
        # Nothing to wait for: hand back a no-op awaitable so that no task
        # has to be scheduled on the event loop.
        return _DeprecationWaiter(noop())

    coro = _wait_for_close(waiters)
    if sys.version_info < (3, 12):
        task = self._loop.create_task(coro)
    else:
        # Optimization for Python 3.12+: start the task eagerly so the
        # connections can be closed without a trip through the loop.
        task = asyncio.Task(coro, loop=self._loop, eager_start=True)
    return _DeprecationWaiter(task)
502 def _close(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
503 waiters: list[Awaitable[object]] = []
505 if self._closed:
506 return waiters
508 self._closed = True
510 try:
511 if self._loop.is_closed():
512 return waiters
514 # cancel cleanup task
515 if self._cleanup_handle:
516 self._cleanup_handle.cancel()
518 # cancel cleanup close task
519 if self._cleanup_closed_handle:
520 self._cleanup_closed_handle.cancel()
522 for data in self._conns.values():
523 for proto, _ in data:
524 if (
525 abort_ssl
526 and proto.transport
527 and proto.transport.get_extra_info("sslcontext") is not None
528 ):
529 proto.abort()
530 else:
531 proto.close()
532 if closed := proto.closed:
533 waiters.append(closed)
535 for proto in self._acquired:
536 if (
537 abort_ssl
538 and proto.transport
539 and proto.transport.get_extra_info("sslcontext") is not None
540 ):
541 proto.abort()
542 else:
543 proto.close()
544 if closed := proto.closed:
545 waiters.append(closed)
547 for transport in self._cleanup_closed_transports:
548 if transport is not None:
549 transport.abort()
551 return waiters
553 finally:
554 self._conns.clear()
555 self._acquired.clear()
556 for keyed_waiters in self._waiters.values():
557 for keyed_waiter in keyed_waiters:
558 keyed_waiter.cancel()
559 self._waiters.clear()
560 self._cleanup_handle = None
561 self._cleanup_closed_transports.clear()
562 self._cleanup_closed_handle = None
@property
def closed(self) -> bool:
    """Whether the connector has been closed.

    Read-only.
    """
    state = self._closed
    return state
572 def _available_connections(self, key: "ConnectionKey") -> int:
573 """
574 Return number of available connections.
576 The limit, limit_per_host and the connection key are taken into account.
578 If it returns less than 1 means that there are no connections
579 available.
580 """
581 # check total available connections
582 # If there are no limits, this will always return 1
583 total_remain = 1
585 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
586 return total_remain
588 # check limit per host
589 if host_remain := self._limit_per_host:
590 if acquired := self._acquired_per_host.get(key):
591 host_remain -= len(acquired)
592 if total_remain > host_remain:
593 return host_remain
595 return total_remain
def _update_proxy_auth_header_and_build_proxy_req(
    self, req: ClientRequest
) -> ClientRequest:
    """Build the request sent to the proxy and place the proxy credentials.

    For a non-SSL ``req`` the Proxy-Authorization header is set on ``req``
    itself; for an SSL ``req`` it is set on the returned proxy request.

    Note that ``req.proxy_headers``, when present, is used directly as the
    header mapping of the proxy request and receives a Host entry.
    """
    url = req.proxy
    assert url is not None

    headers: dict[str, str]
    if req.proxy_headers is None:
        headers = {}
    else:
        headers = req.proxy_headers  # type: ignore[assignment]
    headers[hdrs.HOST] = req.headers[hdrs.HOST]

    proxy_req = ClientRequest(
        hdrs.METH_GET,
        url,
        headers=headers,
        auth=req.proxy_auth,
        loop=self._loop,
        ssl=req.ssl,
    )

    # The credentials are moved from Authorization to Proxy-Authorization.
    auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
    if auth is None:
        return proxy_req

    if req.is_ssl():
        proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
    else:
        req.headers[hdrs.PROXY_AUTHORIZATION] = auth
    return proxy_req
async def connect(
    self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
) -> Connection:
    """Get from pool or create new connection.

    A reusable pooled connection is returned immediately. Otherwise,
    under the connect timeout, the call waits for a free slot when the
    limits are exhausted, reserves the slot with a placeholder and creates
    a new connection via ``_create_connection``.

    Raises:
        ClientConnectionError: the connector was closed while the new
            connection was being created.
    """
    key = req.connection_key
    if (conn := await self._get(key, traces)) is not None:
        # If we do not have to wait and we can get a connection from the pool
        # we can avoid the timeout ceil logic and directly return the connection
        if req.proxy:
            self._update_proxy_auth_header_and_build_proxy_req(req)
        return conn

    async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
        if self._available_connections(key) <= 0:
            await self._wait_for_available_connection(key, traces)
            # A connection may have been returned to the pool while we
            # were queued; prefer it over opening a new one.
            if (conn := await self._get(key, traces)) is not None:
                if req.proxy:
                    self._update_proxy_auth_header_and_build_proxy_req(req)
                return conn

        # Reserve the slot before the first await below so that the limit
        # checks of other callers count this pending connection.
        placeholder = cast(
            ResponseHandler, _TransportPlaceholder(self._placeholder_future)
        )
        self._acquired.add(placeholder)
        if self._limit_per_host:
            self._acquired_per_host[key].add(placeholder)

        try:
            # Traces are done inside the try block to ensure that the
            # placeholder is still cleaned up if an exception
            # is raised.
            if traces:
                for trace in traces:
                    await trace.send_connection_create_start()
            proto = await self._create_connection(req, traces, timeout)
            if traces:
                for trace in traces:
                    await trace.send_connection_create_end()
        except BaseException:
            self._release_acquired(key, placeholder)
            raise
        else:
            if self._closed:
                proto.close()
                raise ClientConnectionError("Connector is closed.")

    # The connection was successfully created, drop the placeholder
    # and add the real connection to the acquired set. There should
    # be no awaits after the proto is added to the acquired set
    # to ensure that the connection is not left in the acquired set
    # on cancellation.
    self._acquired.remove(placeholder)
    self._acquired.add(proto)
    if self._limit_per_host:
        acquired_per_host = self._acquired_per_host[key]
        acquired_per_host.remove(placeholder)
        acquired_per_host.add(proto)
    return Connection(self, key, proto, self._loop)
async def _wait_for_available_connection(
    self, key: "ConnectionKey", traces: list["Trace"]
) -> None:
    """Wait for an available connection slot.

    A future is queued under ``key`` and awaited; ``_release_waiter``
    completes it. The call returns once ``_available_connections(key)``
    is positive after a wake-up, otherwise it queues again.
    """
    # We loop here because there is a race between
    # the connection limit check and the connection
    # being acquired. If the connection is acquired
    # between the check and the await statement, we
    # need to loop again to check if the connection
    # slot is still available.
    attempts = 0
    while True:
        fut: asyncio.Future[None] = self._loop.create_future()
        keyed_waiters = self._waiters[key]
        keyed_waiters[fut] = None
        if attempts:
            # If we have waited before, we need to move the waiter
            # to the front of the queue as otherwise we might get
            # starved and hit the timeout.
            keyed_waiters.move_to_end(fut, last=False)

        try:
            # Traces happen in the try block to ensure that
            # the waiter is still cleaned up if an exception is raised.
            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()
            await fut
            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()
        finally:
            # pop the waiter from the queue if it's still
            # there and not already removed by _release_waiter
            keyed_waiters.pop(fut, None)
            # Drop the per-key queue once it is empty. The ``True``
            # default skips the delete when the key is already gone.
            if not self._waiters.get(key, True):
                del self._waiters[key]

        if self._available_connections(key) > 0:
            break
        attempts += 1
724 async def _get(
725 self, key: "ConnectionKey", traces: list["Trace"]
726 ) -> Connection | None:
727 """Get next reusable connection for the key or None.
729 The connection will be marked as acquired.
730 """
731 if (conns := self._conns.get(key)) is None:
732 return None
734 t1 = monotonic()
735 while conns:
736 proto, t0 = conns.popleft()
737 # We will we reuse the connection if its connected and
738 # the keepalive timeout has not been exceeded
739 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
740 if not conns:
741 # The very last connection was reclaimed: drop the key
742 del self._conns[key]
743 self._acquired.add(proto)
744 if self._limit_per_host:
745 self._acquired_per_host[key].add(proto)
746 if traces:
747 for trace in traces:
748 try:
749 await trace.send_connection_reuseconn()
750 except BaseException:
751 self._release_acquired(key, proto)
752 raise
753 return Connection(self, key, proto, self._loop)
755 # Connection cannot be reused, close it
756 transport = proto.transport
757 proto.close()
758 # only for SSL transports
759 if not self._cleanup_closed_disabled and key.is_ssl:
760 self._cleanup_closed_transports.append(transport)
762 # No more connections: drop the key
763 del self._conns[key]
764 return None
766 def _release_waiter(self) -> None:
767 """
768 Iterates over all waiters until one to be released is found.
770 The one to be released is not finished and
771 belongs to a host that has available connections.
772 """
773 if not self._waiters:
774 return
776 # Having the dict keys ordered this avoids to iterate
777 # at the same order at each call.
778 queues = list(self._waiters)
779 random.shuffle(queues)
781 for key in queues:
782 if self._available_connections(key) < 1:
783 continue
785 waiters = self._waiters[key]
786 while waiters:
787 waiter, _ = waiters.popitem(last=False)
788 if not waiter.done():
789 waiter.set_result(None)
790 return
792 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
793 """Release acquired connection."""
794 if self._closed:
795 # acquired connection is already released on connector closing
796 return
798 self._acquired.discard(proto)
799 if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
800 conns.discard(proto)
801 if not conns:
802 del self._acquired_per_host[key]
803 self._release_waiter()
805 def _release(
806 self,
807 key: "ConnectionKey",
808 protocol: ResponseHandler,
809 *,
810 should_close: bool = False,
811 ) -> None:
812 if self._closed:
813 # acquired connection is already released on connector closing
814 return
816 self._release_acquired(key, protocol)
818 if self._force_close or should_close or protocol.should_close:
819 transport = protocol.transport
820 protocol.close()
822 if key.is_ssl and not self._cleanup_closed_disabled:
823 self._cleanup_closed_transports.append(transport)
824 return
826 self._conns[key].append((protocol, monotonic()))
828 if self._cleanup_handle is None:
829 self._cleanup_handle = helpers.weakref_handle(
830 self,
831 "_cleanup",
832 self._keepalive_timeout,
833 self._loop,
834 timeout_ceil_threshold=self._timeout_ceil_threshold,
835 )
837 async def _create_connection(
838 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
839 ) -> ResponseHandler:
840 raise NotImplementedError()
843class _DNSCacheTable:
844 def __init__(self, ttl: float | None = None, max_size: int = 1000) -> None:
845 self._addrs_rr: OrderedDict[
846 tuple[str, int], tuple[Iterator[ResolveResult], int]
847 ] = OrderedDict()
848 self._timestamps: dict[tuple[str, int], float] = {}
849 self._ttl = ttl
850 self._max_size = max_size
852 def __contains__(self, host: object) -> bool:
853 return host in self._addrs_rr
855 def add(self, key: tuple[str, int], addrs: list[ResolveResult]) -> None:
856 if key in self._addrs_rr:
857 self._addrs_rr.move_to_end(key)
859 self._addrs_rr[key] = (cycle(addrs), len(addrs))
861 if self._ttl is not None:
862 self._timestamps[key] = monotonic()
864 if len(self._addrs_rr) > self._max_size:
865 oldest_key, _ = self._addrs_rr.popitem(last=False)
866 self._timestamps.pop(oldest_key, None)
868 def remove(self, key: tuple[str, int]) -> None:
869 self._addrs_rr.pop(key, None)
870 self._timestamps.pop(key, None)
872 def clear(self) -> None:
873 self._addrs_rr.clear()
874 self._timestamps.clear()
876 def next_addrs(self, key: tuple[str, int]) -> list[ResolveResult]:
877 loop, length = self._addrs_rr[key]
878 addrs = list(islice(loop, length))
879 # Consume one more element to shift internal state of `cycle`
880 next(loop)
881 self._addrs_rr.move_to_end(key)
882 return addrs
884 def expired(self, key: tuple[str, int]) -> bool:
885 if self._ttl is None:
886 return False
888 return self._timestamps[key] + self._ttl < monotonic()
891def _make_ssl_context(verified: bool) -> SSLContext:
892 """Create SSL context.
894 This method is not async-friendly and should be called from a thread
895 because it will load certificates from disk and do other blocking I/O.
896 """
897 if ssl is None:
898 # No ssl support
899 return None
900 if verified:
901 sslcontext = ssl.create_default_context()
902 else:
903 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
904 sslcontext.options |= ssl.OP_NO_SSLv2
905 sslcontext.options |= ssl.OP_NO_SSLv3
906 sslcontext.check_hostname = False
907 sslcontext.verify_mode = ssl.CERT_NONE
908 sslcontext.options |= ssl.OP_NO_COMPRESSION
909 sslcontext.set_default_verify_paths()
910 sslcontext.set_alpn_protocols(("http/1.1",))
911 return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
# Both are None when the ssl module could not be imported.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
922class TCPConnector(BaseConnector):
923 """TCP connector.
925 verify_ssl - Set to True to check ssl certifications.
926 fingerprint - Pass the binary sha256
927 digest of the expected certificate in DER format to verify
928 that the certificate the server presents matches. See also
929 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
930 resolver - Enable DNS lookups and use this
931 resolver
932 use_dns_cache - Use memory cache for DNS lookups.
933 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
934 family - socket address family
935 local_addr - local tuple of (host, port) to bind socket to
937 keepalive_timeout - (optional) Keep-alive timeout.
938 force_close - Set to True to force close and do reconnect
939 after each request (and between redirects).
940 limit - The total number of simultaneous connections.
941 limit_per_host - Number of simultaneous connections to one host.
942 enable_cleanup_closed - Enables clean-up closed ssl transports.
943 Disabled by default.
944 happy_eyeballs_delay - This is the “Connection Attempt Delay”
945 as defined in RFC 8305. To disable
946 the happy eyeballs algorithm, set to None.
947 interleave - “First Address Family Count” as defined in RFC 8305
948 loop - Optional event loop.
949 socket_factory - A SocketFactoryType function that, if supplied,
950 will be used to create sockets given an
951 AddrInfoType.
952 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
953 Grace period for SSL shutdown handshake on TLS
954 connections. Default is 0 seconds (immediate abort).
955 This parameter allowed for a clean SSL shutdown by
956 notifying the remote peer of connection closure,
957 while avoiding excessive delays during connector cleanup.
958 Note: Only takes effect on Python 3.11+.
959 """
961 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
def __init__(
    self,
    *,
    verify_ssl: bool = True,
    fingerprint: bytes | None = None,
    use_dns_cache: bool = True,
    ttl_dns_cache: int | None = 10,
    dns_cache_max_size: int = 1000,
    family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
    ssl_context: SSLContext | None = None,
    ssl: bool | Fingerprint | SSLContext = True,
    local_addr: tuple[str, int] | None = None,
    resolver: AbstractResolver | None = None,
    keepalive_timeout: None | float | object = sentinel,
    force_close: bool = False,
    limit: int = 100,
    limit_per_host: int = 0,
    enable_cleanup_closed: bool = False,
    loop: asyncio.AbstractEventLoop | None = None,
    timeout_ceil_threshold: float = 5,
    happy_eyeballs_delay: float | None = 0.25,
    interleave: int | None = None,
    socket_factory: SocketFactoryType | None = None,
    ssl_shutdown_timeout: _SENTINEL | None | float = sentinel,
):
    """Initialise the pool via BaseConnector and store the TCP settings.

    The pooling arguments are forwarded unchanged to
    ``BaseConnector.__init__``; the remaining arguments are described in
    the class docstring. Passing ``ssl_shutdown_timeout`` explicitly
    emits a ``DeprecationWarning``.
    """
    super().__init__(
        keepalive_timeout=keepalive_timeout,
        force_close=force_close,
        limit=limit,
        limit_per_host=limit_per_host,
        enable_cleanup_closed=enable_cleanup_closed,
        loop=loop,
        timeout_ceil_threshold=timeout_ceil_threshold,
    )

    # The four SSL-related arguments are collapsed into a single setting.
    self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

    self._resolver: AbstractResolver
    if resolver is None:
        # A resolver created here is owned by the connector, so close()
        # also closes it; a caller-supplied resolver is left alone.
        self._resolver = DefaultResolver(loop=self._loop)
        self._resolver_owner = True
    else:
        self._resolver = resolver
        self._resolver_owner = False

    self._use_dns_cache = use_dns_cache
    self._cached_hosts = _DNSCacheTable(
        ttl=ttl_dns_cache, max_size=dns_cache_max_size
    )
    # Futures grouped by (host, port); all of them are cancelled by _close().
    self._throttle_dns_futures: dict[tuple[str, int], set[asyncio.Future[None]]] = (
        {}
    )
    self._family = family
    self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
    self._happy_eyeballs_delay = happy_eyeballs_delay
    self._interleave = interleave
    # Resolution tasks; _close() cancels them and returns them as waiters.
    self._resolve_host_tasks: set[asyncio.Task[list[ResolveResult]]] = set()
    self._socket_factory = socket_factory
    self._ssl_shutdown_timeout: float | None
    # Handle ssl_shutdown_timeout with warning for Python < 3.11
    if ssl_shutdown_timeout is sentinel:
        # Not given: 0, which makes close() abort SSL connections.
        self._ssl_shutdown_timeout = 0
    else:
        # Deprecation warning for ssl_shutdown_timeout parameter
        warnings.warn(
            "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
            DeprecationWarning,
            stacklevel=2,
        )
        if (
            sys.version_info < (3, 11)
            and ssl_shutdown_timeout is not None
            and ssl_shutdown_timeout != 0
        ):
            warnings.warn(
                f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                RuntimeWarning,
                stacklevel=2,
            )
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
def _close(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]:
    """Cancel in-flight DNS work and close the connector.

    Every future parked in the DNS throttle table is cancelled, the parent
    ``_close()`` is invoked with the same ``abort_ssl`` flag, and each
    outstanding host-resolution task is cancelled and appended to the list
    returned by the parent.

    :param abort_ssl: forwarded unchanged to the parent ``_close()``.
    :return: the parent's awaitables followed by the cancelled resolve tasks.
    """
    # Wake up (with cancellation) everybody waiting on a throttled lookup.
    for parked in self._throttle_dns_futures.values():
        for waiter in parked:
            waiter.cancel()

    pending = super()._close(abort_ssl=abort_ssl)

    # The resolve tasks are handed back so the caller can await them.
    for task in self._resolve_host_tasks:
        task.cancel()
        pending.append(task)

    return pending
async def close(self, *, abort_ssl: bool = False) -> None:
    """
    Close all opened transports.

    :param abort_ssl: If True, SSL connections will be aborted immediately
        without performing the shutdown handshake. If False (default),
        the behavior is determined by ssl_shutdown_timeout:
        - If ssl_shutdown_timeout=0: connections are aborted
        - If ssl_shutdown_timeout>0: graceful shutdown is performed

    The resolver is closed afterwards, but only when this connector
    created it itself (``_resolver_owner`` is true).
    """
    # An explicit abort request wins; otherwise a zero shutdown timeout
    # means "abort" as well.
    skip_tls_shutdown = abort_ssl or self._ssl_shutdown_timeout == 0
    await super().close(abort_ssl=skip_tls_shutdown)
    if not self._resolver_owner:
        return
    await self._resolver.close()
@property
def family(self) -> int:
    """Address family configured for this connector (e.g. ``AF_INET``)."""
    configured_family = self._family
    return configured_family
@property
def use_dns_cache(self) -> bool:
    """Whether resolved addresses are cached locally by this connector."""
    caching_enabled = self._use_dns_cache
    return caching_enabled
1083 def clear_dns_cache(self, host: str | None = None, port: int | None = None) -> None:
1084 """Remove specified host/port or clear all dns local cache."""
1085 if host is not None and port is not None:
1086 self._cached_hosts.remove((host, port))
1087 elif host is not None or port is not None:
1088 raise ValueError("either both host and port or none of them are allowed")
1089 else:
1090 self._cached_hosts.clear()
async def _resolve_host(
    self, host: str, port: int, traces: Sequence["Trace"] | None = None
) -> list[ResolveResult]:
    """Resolve host and return list of addresses.

    Four paths are taken, in this order:

    1. ``host`` is already an IP address: a single synthetic result is
       returned without touching the resolver.
    2. DNS caching is disabled: the resolver is called directly.
    3. A non-expired cache entry exists: it is returned.
    4. Otherwise the lookup is throttled per ``(host, port)``: the first
       caller starts a shielded task, later callers wait on a future that
       the task resolves.

    :param host: host name or IP literal to resolve.
    :param port: port stored in the results and used in the cache key.
    :param traces: optional trace objects notified about DNS events.
    :raises InvalidUrlClientError: for a non-canonical IPv4 literal.
    :raises ClientConnectionError: if the connector is closed on the
        uncached path.
    """
    if is_ip_address(host):
        # Reject legacy numeric IPv4 forms (e.g. 2130706433, 127.1) that
        # socket would map onto an address, slipping past a connector-level
        # policy that only sees the raw host.
        if ":" not in host and not is_canonical_ipv4_address(host):
            raise InvalidUrlClientError(host, "is not a canonical IPv4 address")
        return [
            {
                "hostname": host,
                "host": host,
                "port": port,
                "family": self._family,
                "proto": 0,
                "flags": 0,
            }
        ]

    if not self._use_dns_cache:

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_start(host)

        # The trace callbacks above are awaited, so the connector may have
        # been closed in the meantime.
        if self._closed:
            raise ClientConnectionError("Connector is closed")

        res = await self._resolver.resolve(host, port, family=self._family)

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_end(host)

        return res

    key = (host, port)
    if key in self._cached_hosts and not self._cached_hosts.expired(key):
        # get result early, before any await (#4014)
        result = self._cached_hosts.next_addrs(key)

        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        return result

    futures: set[asyncio.Future[None]]
    #
    # If several requests on this connector are resolving the same
    # (host, port), we wait for the first one to resolve and then use the
    # result for all of them. The throttle table ensures that the host is
    # resolved only once and the result is shared with all the waiters.
    #
    if key in self._throttle_dns_futures:
        # get futures early, before any await (#4014)
        futures = self._throttle_dns_futures[key]
        future: asyncio.Future[None] = self._loop.create_future()
        futures.add(future)
        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        try:
            await future
        finally:
            # Always unregister, including when this waiter is cancelled.
            futures.discard(future)
        return self._cached_hosts.next_addrs(key)

    # update dict early, before any await (#4014)
    self._throttle_dns_futures[key] = futures = set()
    # In this case we need to create a task to ensure that we can shield
    # the task from cancellation as cancelling this lookup should not cancel
    # the underlying lookup or else the cancel event will get broadcast to
    # all the waiters across all connections.
    #
    coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
    loop = asyncio.get_running_loop()
    if sys.version_info >= (3, 12):
        # Optimization for Python 3.12, try to send immediately
        resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
    else:
        resolved_host_task = loop.create_task(coro)

    # An eagerly started task may already be finished; only unfinished
    # tasks are tracked (and untracked again once they complete).
    if not resolved_host_task.done():
        self._resolve_host_tasks.add(resolved_host_task)
        resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

    try:
        return await asyncio.shield(resolved_host_task)
    except asyncio.CancelledError:

        # Nobody will await the shielded task any more, so its outcome is
        # retrieved (and discarded) once it finishes.
        def drop_exception(fut: "asyncio.Future[list[ResolveResult]]") -> None:
            with suppress(Exception, asyncio.CancelledError):
                fut.result()

        resolved_host_task.add_done_callback(drop_exception)
        raise
async def _resolve_host_with_throttle(
    self,
    key: tuple[str, int],
    host: str,
    port: int,
    futures: set["asyncio.Future[None]"],
    traces: Sequence["Trace"] | None,
) -> list[ResolveResult]:
    """Perform the actual lookup and wake up every throttled waiter.

    On success the addresses are stored in the DNS cache under ``key`` and
    each future in ``futures`` is completed with ``None``; on failure each
    future receives the raised exception, which is then re-raised. In both
    cases ``key`` is removed from the throttle table.

    This coroutine is meant to run inside a task that is shielded from
    cancellation so that cancelling one requester does not cancel the
    underlying lookup.
    """
    try:
        observers = traces or ()
        for observer in observers:
            await observer.send_dns_cache_miss(host)
        for observer in observers:
            await observer.send_dns_resolvehost_start(host)

        resolved = await self._resolver.resolve(host, port, family=self._family)
        for observer in observers:
            await observer.send_dns_resolvehost_end(host)

        self._cached_hosts.add(key, resolved)
        for waiter in futures:
            set_result(waiter, None)
    except BaseException as failure:
        # Hand the very same exception to everybody waiting on this lookup
        # before letting it propagate out of the task.
        for waiter in futures:
            set_exception(waiter, failure)
        raise
    finally:
        self._throttle_dns_futures.pop(key)

    return self._cached_hosts.next_addrs(key)
async def _create_connection(
    self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
    """Open a connection for ``req`` and return its protocol.

    The request is routed through ``_create_proxy_connection`` when
    ``req.proxy`` is set and through ``_create_direct_connection``
    otherwise; the transport half of the result is discarded.
    """
    connect = (
        self._create_proxy_connection if req.proxy else self._create_direct_connection
    )
    _transport, protocol = await connect(req, traces, timeout)
    return protocol
def _get_ssl_context(self, req: ClientRequest) -> SSLContext | None:
    """Pick the SSL context to use for ``req``.

    Precedence:

    0. ``req`` is not an SSL request: ``None``.
    1. ``req.ssl`` is an ``SSLContext``: that context.
    2. ``req.ssl`` is anything other than ``True`` (disabled verification
       or a fingerprint): the shared unverified context.
    3. ``req.ssl`` is ``True``: the same two rules are applied to the
       connector-level ``self._ssl``; if that is ``True`` as well, the
       shared verified context is returned.

    :raises RuntimeError: if the ``ssl`` module is unavailable.
    """
    if not req.is_ssl():
        return None

    if ssl is None:  # pragma: no cover
        raise RuntimeError("SSL is not supported.")

    requested = req.ssl
    if requested is True:
        # The request does not override anything: defer to the connector.
        effective = self._ssl
        if effective is True:
            return _SSL_CONTEXT_VERIFIED
    else:
        effective = requested

    if isinstance(effective, ssl.SSLContext):
        return effective
    # not verified or fingerprinted
    return _SSL_CONTEXT_UNVERIFIED
def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
    """Return the fingerprint to verify for ``req``, if any.

    A ``Fingerprint`` set on the request takes precedence over one set on
    the connector; ``None`` is returned when neither is a ``Fingerprint``.
    """
    per_request = req.ssl
    if isinstance(per_request, Fingerprint):
        return per_request

    connector_wide = self._ssl
    return connector_wide if isinstance(connector_wide, Fingerprint) else None
async def _wrap_create_connection(
    self,
    *args: Any,
    addr_infos: list[AddrInfoType],
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> tuple[asyncio.Transport, ResponseHandler]:
    """Connect a socket to one of ``addr_infos`` and build a transport on it.

    The socket is opened with ``aiohappyeyeballs.start_connection`` and then
    handed to ``loop.create_connection`` together with ``args``/``kwargs``;
    both steps run under the ``timeout.sock_connect`` limit.

    :raises ClientConnectorCertificateError: on certificate errors.
    :raises ClientConnectorSSLError: on other SSL errors.
    :raises client_error: on any other ``OSError``; a timeout (an
        ``asyncio.TimeoutError`` without ``errno``) is re-raised unchanged.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            connected_sock = await aiohappyeyeballs.start_connection(
                addr_infos=addr_infos,
                local_addr_infos=self._local_addr_infos,
                happy_eyeballs_delay=self._happy_eyeballs_delay,
                interleave=self._interleave,
                loop=self._loop,
                socket_factory=self._socket_factory,
            )
            # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
            shutdown_timeout = self._ssl_shutdown_timeout
            wants_shutdown_timeout = (
                kwargs.get("ssl")
                and shutdown_timeout
                and sys.version_info >= (3, 11)
            )
            if wants_shutdown_timeout:
                kwargs["ssl_shutdown_timeout"] = shutdown_timeout
            return await self._loop.create_connection(
                *args, **kwargs, sock=connected_sock
            )
    except cert_errors as cert_exc:
        raise ClientConnectorCertificateError(req.connection_key, cert_exc) from cert_exc
    except ssl_errors as tls_exc:
        raise ClientConnectorSSLError(req.connection_key, tls_exc) from tls_exc
    except OSError as os_exc:
        # Timeouts are OSError subclasses too; they must pass through as-is.
        timed_out = os_exc.errno is None and isinstance(os_exc, asyncio.TimeoutError)
        if timed_out:
            raise
        raise client_error(req.connection_key, os_exc) from os_exc
async def _wrap_existing_connection(
    self,
    *args: Any,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> tuple[asyncio.Transport, ResponseHandler]:
    """Call ``loop.create_connection`` with aiohttp's error translation.

    ``args`` and ``kwargs`` are forwarded untouched; the call runs under the
    ``timeout.sock_connect`` limit.

    :raises ClientConnectorCertificateError: on certificate errors.
    :raises ClientConnectorSSLError: on other SSL errors.
    :raises client_error: on any other ``OSError``; a timeout (an
        ``asyncio.TimeoutError`` without ``errno``) is re-raised unchanged.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            connection = await self._loop.create_connection(*args, **kwargs)
            return connection
    except cert_errors as cert_exc:
        raise ClientConnectorCertificateError(req.connection_key, cert_exc) from cert_exc
    except ssl_errors as tls_exc:
        raise ClientConnectorSSLError(req.connection_key, tls_exc) from tls_exc
    except OSError as os_exc:
        # Timeouts are OSError subclasses too; they must pass through as-is.
        timed_out = os_exc.errno is None and isinstance(os_exc, asyncio.TimeoutError)
        if timed_out:
            raise
        raise client_error(req.connection_key, os_exc) from os_exc
def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
    """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.

    ``start_tls()`` is required for TLS-in-TLS, i.e. for sending HTTPS
    requests through an HTTPS proxy. The check is therefore only made
    when the request is an SSL request and its proxy URL uses the
    ``https`` scheme; every other request passes untouched.
    """
    if req.is_ssl():
        proxy_url = req.proxy
        assert proxy_url is not None
        if proxy_url.scheme == "https":
            self._check_loop_for_start_tls()
def _check_loop_for_start_tls(self) -> None:
    """Ensure the event loop exposes ``start_tls``.

    :raises RuntimeError: if ``self._loop`` has no ``start_tls`` attribute;
        the original ``AttributeError`` is attached as the cause.
    """
    try:
        getattr(self._loop, "start_tls")
    except AttributeError as missing:
        explanation = (
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This needs support for TLS in TLS but it is not implemented "
            "in your runtime for the stdlib asyncio.\n\n"
            "Please upgrade to Python 3.11 or higher. For more details, "
            "please see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n"
            "* https://docs.aiohttp.org/en/stable/"
            "client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n"
        )
        raise RuntimeError(explanation) from missing
def _loop_supports_start_tls(self) -> bool:
    """Report whether ``_check_loop_for_start_tls()`` passes.

    :return: ``False`` if the check raises ``RuntimeError``, else ``True``.
    """
    try:
        self._check_loop_for_start_tls()
    except RuntimeError:
        return False
    return True
def _warn_about_tls_in_tls(
    self,
    underlying_transport: asyncio.Transport,
    req: ClientRequest,
) -> None:
    """Warn when TLS-in-TLS is requested on a runtime known not to support it.

    Nothing is emitted unless the request URL and the proxy URL both use
    the ``https`` scheme, the transport does not come from a ``uvloop``
    module, Python is older than 3.11 and the transport does not set
    ``_start_tls_compatible``.
    """
    target_is_https = req.request_info.url.scheme == "https"
    if not target_is_https:
        return

    # TLS-in-TLS only applies when the proxy itself is HTTPS.
    # When the proxy is HTTP, start_tls upgrades a plain TCP connection,
    # which is standard TLS and works on all event loops and Python versions.
    proxy = req.proxy
    if proxy is None or proxy.scheme != "https":
        return

    # Check if uvloop is being used, which supports TLS in TLS,
    # otherwise assume that asyncio's native transport is being used.
    transport_module = type(underlying_transport).__module__
    if transport_module.startswith("uvloop"):
        return

    # Support in asyncio was added in Python 3.11 (bpo-44011)
    if sys.version_info >= (3, 11):
        return
    if getattr(underlying_transport, "_start_tls_compatible", False):
        return

    warnings.warn(
        "An HTTPS request is being sent through an HTTPS proxy. "
        "This support for TLS in TLS is known to be disabled "
        "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
        "an error in the log below.\n\n"
        "It is possible to enable it via monkeypatching. "
        "For more details, see:\n"
        "* https://bugs.python.org/issue37179\n"
        "* https://github.com/python/cpython/pull/28073\n\n"
        "You can temporarily patch this as follows:\n"
        "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
        "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
        RuntimeWarning,
        source=self,
        # stacklevel=3 attributes the warning two frames above this
        # method rather than to this method itself.
        stacklevel=3,
    )
async def _start_tls_connection(
    self,
    underlying_transport: asyncio.Transport,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: type[Exception] = ClientConnectorError,
) -> tuple[asyncio.BaseTransport, ResponseHandler]:
    """Wrap the raw TCP transport with TLS.

    A fresh protocol is created, ``loop.start_tls()`` is run on
    ``underlying_transport`` under the ``timeout.sock_connect`` limit and,
    if a fingerprint applies to ``req``, it is checked against the new
    transport.

    :return: the TLS transport and the new protocol bound to it.
    :raises ClientConnectorCertificateError: on certificate errors.
    :raises ClientConnectorSSLError: on other SSL errors.
    :raises client_error: on other ``OSError``s, or when ``start_tls()``
        returns ``None``.
    :raises ClientConnectionError: when ``start_tls()`` raises ``TypeError``.
    :raises ServerFingerprintMismatch: when the fingerprint check fails.
    """
    tls_proto = self._factory()  # Create a brand new proto for TLS
    sslcontext = self._get_ssl_context(req)
    if TYPE_CHECKING:
        # _start_tls_connection is unreachable in the current code path
        # if sslcontext is None.
        assert sslcontext is not None

    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            try:
                # ssl_shutdown_timeout is only available in Python 3.11+
                if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total or None,
                        ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                    )
                else:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total or None,
                    )
            except BaseException:
                # We need to close the underlying transport since
                # `start_tls()` probably failed before it had a
                # chance to do this:
                if self._ssl_shutdown_timeout == 0:
                    underlying_transport.abort()
                else:
                    underlying_transport.close()
                raise
            if isinstance(tls_transport, asyncio.Transport):
                fingerprint = self._get_fingerprint(req)
                if fingerprint:
                    try:
                        fingerprint.check(tls_transport)
                    except ServerFingerprintMismatch:
                        # Close the mismatching transport and, unless that
                        # feature is disabled, queue it for closed-transport
                        # cleanup before re-raising.
                        tls_transport.close()
                        if not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(tls_transport)
                        raise
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        # A timeout (asyncio.TimeoutError without errno) is re-raised as-is.
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
    except TypeError as type_err:
        # Example cause looks like this:
        # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
        # object at 0x7f760615e460> is not supported by start_tls()

        raise ClientConnectionError(
            "Cannot initialize a TLS-in-TLS connection to host "
            f"{req.host!s}:{req.port:d} through an underlying connection "
            f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
            f"[{type_err!s}]"
        ) from type_err
    else:
        if tls_transport is None:
            msg = "Failed to start TLS (possibly caused by closing transport)"
            raise client_error(req.connection_key, OSError(msg))
        tls_proto.connection_made(
            tls_transport
        )  # Kick the state machine of the new TLS protocol

    return tls_transport, tls_proto
def _convert_hosts_to_addr_infos(
    self, hosts: list[ResolveResult]
) -> list[AddrInfoType]:
    """Translate resolver results into ``getaddrinfo()``-shaped tuples.

    Each entry of ``hosts`` becomes
    ``(family, SOCK_STREAM, IPPROTO_TCP, "", sockaddr)``. An address
    containing ``":"`` is treated as IPv6 and gets a 4-item ``sockaddr``;
    anything else is treated as IPv4 with a 2-item ``sockaddr``. When the
    connector has a specific address family configured, entries of any
    other family are dropped.
    """
    required_family = self._family
    converted: list[AddrInfoType] = []
    for entry in hosts:
        address = entry["host"]
        if ":" in address:
            family, sockaddr_tail = socket.AF_INET6, (0, 0)
        else:
            family, sockaddr_tail = socket.AF_INET, ()

        # A falsy family (AF_UNSPEC) accepts everything.
        if required_family and required_family != family:
            continue

        sockaddr = (address, entry["port"], *sockaddr_tail)
        converted.append(
            (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", sockaddr)
        )
    return converted
async def _create_direct_connection(
    self,
    req: ClientRequest,
    traces: list["Trace"],
    timeout: "ClientTimeout",
    *,
    client_error: type[Exception] = ClientConnectorError,
) -> tuple[asyncio.Transport, ResponseHandler]:
    """Resolve the request host and connect to one of its addresses.

    Candidate addresses are tried until one yields a connection that also
    passes the fingerprint check (when a fingerprint applies); failing
    candidates are removed from the list and the last error is raised once
    the list is exhausted.

    :param req: request whose URL host and port are connected to.
    :param traces: trace objects forwarded to the DNS resolution step.
    :param timeout: timeouts applied to each connection attempt.
    :param client_error: exception type used for connection failures.
    :return: the connected transport and its protocol.
    :raises ClientConnectorDNSError: if resolving the host fails with an
        ``OSError`` other than a timeout.
    :raises client_error: if there is no candidate address to try at all.
    """
    sslcontext = self._get_ssl_context(req)
    fingerprint = self._get_fingerprint(req)

    host = req.url.raw_host
    assert host is not None
    # Replace multiple trailing dots with a single one.
    # A trailing dot is only present for fully-qualified domain names.
    # See https://github.com/aio-libs/aiohttp/pull/7364.
    if host.endswith(".."):
        host = host.rstrip(".") + "."
    port = req.port
    assert port is not None
    try:
        # Cancelling this lookup should not cancel the underlying lookup
        # or else the cancel event will get broadcast to all the waiters
        # across all connections.
        hosts = await self._resolve_host(host, port, traces=traces)
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        # in case of proxy it is not ClientProxyConnectionError
        # it is problem of resolving proxy ip itself
        raise ClientConnectorDNSError(req.connection_key, exc) from exc

    # Strip trailing dots, certificates contain FQDN without dots.
    # See https://github.com/aio-libs/aiohttp/issues/3636
    server_hostname = (
        (req.server_hostname or host).rstrip(".") if sslcontext else None
    )

    last_exc: Exception | None = None
    addr_infos = self._convert_hosts_to_addr_infos(hosts)
    while addr_infos:
        try:
            transp, proto = await self._wrap_create_connection(
                self._factory,
                timeout=timeout,
                ssl=sslcontext,
                addr_infos=addr_infos,
                server_hostname=server_hostname,
                req=req,
                client_error=client_error,
            )
        except (ClientConnectorError, asyncio.TimeoutError) as exc:
            last_exc = exc
            aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
            continue

        if req.is_ssl() and fingerprint:
            try:
                fingerprint.check(transp)
            except ServerFingerprintMismatch as exc:
                transp.close()
                if not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transp)
                last_exc = exc
                # Remove the bad peer from the list of addr_infos
                sock: socket.socket = transp.get_extra_info("socket")
                bad_peer = sock.getpeername()
                aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                continue

        return transp, proto

    if last_exc is None:
        # The loop never ran: the resolver returned nothing, or every
        # result was dropped by the connector's address-family filter.
        # Report it as a connection failure rather than tripping an
        # assertion (or ``raise None`` under ``python -O``).
        msg = f"No usable address found for {host}:{port}"
        raise client_error(req.connection_key, OSError(msg))
    raise last_exc
async def _create_proxy_connection(
    self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
) -> tuple[asyncio.BaseTransport, ResponseHandler]:
    """Connect to the proxy of ``req`` and, for SSL requests, tunnel through it.

    For non-SSL requests the transport/protocol pair of the proxy
    connection is returned directly. For SSL requests a ``CONNECT`` request
    is sent over the proxy connection; after a ``200`` response the
    connection is upgraded to TLS, either with ``loop.start_tls()`` or, when
    the loop lacks it, by wrapping a duplicate of the raw socket.

    :raises ClientHttpProxyError: if the proxy answers ``CONNECT`` with a
        status other than 200.
    :raises RuntimeError: if the loop lacks ``start_tls()`` and the
        transport does not expose its socket.
    """
    self._fail_on_no_start_tls(req)
    runtime_has_start_tls = self._loop_supports_start_tls()
    proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

    # create connection to proxy server
    # (an empty trace list is passed for this connection)
    transport, proto = await self._create_direct_connection(
        proxy_req, [], timeout, client_error=ClientProxyConnectionError
    )

    if req.is_ssl():
        if runtime_has_start_tls:
            self._warn_about_tls_in_tls(transport, req)

        # For HTTPS requests over HTTP proxy
        # we must notify proxy to tunnel connection
        # so we send CONNECT command:
        #   CONNECT www.python.org:443 HTTP/1.1
        #   Host: www.python.org
        #
        # next we must do TLS handshake and so on
        # to do this we must wrap raw socket into secure one
        # asyncio handles this perfectly
        proxy_req.method = hdrs.METH_CONNECT
        proxy_req.url = req.url
        key = req.connection_key._replace(
            proxy=None, proxy_auth=None, proxy_headers_hash=None
        )
        conn = _ConnectTunnelConnection(self, key, proto, self._loop)
        proxy_resp = await proxy_req.send(conn)
        try:
            protocol = conn._protocol
            assert protocol is not None

            # read_until_eof=True will ensure the connection isn't closed
            # once the response is received and processed allowing
            # START_TLS to work on the connection below.
            protocol.set_response_params(
                read_until_eof=runtime_has_start_tls,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )
            resp = await proxy_resp.start(conn)
        except BaseException:
            proxy_resp.close()
            conn.close()
            raise
        else:
            # Clear the tunnel connection's protocol reference; the
            # transport itself is still used below.
            conn._protocol = None
            try:
                if resp.status != 200:
                    message = resp.reason
                    if message is None:
                        message = HTTPStatus(resp.status).phrase
                    raise ClientHttpProxyError(
                        proxy_resp.request_info,
                        resp.history,
                        status=resp.status,
                        message=message,
                        headers=resp.headers,
                    )
                if not runtime_has_start_tls:
                    rawsock = transport.get_extra_info("socket", default=None)
                    if rawsock is None:
                        raise RuntimeError(
                            "Transport does not expose socket instance"
                        )
                    # Duplicate the socket, so now we can close proxy transport
                    rawsock = rawsock.dup()
            except BaseException:
                # It shouldn't be closed in `finally` because it's fed to
                # `loop.start_tls()` and the docs say not to touch it after
                # passing there.
                transport.close()
                raise
            finally:
                # Without start_tls() the duplicated socket is used from
                # here on, so the proxy transport is always closed.
                if not runtime_has_start_tls:
                    transport.close()

            if not runtime_has_start_tls:
                # HTTP proxy with support for upgrade to HTTPS
                sslcontext = self._get_ssl_context(req)
                return await self._wrap_existing_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    sock=rawsock,
                    server_hostname=req.host,
                    req=req,
                )

            return await self._start_tls_connection(
                # Access the old transport for the last time before it's
                # closed and forgotten forever:
                transport,
                req=req,
                timeout=timeout,
            )
        finally:
            proxy_resp.close()

    return transport, proto
class UnixConnector(BaseConnector):
    """Connector that talks to a server over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: object | float | None = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        # Everything except the socket path is handled by the base class.
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Filesystem path of the Unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the Unix socket and return the resulting protocol.

        :raises UnixClientConnectorError: on any ``OSError`` other than a
            timeout, which is re-raised unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, protocol = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_exc:
            timed_out = os_exc.errno is None and isinstance(
                os_exc, asyncio.TimeoutError
            )
            if timed_out:
                raise
            raise UnixClientConnectorError(
                self.path, req.connection_key, os_exc
            ) from os_exc

        return protocol
class NamedPipeConnector(BaseConnector):
    """Connector that talks to a server over a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: object | float | None = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # Named pipes need the proactor loop; refuse anything else up front.
        loop_is_proactor = isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        )
        if not loop_is_proactor:
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path of the named pipe this connector connects to."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the named pipe and return the resulting protocol.

        :raises ClientConnectorError: on any ``OSError`` other than a
            timeout, which is re-raised unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, protocol = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # Yield to the loop once so that connection_made is called
                # and the transport is set; otherwise it is not set before
                # the `assert conn.transport is not None`
                # in client.py's _request method.
                # The other option is to set the transport manually, like
                # `proto.transport = trans`.
                await asyncio.sleep(0)
        except OSError as os_exc:
            timed_out = os_exc.errno is None and isinstance(
                os_exc, asyncio.TimeoutError
            )
            if timed_out:
                raise
            raise ClientConnectorError(req.connection_key, os_exc) from os_exc

        return cast(ResponseHandler, protocol)