import asyncio
import functools
import random
import socket
import sys
import traceback
import warnings
from collections import OrderedDict, defaultdict, deque
from contextlib import suppress
from http import HTTPStatus
from itertools import chain, cycle, islice
from time import monotonic
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    DefaultDict,
    Deque,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
    cast,
)

import aiohappyeyeballs
from aiohappyeyeballs import AddrInfoType, SocketFactoryType

from . import hdrs, helpers
from .abc import AbstractResolver, ResolveResult
from .client_exceptions import (
    ClientConnectionError,
    ClientConnectorCertificateError,
    ClientConnectorDNSError,
    ClientConnectorError,
    ClientConnectorSSLError,
    ClientHttpProxyError,
    ClientProxyConnectionError,
    ServerFingerprintMismatch,
    UnixClientConnectorError,
    cert_errors,
    ssl_errors,
)
from .client_proto import ResponseHandler
from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
from .helpers import (
    _SENTINEL,
    ceil_timeout,
    is_ip_address,
    noop,
    sentinel,
    set_exception,
    set_result,
)
from .log import client_logger
from .resolver import DefaultResolver

if sys.version_info >= (3, 12):
    from collections.abc import Buffer
else:
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]

if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl

        SSLContext = ssl.SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]

EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1


__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)
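
# Illustrative usage sketch (not part of this module's public surface):
# connectors are normally handed to a ClientSession from .client rather
# than driven directly.
#
#     async def main() -> None:
#         connector = TCPConnector(limit=50, limit_per_host=10)
#         async with ClientSession(connector=connector) as session:
#             async with session.get("https://example.com") as resp:
#                 print(resp.status)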


if TYPE_CHECKING:
    from .client import ClientTimeout
    from .client_reqrep import ConnectionKey
    from .tracing import Trace


class _DeprecationWaiter:
    __slots__ = ("_awaitable", "_awaited")

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaitable = awaitable
        self._awaited = False

    def __await__(self) -> Any:
        self._awaited = True
        return self._awaitable.__await__()

    def __del__(self) -> None:
        if not self._awaited:
            warnings.warn(
                "Connector.close() is a coroutine, "
                "please use await connector.close()",
                DeprecationWarning,
            )


async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
    """Wait for all waiters to finish closing."""
    results = await asyncio.gather(*waiters, return_exceptions=True)
    for res in results:
        if isinstance(res, Exception):
            client_logger.debug("Error while closing connector: %r", res)


class Connection:
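    """A single pooled client connection wrapping a ResponseHandler protocol.

    Connections are produced by BaseConnector.connect(); release() hands the
    protocol back to the pool for reuse, while close() tears the underlying
    transport down.
    """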

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._protocol is not None:
            kwargs = {"source": self}
            _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        return self._protocol is None or not self._protocol.is_connected()


class _TransportPlaceholder:
    """Placeholder for a connection being established in BaseConnector.connect."""

    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
        """Initialize a placeholder for a transport."""
        self.closed = closed_future
        self.transport = None

    def close(self) -> None:
        """Close the placeholder."""

    def abort(self) -> None:
        """Abort the placeholder (does nothing)."""


class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables cleanup of closed SSL transports.
        Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
        it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if the constructor failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:

        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
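        # Kick off the first cleanup pass; _cleanup_closed() reschedules
        # itself every _cleanup_closed_period seconds unless cleanup was
        # disabled above.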
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Close the connection on release if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number of simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave the socket open without a proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        A return value of less than 1 means there are no connections
        available.
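
        Example: with limit=10, limit_per_host=3, 8 connections acquired in
        total and 2 already acquired for this key, the result is
        min(10 - 8, 3 - 2) == 1.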
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    return conn

            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that the
                # placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if it's still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it's connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the keys so that we do not iterate over them
        # in the same order on every call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        raise NotImplementedError()


class _DNSCacheTable:
    def __init__(self, ttl: Optional[float] = None) -> None:
        self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        self._addrs_rr.pop(key, None)

        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        # Consume one more element to shift internal state of `cycle`
        next(loop)
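        # Illustrative: after add(key, [a, b, c]) the first next_addrs(key)
        # returns [a, b, c]; the next() above shifts the cycle so subsequent
        # calls return [b, c, a], then [c, a, b], rotating the start address.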
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()


def _make_ssl_context(verified: bool) -> SSLContext:
    """Create SSL context.

    This method is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        return None
    if verified:
        sslcontext = ssl.create_default_context()
    else:
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.options |= ssl.OP_NO_SSLv3
        sslcontext.check_hostname = False
        sslcontext.verify_mode = ssl.CERT_NONE
        sslcontext.options |= ssl.OP_NO_COMPRESSION
        sslcontext.set_default_verify_paths()
    sslcontext.set_alpn_protocols(("http/1.1",))
    return sslcontext


# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)


class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check ssl certificates.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds to cache a DNS entry; None means
        cache forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables cleanup of closed SSL transports.
        Disabled by default.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
        as defined in RFC 8305. To disable
        the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    socket_factory - A SocketFactoryType function that, if supplied,
        will be used to create sockets given an
        AddrInfoType.
    ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
        Grace period for SSL shutdown handshake on TLS
        connections. Default is 0 seconds (immediate abort).
        This parameter allowed for a clean SSL shutdown by
        notifying the remote peer of connection closure,
        while avoiding excessive delays during connector cleanup.
        Note: Only takes effect on Python 3.11+.
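
    Usage (illustrative; ClientSession lives in aiohttp.client)::

        connector = TCPConnector(limit=100, ttl_dns_cache=300)
        async with ClientSession(connector=connector) as session:
            async with session.get("https://example.com") as resp:
                ...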
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})

    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ):
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        self._resolver: AbstractResolver
        if resolver is None:
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Close all ongoing DNS calls."""
        for fut in chain.from_iterable(self._throttle_dns_futures.values()):
            fut.cancel()

        waiters = super()._close(abort_ssl=abort_ssl)

        for t in self._resolve_host_tasks:
            t.cancel()
            waiters.append(t)

        return waiters

    async def close(self, *, abort_ssl: bool = False) -> None:
        """
        Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. If False (default),
            the behavior is determined by ssl_shutdown_timeout:
            - If ssl_shutdown_timeout=0: connections are aborted
            - If ssl_shutdown_timeout>0: graceful shutdown is performed
        """
        if self._resolver_owner:
            await self._resolver.close()
        # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

    def clear_dns_cache(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        """Remove specified host/port or clear all dns local cache."""
        if host is not None and port is not None:
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            raise ValueError("either both host and port or none of them are allowed")
        else:
            self._cached_hosts.clear()

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses."""
        if is_ip_address(host):
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # Any DNS exception is set for the waiters so that they raise
            # the same exception. This coro is always run in a task that is
            # shielded from cancellation, so we should never be propagating
            # cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create connection.

        Has same keyword arguments as BaseEventLoop.create_connection.
        """
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

    def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
        """Logic to get the correct SSL context

        0. if req.ssl is false, return None

        1. if ssl_context is specified in req, use it
        2. if _ssl_context is specified in self, use it
        3. otherwise:
            1. if verify_ssl is not specified in req, use self.ssl_context
               (will generate a default context according to self.verify_ssl)
            2. if verify_ssl is True in req, generate a default SSL context
            3. if verify_ssl is False in req, generate a SSL context that
               won't verify
        """
        if not req.is_ssl():
            return None

        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")
        sslcontext = req.ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        sslcontext = self._ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        return _SSL_CONTEXT_VERIFIED

    def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
        ret = req.ssl
        if isinstance(ret, Fingerprint):
            return ret
        ret = self._ssl
        if isinstance(ret, Fingerprint):
            return ret
        return None

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
        """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.

        It is necessary for TLS-in-TLS so that it is possible to
        send HTTPS queries through HTTPS proxies.

        This doesn't affect regular HTTP requests, though.
        """
        if not req.is_ssl():
            return

        proxy_url = req.proxy
        assert proxy_url is not None
        if proxy_url.scheme != "https":
            return

        self._check_loop_for_start_tls()

    def _check_loop_for_start_tls(self) -> None:
        try:
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc

    def _loop_supports_start_tls(self) -> bool:
        try:
            self._check_loop_for_start_tls()
        except RuntimeError:
            return False
        else:
            return True

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # Why `3`? At least 2 of the calls in the stack originate
            # from the methods in this class.
            stacklevel=3,
        )

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS."""
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

    def _convert_hosts_to_addr_infos(
        self, hosts: List[ResolveResult]
    ) -> List[AddrInfoType]:
        """Converts the list of hosts to a list of addr_infos.

        The list of hosts is the result of a DNS lookup. The list of
        addr_infos is the result of a call to `socket.getaddrinfo()`.
        """
        addr_infos: List[AddrInfoType] = []
        for hinfo in hosts:
            host = hinfo["host"]
            is_ipv6 = ":" in host
            family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
            if self._family and self._family != family:
                continue
            addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
            addr_infos.append(
                (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
            )
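            # Illustrative: a ResolveResult like {"host": "93.184.216.34",
            # "port": 443} becomes
            # (AF_INET, SOCK_STREAM, IPPROTO_TCP, "", ("93.184.216.34", 443)),
            # the same 5-tuple shape socket.getaddrinfo() produces.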
        return addr_infos

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # In the case of a proxy this is not a ClientProxyConnectionError:
            # the problem is resolving the IP of the proxy itself.
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            assert last_exc is not None
            raise last_exc

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()

        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto


class UnixConnector(BaseConnector):
    """Unix socket connector.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
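
    Usage (illustrative; ClientSession lives in aiohttp.client)::

        connector = UnixConnector(path="/path/to/socket")
        async with ClientSession(connector=connector) as session:
            ...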
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc

        return proto


class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
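
    Usage (illustrative, Windows only; ClientSession lives in aiohttp.client)::

        connector = NamedPipeConnector(path=r"\\.\pipe\my-pipe")
        async with ClientSession(connector=connector) as session:
            ...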
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        if not isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes are only available in the proactor event loop under Windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # This extra turn of the event loop is required so that
                # connection_made() is called and the transport is set;
                # otherwise it is still None at the
                # `assert conn.transport is not None` check in client.py's
                # _request method.
                await asyncio.sleep(0)
                # Another option would be to set the transport manually,
                # e.g. `proto.transport = trans`.
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)