1import asyncio
2import functools
3import random
4import socket
5import sys
6import traceback
7import warnings
8from collections import OrderedDict, defaultdict, deque
9from contextlib import suppress
10from http import HTTPStatus
11from itertools import chain, cycle, islice
12from time import monotonic
13from types import TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 DefaultDict,
20 Deque,
21 Dict,
22 Iterator,
23 List,
24 Literal,
25 Optional,
26 Sequence,
27 Set,
28 Tuple,
29 Type,
30 Union,
31 cast,
32)
33
34import aiohappyeyeballs
35from aiohappyeyeballs import AddrInfoType, SocketFactoryType
36
37from . import hdrs, helpers
38from .abc import AbstractResolver, ResolveResult
39from .client_exceptions import (
40 ClientConnectionError,
41 ClientConnectorCertificateError,
42 ClientConnectorDNSError,
43 ClientConnectorError,
44 ClientConnectorSSLError,
45 ClientHttpProxyError,
46 ClientProxyConnectionError,
47 ServerFingerprintMismatch,
48 UnixClientConnectorError,
49 cert_errors,
50 ssl_errors,
51)
52from .client_proto import ResponseHandler
53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
54from .helpers import (
55 _SENTINEL,
56 ceil_timeout,
57 is_ip_address,
58 noop,
59 sentinel,
60 set_exception,
61 set_result,
62)
63from .log import client_logger
64from .resolver import DefaultResolver
65
66if sys.version_info >= (3, 12):
67 from collections.abc import Buffer
68else:
69 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
70
71if TYPE_CHECKING:
72 import ssl
73
74 SSLContext = ssl.SSLContext
75else:
76 try:
77 import ssl
78
79 SSLContext = ssl.SSLContext
80 except ImportError: # pragma: no cover
81 ssl = None # type: ignore[assignment]
82 SSLContext = object # type: ignore[misc,assignment]
83
84EMPTY_SCHEMA_SET = frozenset({""})
85HTTP_SCHEMA_SET = frozenset({"http", "https"})
86WS_SCHEMA_SET = frozenset({"ws", "wss"})
87
88HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
89HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET
90
91NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
92 3,
93 13,
94 1,
95) or sys.version_info < (3, 12, 7)
96# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
97# which first appeared in Python 3.12.7 and 3.13.1
98
99
100__all__ = (
101 "BaseConnector",
102 "TCPConnector",
103 "UnixConnector",
104 "NamedPipeConnector",
105 "AddrInfoType",
106 "SocketFactoryType",
107)
108
109
110if TYPE_CHECKING:
111 from .client import ClientTimeout
112 from .client_reqrep import ConnectionKey
113 from .tracing import Trace
114
115
116class _DeprecationWaiter:
117 __slots__ = ("_awaitable", "_awaited")
118
119 def __init__(self, awaitable: Awaitable[Any]) -> None:
120 self._awaitable = awaitable
121 self._awaited = False
122
123 def __await__(self) -> Any:
124 self._awaited = True
125 return self._awaitable.__await__()
126
127 def __del__(self) -> None:
128 if not self._awaited:
129 warnings.warn(
130 "Connector.close() is a coroutine, "
131 "please use await connector.close()",
132 DeprecationWarning,
133 )
134
135
136async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
137 """Wait for all waiters to finish closing."""
138 results = await asyncio.gather(*waiters, return_exceptions=True)
139 for res in results:
140 if isinstance(res, Exception):
141 client_logger.debug("Error while closing connector: %r", res)
142
143
144class Connection:
145
146 _source_traceback = None
147
148 def __init__(
149 self,
150 connector: "BaseConnector",
151 key: "ConnectionKey",
152 protocol: ResponseHandler,
153 loop: asyncio.AbstractEventLoop,
154 ) -> None:
155 self._key = key
156 self._connector = connector
157 self._loop = loop
158 self._protocol: Optional[ResponseHandler] = protocol
159 self._callbacks: List[Callable[[], None]] = []
160
161 if loop.get_debug():
162 self._source_traceback = traceback.extract_stack(sys._getframe(1))
163
164 def __repr__(self) -> str:
165 return f"Connection<{self._key}>"
166
167 def __del__(self, _warnings: Any = warnings) -> None:
168 if self._protocol is not None:
169 kwargs = {"source": self}
170 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
171 if self._loop.is_closed():
172 return
173
174 self._connector._release(self._key, self._protocol, should_close=True)
175
176 context = {"client_connection": self, "message": "Unclosed connection"}
177 if self._source_traceback is not None:
178 context["source_traceback"] = self._source_traceback
179 self._loop.call_exception_handler(context)
180
181 def __bool__(self) -> Literal[True]:
182 """Force subclasses to not be falsy, to make checks simpler."""
183 return True
184
185 @property
186 def loop(self) -> asyncio.AbstractEventLoop:
187 warnings.warn(
188 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
189 )
190 return self._loop
191
192 @property
193 def transport(self) -> Optional[asyncio.Transport]:
194 if self._protocol is None:
195 return None
196 return self._protocol.transport
197
198 @property
199 def protocol(self) -> Optional[ResponseHandler]:
200 return self._protocol
201
202 def add_callback(self, callback: Callable[[], None]) -> None:
203 if callback is not None:
204 self._callbacks.append(callback)
205
206 def _notify_release(self) -> None:
207 callbacks, self._callbacks = self._callbacks[:], []
208
209 for cb in callbacks:
210 with suppress(Exception):
211 cb()
212
213 def close(self) -> None:
214 self._notify_release()
215
216 if self._protocol is not None:
217 self._connector._release(self._key, self._protocol, should_close=True)
218 self._protocol = None
219
220 def release(self) -> None:
221 self._notify_release()
222
223 if self._protocol is not None:
224 self._connector._release(self._key, self._protocol)
225 self._protocol = None
226
227 @property
228 def closed(self) -> bool:
229 return self._protocol is None or not self._protocol.is_connected()
230
231
232class _ConnectTunnelConnection(Connection):
233 """Special connection wrapper for CONNECT tunnels that must never be pooled.
234
235 This connection wraps the proxy connection that will be upgraded with TLS.
236 It must never be released to the pool because:
237 1. Its 'closed' future will never complete, causing session.close() to hang
238 2. It represents an intermediate state, not a reusable connection
239 3. The real connection (with TLS) will be created separately
240 """
241
242 def release(self) -> None:
243 """Do nothing - don't pool or close the connection.
244
245 These connections are an intermediate state during the CONNECT tunnel
246 setup and will be cleaned up naturally after the TLS upgrade. If they
247 were to be pooled, they would never be properly closed, causing
248 session.close() to wait forever for their 'closed' future.
249 """
250
251
252class _TransportPlaceholder:
253 """placeholder for BaseConnector.connect function"""
254
255 __slots__ = ("closed", "transport")
256
257 def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
258 """Initialize a placeholder for a transport."""
259 self.closed = closed_future
260 self.transport = None
261
262 def close(self) -> None:
263 """Close the placeholder."""
264
265 def abort(self) -> None:
266 """Abort the placeholder (does nothing)."""
267
268
269class BaseConnector:
270 """Base connector class.
271
272 keepalive_timeout - (optional) Keep-alive timeout.
273 force_close - Set to True to force close and do reconnect
274 after each request (and between redirects).
275 limit - The total number of simultaneous connections.
276 limit_per_host - Number of simultaneous connections to one host.
277 enable_cleanup_closed - Enables clean-up closed ssl transports.
278 Disabled by default.
279 timeout_ceil_threshold - Trigger ceiling of timeout values when
280 it's above timeout_ceil_threshold.
281 loop - Optional event loop.
282 """
283
284 _closed = True # prevent AttributeError in __del__ if ctor was failed
285 _source_traceback = None
286
287 # abort transport after 2 seconds (cleanup broken connections)
288 _cleanup_closed_period = 2.0
289
290 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET
291
292 def __init__(
293 self,
294 *,
295 keepalive_timeout: Union[object, None, float] = sentinel,
296 force_close: bool = False,
297 limit: int = 100,
298 limit_per_host: int = 0,
299 enable_cleanup_closed: bool = False,
300 loop: Optional[asyncio.AbstractEventLoop] = None,
301 timeout_ceil_threshold: float = 5,
302 ) -> None:
303
304 if force_close:
305 if keepalive_timeout is not None and keepalive_timeout is not sentinel:
306 raise ValueError(
307 "keepalive_timeout cannot be set if force_close is True"
308 )
309 else:
310 if keepalive_timeout is sentinel:
311 keepalive_timeout = 15.0
312
313 loop = loop or asyncio.get_running_loop()
314 self._timeout_ceil_threshold = timeout_ceil_threshold
315
316 self._closed = False
317 if loop.get_debug():
318 self._source_traceback = traceback.extract_stack(sys._getframe(1))
319
320 # Connection pool of reusable connections.
321 # We use a deque to store connections because it has O(1) popleft()
322 # and O(1) append() operations to implement a FIFO queue.
323 self._conns: DefaultDict[
324 ConnectionKey, Deque[Tuple[ResponseHandler, float]]
325 ] = defaultdict(deque)
326 self._limit = limit
327 self._limit_per_host = limit_per_host
328 self._acquired: Set[ResponseHandler] = set()
329 self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
330 defaultdict(set)
331 )
332 self._keepalive_timeout = cast(float, keepalive_timeout)
333 self._force_close = force_close
334
335 # {host_key: FIFO list of waiters}
336 # The FIFO is implemented with an OrderedDict with None keys because
337 # python does not have an ordered set.
338 self._waiters: DefaultDict[
339 ConnectionKey, OrderedDict[asyncio.Future[None], None]
340 ] = defaultdict(OrderedDict)
341
342 self._loop = loop
343 self._factory = functools.partial(ResponseHandler, loop=loop)
344
345 # start keep-alive connection cleanup task
346 self._cleanup_handle: Optional[asyncio.TimerHandle] = None
347
348 # start cleanup closed transports task
349 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
350
351 if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
352 warnings.warn(
353 "enable_cleanup_closed ignored because "
354 "https://github.com/python/cpython/pull/118960 is fixed "
355 f"in Python version {sys.version_info}",
356 DeprecationWarning,
357 stacklevel=2,
358 )
359 enable_cleanup_closed = False
360
361 self._cleanup_closed_disabled = not enable_cleanup_closed
362 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
363 self._placeholder_future: asyncio.Future[Optional[Exception]] = (
364 loop.create_future()
365 )
366 self._placeholder_future.set_result(None)
367 self._cleanup_closed()
368
369 def __del__(self, _warnings: Any = warnings) -> None:
370 if self._closed:
371 return
372 if not self._conns:
373 return
374
375 conns = [repr(c) for c in self._conns.values()]
376
377 self._close()
378
379 kwargs = {"source": self}
380 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
381 context = {
382 "connector": self,
383 "connections": conns,
384 "message": "Unclosed connector",
385 }
386 if self._source_traceback is not None:
387 context["source_traceback"] = self._source_traceback
388 self._loop.call_exception_handler(context)
389
390 def __enter__(self) -> "BaseConnector":
391 warnings.warn(
392 '"with Connector():" is deprecated, '
393 'use "async with Connector():" instead',
394 DeprecationWarning,
395 )
396 return self
397
398 def __exit__(self, *exc: Any) -> None:
399 self._close()
400
401 async def __aenter__(self) -> "BaseConnector":
402 return self
403
404 async def __aexit__(
405 self,
406 exc_type: Optional[Type[BaseException]] = None,
407 exc_value: Optional[BaseException] = None,
408 exc_traceback: Optional[TracebackType] = None,
409 ) -> None:
410 await self.close()
411
412 @property
413 def force_close(self) -> bool:
414 """Ultimately close connection on releasing if True."""
415 return self._force_close
416
417 @property
418 def limit(self) -> int:
419 """The total number for simultaneous connections.
420
421 If limit is 0 the connector has no limit.
422 The default limit size is 100.
423 """
424 return self._limit
425
426 @property
427 def limit_per_host(self) -> int:
428 """The limit for simultaneous connections to the same endpoint.
429
430 Endpoints are the same if they are have equal
431 (host, port, is_ssl) triple.
432 """
433 return self._limit_per_host
434
435 def _cleanup(self) -> None:
436 """Cleanup unused transports."""
437 if self._cleanup_handle:
438 self._cleanup_handle.cancel()
439 # _cleanup_handle should be unset, otherwise _release() will not
440 # recreate it ever!
441 self._cleanup_handle = None
442
443 now = monotonic()
444 timeout = self._keepalive_timeout
445
446 if self._conns:
447 connections = defaultdict(deque)
448 deadline = now - timeout
449 for key, conns in self._conns.items():
450 alive: Deque[Tuple[ResponseHandler, float]] = deque()
451 for proto, use_time in conns:
452 if proto.is_connected() and use_time - deadline >= 0:
453 alive.append((proto, use_time))
454 continue
455 transport = proto.transport
456 proto.close()
457 if not self._cleanup_closed_disabled and key.is_ssl:
458 self._cleanup_closed_transports.append(transport)
459
460 if alive:
461 connections[key] = alive
462
463 self._conns = connections
464
465 if self._conns:
466 self._cleanup_handle = helpers.weakref_handle(
467 self,
468 "_cleanup",
469 timeout,
470 self._loop,
471 timeout_ceil_threshold=self._timeout_ceil_threshold,
472 )
473
474 def _cleanup_closed(self) -> None:
475 """Double confirmation for transport close.
476
477 Some broken ssl servers may leave socket open without proper close.
478 """
479 if self._cleanup_closed_handle:
480 self._cleanup_closed_handle.cancel()
481
482 for transport in self._cleanup_closed_transports:
483 if transport is not None:
484 transport.abort()
485
486 self._cleanup_closed_transports = []
487
488 if not self._cleanup_closed_disabled:
489 self._cleanup_closed_handle = helpers.weakref_handle(
490 self,
491 "_cleanup_closed",
492 self._cleanup_closed_period,
493 self._loop,
494 timeout_ceil_threshold=self._timeout_ceil_threshold,
495 )
496
497 def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
498 """Close all opened transports.
499
500 :param abort_ssl: If True, SSL connections will be aborted immediately
501 without performing the shutdown handshake. This provides
502 faster cleanup at the cost of less graceful disconnection.
503 """
504 if not (waiters := self._close(abort_ssl=abort_ssl)):
505 # If there are no connections to close, we can return a noop
506 # awaitable to avoid scheduling a task on the event loop.
507 return _DeprecationWaiter(noop())
508 coro = _wait_for_close(waiters)
509 if sys.version_info >= (3, 12):
510 # Optimization for Python 3.12, try to close connections
511 # immediately to avoid having to schedule the task on the event loop.
512 task = asyncio.Task(coro, loop=self._loop, eager_start=True)
513 else:
514 task = self._loop.create_task(coro)
515 return _DeprecationWaiter(task)
516
517 def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
518 waiters: List[Awaitable[object]] = []
519
520 if self._closed:
521 return waiters
522
523 self._closed = True
524
525 try:
526 if self._loop.is_closed():
527 return waiters
528
529 # cancel cleanup task
530 if self._cleanup_handle:
531 self._cleanup_handle.cancel()
532
533 # cancel cleanup close task
534 if self._cleanup_closed_handle:
535 self._cleanup_closed_handle.cancel()
536
537 for data in self._conns.values():
538 for proto, _ in data:
539 if (
540 abort_ssl
541 and proto.transport
542 and proto.transport.get_extra_info("sslcontext") is not None
543 ):
544 proto.abort()
545 else:
546 proto.close()
547 if closed := proto.closed:
548 waiters.append(closed)
549
550 for proto in self._acquired:
551 if (
552 abort_ssl
553 and proto.transport
554 and proto.transport.get_extra_info("sslcontext") is not None
555 ):
556 proto.abort()
557 else:
558 proto.close()
559 if closed := proto.closed:
560 waiters.append(closed)
561
562 for transport in self._cleanup_closed_transports:
563 if transport is not None:
564 transport.abort()
565
566 return waiters
567
568 finally:
569 self._conns.clear()
570 self._acquired.clear()
571 for keyed_waiters in self._waiters.values():
572 for keyed_waiter in keyed_waiters:
573 keyed_waiter.cancel()
574 self._waiters.clear()
575 self._cleanup_handle = None
576 self._cleanup_closed_transports.clear()
577 self._cleanup_closed_handle = None
578
579 @property
580 def closed(self) -> bool:
581 """Is connector closed.
582
583 A readonly property.
584 """
585 return self._closed
586
587 def _available_connections(self, key: "ConnectionKey") -> int:
588 """
589 Return number of available connections.
590
591 The limit, limit_per_host and the connection key are taken into account.
592
593 If it returns less than 1 means that there are no connections
594 available.
595 """
596 # check total available connections
597 # If there are no limits, this will always return 1
598 total_remain = 1
599
600 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
601 return total_remain
602
603 # check limit per host
604 if host_remain := self._limit_per_host:
605 if acquired := self._acquired_per_host.get(key):
606 host_remain -= len(acquired)
607 if total_remain > host_remain:
608 return host_remain
609
610 return total_remain
611
612 async def connect(
613 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
614 ) -> Connection:
615 """Get from pool or create new connection."""
616 key = req.connection_key
617 if (conn := await self._get(key, traces)) is not None:
618 # If we do not have to wait and we can get a connection from the pool
619 # we can avoid the timeout ceil logic and directly return the connection
620 return conn
621
622 async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
623 if self._available_connections(key) <= 0:
624 await self._wait_for_available_connection(key, traces)
625 if (conn := await self._get(key, traces)) is not None:
626 return conn
627
628 placeholder = cast(
629 ResponseHandler, _TransportPlaceholder(self._placeholder_future)
630 )
631 self._acquired.add(placeholder)
632 if self._limit_per_host:
633 self._acquired_per_host[key].add(placeholder)
634
635 try:
636 # Traces are done inside the try block to ensure that the
637 # that the placeholder is still cleaned up if an exception
638 # is raised.
639 if traces:
640 for trace in traces:
641 await trace.send_connection_create_start()
642 proto = await self._create_connection(req, traces, timeout)
643 if traces:
644 for trace in traces:
645 await trace.send_connection_create_end()
646 except BaseException:
647 self._release_acquired(key, placeholder)
648 raise
649 else:
650 if self._closed:
651 proto.close()
652 raise ClientConnectionError("Connector is closed.")
653
654 # The connection was successfully created, drop the placeholder
655 # and add the real connection to the acquired set. There should
656 # be no awaits after the proto is added to the acquired set
657 # to ensure that the connection is not left in the acquired set
658 # on cancellation.
659 self._acquired.remove(placeholder)
660 self._acquired.add(proto)
661 if self._limit_per_host:
662 acquired_per_host = self._acquired_per_host[key]
663 acquired_per_host.remove(placeholder)
664 acquired_per_host.add(proto)
665 return Connection(self, key, proto, self._loop)
666
667 async def _wait_for_available_connection(
668 self, key: "ConnectionKey", traces: List["Trace"]
669 ) -> None:
670 """Wait for an available connection slot."""
671 # We loop here because there is a race between
672 # the connection limit check and the connection
673 # being acquired. If the connection is acquired
674 # between the check and the await statement, we
675 # need to loop again to check if the connection
676 # slot is still available.
677 attempts = 0
678 while True:
679 fut: asyncio.Future[None] = self._loop.create_future()
680 keyed_waiters = self._waiters[key]
681 keyed_waiters[fut] = None
682 if attempts:
683 # If we have waited before, we need to move the waiter
684 # to the front of the queue as otherwise we might get
685 # starved and hit the timeout.
686 keyed_waiters.move_to_end(fut, last=False)
687
688 try:
689 # Traces happen in the try block to ensure that the
690 # the waiter is still cleaned up if an exception is raised.
691 if traces:
692 for trace in traces:
693 await trace.send_connection_queued_start()
694 await fut
695 if traces:
696 for trace in traces:
697 await trace.send_connection_queued_end()
698 finally:
699 # pop the waiter from the queue if its still
700 # there and not already removed by _release_waiter
701 keyed_waiters.pop(fut, None)
702 if not self._waiters.get(key, True):
703 del self._waiters[key]
704
705 if self._available_connections(key) > 0:
706 break
707 attempts += 1
708
709 async def _get(
710 self, key: "ConnectionKey", traces: List["Trace"]
711 ) -> Optional[Connection]:
712 """Get next reusable connection for the key or None.
713
714 The connection will be marked as acquired.
715 """
716 if (conns := self._conns.get(key)) is None:
717 return None
718
719 t1 = monotonic()
720 while conns:
721 proto, t0 = conns.popleft()
722 # We will we reuse the connection if its connected and
723 # the keepalive timeout has not been exceeded
724 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
725 if not conns:
726 # The very last connection was reclaimed: drop the key
727 del self._conns[key]
728 self._acquired.add(proto)
729 if self._limit_per_host:
730 self._acquired_per_host[key].add(proto)
731 if traces:
732 for trace in traces:
733 try:
734 await trace.send_connection_reuseconn()
735 except BaseException:
736 self._release_acquired(key, proto)
737 raise
738 return Connection(self, key, proto, self._loop)
739
740 # Connection cannot be reused, close it
741 transport = proto.transport
742 proto.close()
743 # only for SSL transports
744 if not self._cleanup_closed_disabled and key.is_ssl:
745 self._cleanup_closed_transports.append(transport)
746
747 # No more connections: drop the key
748 del self._conns[key]
749 return None
750
751 def _release_waiter(self) -> None:
752 """
753 Iterates over all waiters until one to be released is found.
754
755 The one to be released is not finished and
756 belongs to a host that has available connections.
757 """
758 if not self._waiters:
759 return
760
761 # Having the dict keys ordered this avoids to iterate
762 # at the same order at each call.
763 queues = list(self._waiters)
764 random.shuffle(queues)
765
766 for key in queues:
767 if self._available_connections(key) < 1:
768 continue
769
770 waiters = self._waiters[key]
771 while waiters:
772 waiter, _ = waiters.popitem(last=False)
773 if not waiter.done():
774 waiter.set_result(None)
775 return
776
777 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
778 """Release acquired connection."""
779 if self._closed:
780 # acquired connection is already released on connector closing
781 return
782
783 self._acquired.discard(proto)
784 if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
785 conns.discard(proto)
786 if not conns:
787 del self._acquired_per_host[key]
788 self._release_waiter()
789
790 def _release(
791 self,
792 key: "ConnectionKey",
793 protocol: ResponseHandler,
794 *,
795 should_close: bool = False,
796 ) -> None:
797 if self._closed:
798 # acquired connection is already released on connector closing
799 return
800
801 self._release_acquired(key, protocol)
802
803 if self._force_close or should_close or protocol.should_close:
804 transport = protocol.transport
805 protocol.close()
806
807 if key.is_ssl and not self._cleanup_closed_disabled:
808 self._cleanup_closed_transports.append(transport)
809 return
810
811 self._conns[key].append((protocol, monotonic()))
812
813 if self._cleanup_handle is None:
814 self._cleanup_handle = helpers.weakref_handle(
815 self,
816 "_cleanup",
817 self._keepalive_timeout,
818 self._loop,
819 timeout_ceil_threshold=self._timeout_ceil_threshold,
820 )
821
822 async def _create_connection(
823 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
824 ) -> ResponseHandler:
825 raise NotImplementedError()
826
827
828class _DNSCacheTable:
829 def __init__(self, ttl: Optional[float] = None) -> None:
830 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
831 self._timestamps: Dict[Tuple[str, int], float] = {}
832 self._ttl = ttl
833
834 def __contains__(self, host: object) -> bool:
835 return host in self._addrs_rr
836
837 def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
838 self._addrs_rr[key] = (cycle(addrs), len(addrs))
839
840 if self._ttl is not None:
841 self._timestamps[key] = monotonic()
842
843 def remove(self, key: Tuple[str, int]) -> None:
844 self._addrs_rr.pop(key, None)
845
846 if self._ttl is not None:
847 self._timestamps.pop(key, None)
848
849 def clear(self) -> None:
850 self._addrs_rr.clear()
851 self._timestamps.clear()
852
853 def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
854 loop, length = self._addrs_rr[key]
855 addrs = list(islice(loop, length))
856 # Consume one more element to shift internal state of `cycle`
857 next(loop)
858 return addrs
859
860 def expired(self, key: Tuple[str, int]) -> bool:
861 if self._ttl is None:
862 return False
863
864 return self._timestamps[key] + self._ttl < monotonic()
865
866
867def _make_ssl_context(verified: bool) -> SSLContext:
868 """Create SSL context.
869
870 This method is not async-friendly and should be called from a thread
871 because it will load certificates from disk and do other blocking I/O.
872 """
873 if ssl is None:
874 # No ssl support
875 return None
876 if verified:
877 sslcontext = ssl.create_default_context()
878 else:
879 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
880 sslcontext.options |= ssl.OP_NO_SSLv2
881 sslcontext.options |= ssl.OP_NO_SSLv3
882 sslcontext.check_hostname = False
883 sslcontext.verify_mode = ssl.CERT_NONE
884 sslcontext.options |= ssl.OP_NO_COMPRESSION
885 sslcontext.set_default_verify_paths()
886 sslcontext.set_alpn_protocols(("http/1.1",))
887 return sslcontext
888
889
890# The default SSLContext objects are created at import time
891# since they do blocking I/O to load certificates from disk,
892# and imports should always be done before the event loop starts
893# or in a thread.
894_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
895_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
896
897
898class TCPConnector(BaseConnector):
899 """TCP connector.
900
901 verify_ssl - Set to True to check ssl certifications.
902 fingerprint - Pass the binary sha256
903 digest of the expected certificate in DER format to verify
904 that the certificate the server presents matches. See also
905 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
906 resolver - Enable DNS lookups and use this
907 resolver
908 use_dns_cache - Use memory cache for DNS lookups.
909 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
910 family - socket address family
911 local_addr - local tuple of (host, port) to bind socket to
912
913 keepalive_timeout - (optional) Keep-alive timeout.
914 force_close - Set to True to force close and do reconnect
915 after each request (and between redirects).
916 limit - The total number of simultaneous connections.
917 limit_per_host - Number of simultaneous connections to one host.
918 enable_cleanup_closed - Enables clean-up closed ssl transports.
919 Disabled by default.
920 happy_eyeballs_delay - This is the “Connection Attempt Delay”
921 as defined in RFC 8305. To disable
922 the happy eyeballs algorithm, set to None.
923 interleave - “First Address Family Count” as defined in RFC 8305
924 loop - Optional event loop.
925 socket_factory - A SocketFactoryType function that, if supplied,
926 will be used to create sockets given an
927 AddrInfoType.
928 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
929 Grace period for SSL shutdown handshake on TLS
930 connections. Default is 0 seconds (immediate abort).
931 This parameter allowed for a clean SSL shutdown by
932 notifying the remote peer of connection closure,
933 while avoiding excessive delays during connector cleanup.
934 Note: Only takes effect on Python 3.11+.
935 """
936
937 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
938
939 def __init__(
940 self,
941 *,
942 verify_ssl: bool = True,
943 fingerprint: Optional[bytes] = None,
944 use_dns_cache: bool = True,
945 ttl_dns_cache: Optional[int] = 10,
946 family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
947 ssl_context: Optional[SSLContext] = None,
948 ssl: Union[bool, Fingerprint, SSLContext] = True,
949 local_addr: Optional[Tuple[str, int]] = None,
950 resolver: Optional[AbstractResolver] = None,
951 keepalive_timeout: Union[None, float, object] = sentinel,
952 force_close: bool = False,
953 limit: int = 100,
954 limit_per_host: int = 0,
955 enable_cleanup_closed: bool = False,
956 loop: Optional[asyncio.AbstractEventLoop] = None,
957 timeout_ceil_threshold: float = 5,
958 happy_eyeballs_delay: Optional[float] = 0.25,
959 interleave: Optional[int] = None,
960 socket_factory: Optional[SocketFactoryType] = None,
961 ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
962 ):
963 super().__init__(
964 keepalive_timeout=keepalive_timeout,
965 force_close=force_close,
966 limit=limit,
967 limit_per_host=limit_per_host,
968 enable_cleanup_closed=enable_cleanup_closed,
969 loop=loop,
970 timeout_ceil_threshold=timeout_ceil_threshold,
971 )
972
973 self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
974
975 self._resolver: AbstractResolver
976 if resolver is None:
977 self._resolver = DefaultResolver(loop=self._loop)
978 self._resolver_owner = True
979 else:
980 self._resolver = resolver
981 self._resolver_owner = False
982
983 self._use_dns_cache = use_dns_cache
984 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
985 self._throttle_dns_futures: Dict[
986 Tuple[str, int], Set["asyncio.Future[None]"]
987 ] = {}
988 self._family = family
989 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
990 self._happy_eyeballs_delay = happy_eyeballs_delay
991 self._interleave = interleave
992 self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
993 self._socket_factory = socket_factory
994 self._ssl_shutdown_timeout: Optional[float]
995 # Handle ssl_shutdown_timeout with warning for Python < 3.11
996 if ssl_shutdown_timeout is sentinel:
997 self._ssl_shutdown_timeout = 0
998 else:
999 # Deprecation warning for ssl_shutdown_timeout parameter
1000 warnings.warn(
1001 "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
1002 DeprecationWarning,
1003 stacklevel=2,
1004 )
1005 if (
1006 sys.version_info < (3, 11)
1007 and ssl_shutdown_timeout is not None
1008 and ssl_shutdown_timeout != 0
1009 ):
1010 warnings.warn(
1011 f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
1012 "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
1013 RuntimeWarning,
1014 stacklevel=2,
1015 )
1016 self._ssl_shutdown_timeout = ssl_shutdown_timeout
1017
1018 def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
1019 """Close all ongoing DNS calls."""
1020 for fut in chain.from_iterable(self._throttle_dns_futures.values()):
1021 fut.cancel()
1022
1023 waiters = super()._close(abort_ssl=abort_ssl)
1024
1025 for t in self._resolve_host_tasks:
1026 t.cancel()
1027 waiters.append(t)
1028
1029 return waiters
1030
1031 async def close(self, *, abort_ssl: bool = False) -> None:
1032 """
1033 Close all opened transports.
1034
1035 :param abort_ssl: If True, SSL connections will be aborted immediately
1036 without performing the shutdown handshake. If False (default),
1037 the behavior is determined by ssl_shutdown_timeout:
1038 - If ssl_shutdown_timeout=0: connections are aborted
1039 - If ssl_shutdown_timeout>0: graceful shutdown is performed
1040 """
1041 if self._resolver_owner:
1042 await self._resolver.close()
1043 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
1044 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)
1045
1046 @property
1047 def family(self) -> int:
1048 """Socket family like AF_INET."""
1049 return self._family
1050
1051 @property
1052 def use_dns_cache(self) -> bool:
1053 """True if local DNS caching is enabled."""
1054 return self._use_dns_cache
1055
1056 def clear_dns_cache(
1057 self, host: Optional[str] = None, port: Optional[int] = None
1058 ) -> None:
1059 """Remove specified host/port or clear all dns local cache."""
1060 if host is not None and port is not None:
1061 self._cached_hosts.remove((host, port))
1062 elif host is not None or port is not None:
1063 raise ValueError("either both host and port or none of them are allowed")
1064 else:
1065 self._cached_hosts.clear()
1066
1067 async def _resolve_host(
1068 self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
1069 ) -> List[ResolveResult]:
1070 """Resolve host and return list of addresses."""
1071 if is_ip_address(host):
1072 return [
1073 {
1074 "hostname": host,
1075 "host": host,
1076 "port": port,
1077 "family": self._family,
1078 "proto": 0,
1079 "flags": 0,
1080 }
1081 ]
1082
1083 if not self._use_dns_cache:
1084
1085 if traces:
1086 for trace in traces:
1087 await trace.send_dns_resolvehost_start(host)
1088
1089 res = await self._resolver.resolve(host, port, family=self._family)
1090
1091 if traces:
1092 for trace in traces:
1093 await trace.send_dns_resolvehost_end(host)
1094
1095 return res
1096
1097 key = (host, port)
1098 if key in self._cached_hosts and not self._cached_hosts.expired(key):
1099 # get result early, before any await (#4014)
1100 result = self._cached_hosts.next_addrs(key)
1101
1102 if traces:
1103 for trace in traces:
1104 await trace.send_dns_cache_hit(host)
1105 return result
1106
1107 futures: Set["asyncio.Future[None]"]
1108 #
1109 # If multiple connectors are resolving the same host, we wait
1110 # for the first one to resolve and then use the result for all of them.
1111 # We use a throttle to ensure that we only resolve the host once
1112 # and then use the result for all the waiters.
1113 #
1114 if key in self._throttle_dns_futures:
1115 # get futures early, before any await (#4014)
1116 futures = self._throttle_dns_futures[key]
1117 future: asyncio.Future[None] = self._loop.create_future()
1118 futures.add(future)
1119 if traces:
1120 for trace in traces:
1121 await trace.send_dns_cache_hit(host)
1122 try:
1123 await future
1124 finally:
1125 futures.discard(future)
1126 return self._cached_hosts.next_addrs(key)
1127
1128 # update dict early, before any await (#4014)
1129 self._throttle_dns_futures[key] = futures = set()
1130 # In this case we need to create a task to ensure that we can shield
1131 # the task from cancellation as cancelling this lookup should not cancel
1132 # the underlying lookup or else the cancel event will get broadcast to
1133 # all the waiters across all connections.
1134 #
1135 coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
1136 loop = asyncio.get_running_loop()
1137 if sys.version_info >= (3, 12):
1138 # Optimization for Python 3.12, try to send immediately
1139 resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
1140 else:
1141 resolved_host_task = loop.create_task(coro)
1142
1143 if not resolved_host_task.done():
1144 self._resolve_host_tasks.add(resolved_host_task)
1145 resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)
1146
1147 try:
1148 return await asyncio.shield(resolved_host_task)
1149 except asyncio.CancelledError:
1150
1151 def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
1152 with suppress(Exception, asyncio.CancelledError):
1153 fut.result()
1154
1155 resolved_host_task.add_done_callback(drop_exception)
1156 raise
1157
1158 async def _resolve_host_with_throttle(
1159 self,
1160 key: Tuple[str, int],
1161 host: str,
1162 port: int,
1163 futures: Set["asyncio.Future[None]"],
1164 traces: Optional[Sequence["Trace"]],
1165 ) -> List[ResolveResult]:
1166 """Resolve host and set result for all waiters.
1167
1168 This method must be run in a task and shielded from cancellation
1169 to avoid cancelling the underlying lookup.
1170 """
1171 try:
1172 if traces:
1173 for trace in traces:
1174 await trace.send_dns_cache_miss(host)
1175
1176 for trace in traces:
1177 await trace.send_dns_resolvehost_start(host)
1178
1179 addrs = await self._resolver.resolve(host, port, family=self._family)
1180 if traces:
1181 for trace in traces:
1182 await trace.send_dns_resolvehost_end(host)
1183
1184 self._cached_hosts.add(key, addrs)
1185 for fut in futures:
1186 set_result(fut, None)
1187 except BaseException as e:
1188 # any DNS exception is set for the waiters to raise the same exception.
1189 # This coro is always run in task that is shielded from cancellation so
1190 # we should never be propagating cancellation here.
1191 for fut in futures:
1192 set_exception(fut, e)
1193 raise
1194 finally:
1195 self._throttle_dns_futures.pop(key)
1196
1197 return self._cached_hosts.next_addrs(key)
1198
1199 async def _create_connection(
1200 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1201 ) -> ResponseHandler:
1202 """Create connection.
1203
1204 Has same keyword arguments as BaseEventLoop.create_connection.
1205 """
1206 if req.proxy:
1207 _, proto = await self._create_proxy_connection(req, traces, timeout)
1208 else:
1209 _, proto = await self._create_direct_connection(req, traces, timeout)
1210
1211 return proto
1212
1213 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
1214 """Logic to get the correct SSL context
1215
1216 0. if req.ssl is false, return None
1217
1218 1. if ssl_context is specified in req, use it
1219 2. if _ssl_context is specified in self, use it
1220 3. otherwise:
1221 1. if verify_ssl is not specified in req, use self.ssl_context
1222 (will generate a default context according to self.verify_ssl)
1223 2. if verify_ssl is True in req, generate a default SSL context
1224 3. if verify_ssl is False in req, generate a SSL context that
1225 won't verify
1226 """
1227 if not req.is_ssl():
1228 return None
1229
1230 if ssl is None: # pragma: no cover
1231 raise RuntimeError("SSL is not supported.")
1232 sslcontext = req.ssl
1233 if isinstance(sslcontext, ssl.SSLContext):
1234 return sslcontext
1235 if sslcontext is not True:
1236 # not verified or fingerprinted
1237 return _SSL_CONTEXT_UNVERIFIED
1238 sslcontext = self._ssl
1239 if isinstance(sslcontext, ssl.SSLContext):
1240 return sslcontext
1241 if sslcontext is not True:
1242 # not verified or fingerprinted
1243 return _SSL_CONTEXT_UNVERIFIED
1244 return _SSL_CONTEXT_VERIFIED
1245
1246 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
1247 ret = req.ssl
1248 if isinstance(ret, Fingerprint):
1249 return ret
1250 ret = self._ssl
1251 if isinstance(ret, Fingerprint):
1252 return ret
1253 return None
1254
1255 async def _wrap_create_connection(
1256 self,
1257 *args: Any,
1258 addr_infos: List[AddrInfoType],
1259 req: ClientRequest,
1260 timeout: "ClientTimeout",
1261 client_error: Type[Exception] = ClientConnectorError,
1262 **kwargs: Any,
1263 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1264 try:
1265 async with ceil_timeout(
1266 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1267 ):
1268 sock = await aiohappyeyeballs.start_connection(
1269 addr_infos=addr_infos,
1270 local_addr_infos=self._local_addr_infos,
1271 happy_eyeballs_delay=self._happy_eyeballs_delay,
1272 interleave=self._interleave,
1273 loop=self._loop,
1274 socket_factory=self._socket_factory,
1275 )
1276 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
1277 if (
1278 kwargs.get("ssl")
1279 and self._ssl_shutdown_timeout
1280 and sys.version_info >= (3, 11)
1281 ):
1282 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
1283 return await self._loop.create_connection(*args, **kwargs, sock=sock)
1284 except cert_errors as exc:
1285 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1286 except ssl_errors as exc:
1287 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1288 except OSError as exc:
1289 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1290 raise
1291 raise client_error(req.connection_key, exc) from exc
1292
1293 async def _wrap_existing_connection(
1294 self,
1295 *args: Any,
1296 req: ClientRequest,
1297 timeout: "ClientTimeout",
1298 client_error: Type[Exception] = ClientConnectorError,
1299 **kwargs: Any,
1300 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1301 try:
1302 async with ceil_timeout(
1303 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1304 ):
1305 return await self._loop.create_connection(*args, **kwargs)
1306 except cert_errors as exc:
1307 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1308 except ssl_errors as exc:
1309 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1310 except OSError as exc:
1311 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1312 raise
1313 raise client_error(req.connection_key, exc) from exc
1314
1315 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
1316 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
1317
1318 It is necessary for TLS-in-TLS so that it is possible to
1319 send HTTPS queries through HTTPS proxies.
1320
1321 This doesn't affect regular HTTP requests, though.
1322 """
1323 if not req.is_ssl():
1324 return
1325
1326 proxy_url = req.proxy
1327 assert proxy_url is not None
1328 if proxy_url.scheme != "https":
1329 return
1330
1331 self._check_loop_for_start_tls()
1332
1333 def _check_loop_for_start_tls(self) -> None:
1334 try:
1335 self._loop.start_tls
1336 except AttributeError as attr_exc:
1337 raise RuntimeError(
1338 "An HTTPS request is being sent through an HTTPS proxy. "
1339 "This needs support for TLS in TLS but it is not implemented "
1340 "in your runtime for the stdlib asyncio.\n\n"
1341 "Please upgrade to Python 3.11 or higher. For more details, "
1342 "please see:\n"
1343 "* https://bugs.python.org/issue37179\n"
1344 "* https://github.com/python/cpython/pull/28073\n"
1345 "* https://docs.aiohttp.org/en/stable/"
1346 "client_advanced.html#proxy-support\n"
1347 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1348 ) from attr_exc
1349
1350 def _loop_supports_start_tls(self) -> bool:
1351 try:
1352 self._check_loop_for_start_tls()
1353 except RuntimeError:
1354 return False
1355 else:
1356 return True
1357
1358 def _warn_about_tls_in_tls(
1359 self,
1360 underlying_transport: asyncio.Transport,
1361 req: ClientRequest,
1362 ) -> None:
1363 """Issue a warning if the requested URL has HTTPS scheme."""
1364 if req.request_info.url.scheme != "https":
1365 return
1366
1367 # Check if uvloop is being used, which supports TLS in TLS,
1368 # otherwise assume that asyncio's native transport is being used.
1369 if type(underlying_transport).__module__.startswith("uvloop"):
1370 return
1371
1372 # Support in asyncio was added in Python 3.11 (bpo-44011)
1373 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
1374 underlying_transport,
1375 "_start_tls_compatible",
1376 False,
1377 )
1378
1379 if asyncio_supports_tls_in_tls:
1380 return
1381
1382 warnings.warn(
1383 "An HTTPS request is being sent through an HTTPS proxy. "
1384 "This support for TLS in TLS is known to be disabled "
1385 "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
1386 "an error in the log below.\n\n"
1387 "It is possible to enable it via monkeypatching. "
1388 "For more details, see:\n"
1389 "* https://bugs.python.org/issue37179\n"
1390 "* https://github.com/python/cpython/pull/28073\n\n"
1391 "You can temporarily patch this as follows:\n"
1392 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1393 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1394 RuntimeWarning,
1395 source=self,
1396 # Why `4`? At least 3 of the calls in the stack originate
1397 # from the methods in this class.
1398 stacklevel=3,
1399 )
1400
1401 async def _start_tls_connection(
1402 self,
1403 underlying_transport: asyncio.Transport,
1404 req: ClientRequest,
1405 timeout: "ClientTimeout",
1406 client_error: Type[Exception] = ClientConnectorError,
1407 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1408 """Wrap the raw TCP transport with TLS."""
1409 tls_proto = self._factory() # Create a brand new proto for TLS
1410 sslcontext = self._get_ssl_context(req)
1411 if TYPE_CHECKING:
1412 # _start_tls_connection is unreachable in the current code path
1413 # if sslcontext is None.
1414 assert sslcontext is not None
1415
1416 try:
1417 async with ceil_timeout(
1418 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1419 ):
1420 try:
1421 # ssl_shutdown_timeout is only available in Python 3.11+
1422 if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
1423 tls_transport = await self._loop.start_tls(
1424 underlying_transport,
1425 tls_proto,
1426 sslcontext,
1427 server_hostname=req.server_hostname or req.host,
1428 ssl_handshake_timeout=timeout.total,
1429 ssl_shutdown_timeout=self._ssl_shutdown_timeout,
1430 )
1431 else:
1432 tls_transport = await self._loop.start_tls(
1433 underlying_transport,
1434 tls_proto,
1435 sslcontext,
1436 server_hostname=req.server_hostname or req.host,
1437 ssl_handshake_timeout=timeout.total,
1438 )
1439 except BaseException:
1440 # We need to close the underlying transport since
1441 # `start_tls()` probably failed before it had a
1442 # chance to do this:
1443 if self._ssl_shutdown_timeout == 0:
1444 underlying_transport.abort()
1445 else:
1446 underlying_transport.close()
1447 raise
1448 if isinstance(tls_transport, asyncio.Transport):
1449 fingerprint = self._get_fingerprint(req)
1450 if fingerprint:
1451 try:
1452 fingerprint.check(tls_transport)
1453 except ServerFingerprintMismatch:
1454 tls_transport.close()
1455 if not self._cleanup_closed_disabled:
1456 self._cleanup_closed_transports.append(tls_transport)
1457 raise
1458 except cert_errors as exc:
1459 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1460 except ssl_errors as exc:
1461 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1462 except OSError as exc:
1463 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1464 raise
1465 raise client_error(req.connection_key, exc) from exc
1466 except TypeError as type_err:
1467 # Example cause looks like this:
1468 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1469 # object at 0x7f760615e460> is not supported by start_tls()
1470
1471 raise ClientConnectionError(
1472 "Cannot initialize a TLS-in-TLS connection to host "
1473 f"{req.host!s}:{req.port:d} through an underlying connection "
1474 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1475 f"[{type_err!s}]"
1476 ) from type_err
1477 else:
1478 if tls_transport is None:
1479 msg = "Failed to start TLS (possibly caused by closing transport)"
1480 raise client_error(req.connection_key, OSError(msg))
1481 tls_proto.connection_made(
1482 tls_transport
1483 ) # Kick the state machine of the new TLS protocol
1484
1485 return tls_transport, tls_proto
1486
1487 def _convert_hosts_to_addr_infos(
1488 self, hosts: List[ResolveResult]
1489 ) -> List[AddrInfoType]:
1490 """Converts the list of hosts to a list of addr_infos.
1491
1492 The list of hosts is the result of a DNS lookup. The list of
1493 addr_infos is the result of a call to `socket.getaddrinfo()`.
1494 """
1495 addr_infos: List[AddrInfoType] = []
1496 for hinfo in hosts:
1497 host = hinfo["host"]
1498 is_ipv6 = ":" in host
1499 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1500 if self._family and self._family != family:
1501 continue
1502 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1503 addr_infos.append(
1504 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1505 )
1506 return addr_infos
1507
1508 async def _create_direct_connection(
1509 self,
1510 req: ClientRequest,
1511 traces: List["Trace"],
1512 timeout: "ClientTimeout",
1513 *,
1514 client_error: Type[Exception] = ClientConnectorError,
1515 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1516 sslcontext = self._get_ssl_context(req)
1517 fingerprint = self._get_fingerprint(req)
1518
1519 host = req.url.raw_host
1520 assert host is not None
1521 # Replace multiple trailing dots with a single one.
1522 # A trailing dot is only present for fully-qualified domain names.
1523 # See https://github.com/aio-libs/aiohttp/pull/7364.
1524 if host.endswith(".."):
1525 host = host.rstrip(".") + "."
1526 port = req.port
1527 assert port is not None
1528 try:
1529 # Cancelling this lookup should not cancel the underlying lookup
1530 # or else the cancel event will get broadcast to all the waiters
1531 # across all connections.
1532 hosts = await self._resolve_host(host, port, traces=traces)
1533 except OSError as exc:
1534 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1535 raise
1536 # in case of proxy it is not ClientProxyConnectionError
1537 # it is problem of resolving proxy ip itself
1538 raise ClientConnectorDNSError(req.connection_key, exc) from exc
1539
1540 last_exc: Optional[Exception] = None
1541 addr_infos = self._convert_hosts_to_addr_infos(hosts)
1542 while addr_infos:
1543 # Strip trailing dots, certificates contain FQDN without dots.
1544 # See https://github.com/aio-libs/aiohttp/issues/3636
1545 server_hostname = (
1546 (req.server_hostname or host).rstrip(".") if sslcontext else None
1547 )
1548
1549 try:
1550 transp, proto = await self._wrap_create_connection(
1551 self._factory,
1552 timeout=timeout,
1553 ssl=sslcontext,
1554 addr_infos=addr_infos,
1555 server_hostname=server_hostname,
1556 req=req,
1557 client_error=client_error,
1558 )
1559 except (ClientConnectorError, asyncio.TimeoutError) as exc:
1560 last_exc = exc
1561 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
1562 continue
1563
1564 if req.is_ssl() and fingerprint:
1565 try:
1566 fingerprint.check(transp)
1567 except ServerFingerprintMismatch as exc:
1568 transp.close()
1569 if not self._cleanup_closed_disabled:
1570 self._cleanup_closed_transports.append(transp)
1571 last_exc = exc
1572 # Remove the bad peer from the list of addr_infos
1573 sock: socket.socket = transp.get_extra_info("socket")
1574 bad_peer = sock.getpeername()
1575 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
1576 continue
1577
1578 return transp, proto
1579 else:
1580 assert last_exc is not None
1581 raise last_exc
1582
1583 async def _create_proxy_connection(
1584 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1585 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1586 self._fail_on_no_start_tls(req)
1587 runtime_has_start_tls = self._loop_supports_start_tls()
1588
1589 headers: Dict[str, str] = {}
1590 if req.proxy_headers is not None:
1591 headers = req.proxy_headers # type: ignore[assignment]
1592 headers[hdrs.HOST] = req.headers[hdrs.HOST]
1593
1594 url = req.proxy
1595 assert url is not None
1596 proxy_req = ClientRequest(
1597 hdrs.METH_GET,
1598 url,
1599 headers=headers,
1600 auth=req.proxy_auth,
1601 loop=self._loop,
1602 ssl=req.ssl,
1603 )
1604
1605 # create connection to proxy server
1606 transport, proto = await self._create_direct_connection(
1607 proxy_req, [], timeout, client_error=ClientProxyConnectionError
1608 )
1609
1610 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
1611 if auth is not None:
1612 if not req.is_ssl():
1613 req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1614 else:
1615 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1616
1617 if req.is_ssl():
1618 if runtime_has_start_tls:
1619 self._warn_about_tls_in_tls(transport, req)
1620
1621 # For HTTPS requests over HTTP proxy
1622 # we must notify proxy to tunnel connection
1623 # so we send CONNECT command:
1624 # CONNECT www.python.org:443 HTTP/1.1
1625 # Host: www.python.org
1626 #
1627 # next we must do TLS handshake and so on
1628 # to do this we must wrap raw socket into secure one
1629 # asyncio handles this perfectly
1630 proxy_req.method = hdrs.METH_CONNECT
1631 proxy_req.url = req.url
1632 key = req.connection_key._replace(
1633 proxy=None, proxy_auth=None, proxy_headers_hash=None
1634 )
1635 conn = _ConnectTunnelConnection(self, key, proto, self._loop)
1636 proxy_resp = await proxy_req.send(conn)
1637 try:
1638 protocol = conn._protocol
1639 assert protocol is not None
1640
1641 # read_until_eof=True will ensure the connection isn't closed
1642 # once the response is received and processed allowing
1643 # START_TLS to work on the connection below.
1644 protocol.set_response_params(
1645 read_until_eof=runtime_has_start_tls,
1646 timeout_ceil_threshold=self._timeout_ceil_threshold,
1647 )
1648 resp = await proxy_resp.start(conn)
1649 except BaseException:
1650 proxy_resp.close()
1651 conn.close()
1652 raise
1653 else:
1654 conn._protocol = None
1655 try:
1656 if resp.status != 200:
1657 message = resp.reason
1658 if message is None:
1659 message = HTTPStatus(resp.status).phrase
1660 raise ClientHttpProxyError(
1661 proxy_resp.request_info,
1662 resp.history,
1663 status=resp.status,
1664 message=message,
1665 headers=resp.headers,
1666 )
1667 if not runtime_has_start_tls:
1668 rawsock = transport.get_extra_info("socket", default=None)
1669 if rawsock is None:
1670 raise RuntimeError(
1671 "Transport does not expose socket instance"
1672 )
1673 # Duplicate the socket, so now we can close proxy transport
1674 rawsock = rawsock.dup()
1675 except BaseException:
1676 # It shouldn't be closed in `finally` because it's fed to
1677 # `loop.start_tls()` and the docs say not to touch it after
1678 # passing there.
1679 transport.close()
1680 raise
1681 finally:
1682 if not runtime_has_start_tls:
1683 transport.close()
1684
1685 if not runtime_has_start_tls:
1686 # HTTP proxy with support for upgrade to HTTPS
1687 sslcontext = self._get_ssl_context(req)
1688 return await self._wrap_existing_connection(
1689 self._factory,
1690 timeout=timeout,
1691 ssl=sslcontext,
1692 sock=rawsock,
1693 server_hostname=req.host,
1694 req=req,
1695 )
1696
1697 return await self._start_tls_connection(
1698 # Access the old transport for the last time before it's
1699 # closed and forgotten forever:
1700 transport,
1701 req=req,
1702 timeout=timeout,
1703 )
1704 finally:
1705 proxy_resp.close()
1706
1707 return transport, proto
1708
1709
1710class UnixConnector(BaseConnector):
1711 """Unix socket connector.
1712
1713 path - Unix socket path.
1714 keepalive_timeout - (optional) Keep-alive timeout.
1715 force_close - Set to True to force close and do reconnect
1716 after each request (and between redirects).
1717 limit - The total number of simultaneous connections.
1718 limit_per_host - Number of simultaneous connections to one host.
1719 loop - Optional event loop.
1720 """
1721
1722 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})
1723
1724 def __init__(
1725 self,
1726 path: str,
1727 force_close: bool = False,
1728 keepalive_timeout: Union[object, float, None] = sentinel,
1729 limit: int = 100,
1730 limit_per_host: int = 0,
1731 loop: Optional[asyncio.AbstractEventLoop] = None,
1732 ) -> None:
1733 super().__init__(
1734 force_close=force_close,
1735 keepalive_timeout=keepalive_timeout,
1736 limit=limit,
1737 limit_per_host=limit_per_host,
1738 loop=loop,
1739 )
1740 self._path = path
1741
1742 @property
1743 def path(self) -> str:
1744 """Path to unix socket."""
1745 return self._path
1746
1747 async def _create_connection(
1748 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1749 ) -> ResponseHandler:
1750 try:
1751 async with ceil_timeout(
1752 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1753 ):
1754 _, proto = await self._loop.create_unix_connection(
1755 self._factory, self._path
1756 )
1757 except OSError as exc:
1758 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1759 raise
1760 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1761
1762 return proto
1763
1764
1765class NamedPipeConnector(BaseConnector):
1766 """Named pipe connector.
1767
1768 Only supported by the proactor event loop.
1769 See also: https://docs.python.org/3/library/asyncio-eventloop.html
1770
1771 path - Windows named pipe path.
1772 keepalive_timeout - (optional) Keep-alive timeout.
1773 force_close - Set to True to force close and do reconnect
1774 after each request (and between redirects).
1775 limit - The total number of simultaneous connections.
1776 limit_per_host - Number of simultaneous connections to one host.
1777 loop - Optional event loop.
1778 """
1779
1780 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})
1781
1782 def __init__(
1783 self,
1784 path: str,
1785 force_close: bool = False,
1786 keepalive_timeout: Union[object, float, None] = sentinel,
1787 limit: int = 100,
1788 limit_per_host: int = 0,
1789 loop: Optional[asyncio.AbstractEventLoop] = None,
1790 ) -> None:
1791 super().__init__(
1792 force_close=force_close,
1793 keepalive_timeout=keepalive_timeout,
1794 limit=limit,
1795 limit_per_host=limit_per_host,
1796 loop=loop,
1797 )
1798 if not isinstance(
1799 self._loop,
1800 asyncio.ProactorEventLoop, # type: ignore[attr-defined]
1801 ):
1802 raise RuntimeError(
1803 "Named Pipes only available in proactor loop under windows"
1804 )
1805 self._path = path
1806
1807 @property
1808 def path(self) -> str:
1809 """Path to the named pipe."""
1810 return self._path
1811
1812 async def _create_connection(
1813 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1814 ) -> ResponseHandler:
1815 try:
1816 async with ceil_timeout(
1817 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1818 ):
1819 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
1820 self._factory, self._path
1821 )
1822 # the drain is required so that the connection_made is called
1823 # and transport is set otherwise it is not set before the
1824 # `assert conn.transport is not None`
1825 # in client.py's _request method
1826 await asyncio.sleep(0)
1827 # other option is to manually set transport like
1828 # `proto.transport = trans`
1829 except OSError as exc:
1830 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1831 raise
1832 raise ClientConnectorError(req.connection_key, exc) from exc
1833
1834 return cast(ResponseHandler, proto)