Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 17%
701 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2import dataclasses
3import functools
4import logging
5import random
6import socket
7import sys
8import traceback
9import warnings
10from collections import defaultdict, deque
11from contextlib import suppress
12from http import HTTPStatus
13from http.cookies import SimpleCookie
14from itertools import cycle, islice
15from time import monotonic
16from types import TracebackType
17from typing import ( # noqa
18 TYPE_CHECKING,
19 Any,
20 Awaitable,
21 Callable,
22 DefaultDict,
23 Dict,
24 Iterator,
25 List,
26 Literal,
27 Optional,
28 Set,
29 Tuple,
30 Type,
31 Union,
32 cast,
33)
35import aiohappyeyeballs
37from . import hdrs, helpers
38from .abc import AbstractResolver
39from .client_exceptions import (
40 ClientConnectionError,
41 ClientConnectorCertificateError,
42 ClientConnectorError,
43 ClientConnectorSSLError,
44 ClientHttpProxyError,
45 ClientProxyConnectionError,
46 ServerFingerprintMismatch,
47 UnixClientConnectorError,
48 cert_errors,
49 ssl_errors,
50)
51from .client_proto import ResponseHandler
52from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint
53from .helpers import _SENTINEL, ceil_timeout, is_ip_address, sentinel, set_result
54from .locks import EventResultOrError
55from .resolver import DefaultResolver
57try:
58 import ssl
60 SSLContext = ssl.SSLContext
61except ImportError: # pragma: no cover
62 ssl = None # type: ignore[assignment]
63 SSLContext = object # type: ignore[misc,assignment]
66__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
69if TYPE_CHECKING:
70 from .client import ClientTimeout
71 from .client_reqrep import ConnectionKey
72 from .tracing import Trace
75class Connection:
76 _source_traceback = None
77 _transport = None
79 def __init__(
80 self,
81 connector: "BaseConnector",
82 key: "ConnectionKey",
83 protocol: ResponseHandler,
84 loop: asyncio.AbstractEventLoop,
85 ) -> None:
86 self._key = key
87 self._connector = connector
88 self._loop = loop
89 self._protocol: Optional[ResponseHandler] = protocol
90 self._callbacks: List[Callable[[], None]] = []
92 if loop.get_debug():
93 self._source_traceback = traceback.extract_stack(sys._getframe(1))
95 def __repr__(self) -> str:
96 return f"Connection<{self._key}>"
98 def __del__(self, _warnings: Any = warnings) -> None:
99 if self._protocol is not None:
100 _warnings.warn(
101 f"Unclosed connection {self!r}", ResourceWarning, source=self
102 )
103 if self._loop.is_closed():
104 return
106 self._connector._release(self._key, self._protocol, should_close=True)
108 context = {"client_connection": self, "message": "Unclosed connection"}
109 if self._source_traceback is not None:
110 context["source_traceback"] = self._source_traceback
111 self._loop.call_exception_handler(context)
113 def __bool__(self) -> Literal[True]:
114 """Force subclasses to not be falsy, to make checks simpler."""
115 return True
117 @property
118 def transport(self) -> Optional[asyncio.Transport]:
119 if self._protocol is None:
120 return None
121 return self._protocol.transport
123 @property
124 def protocol(self) -> Optional[ResponseHandler]:
125 return self._protocol
127 def add_callback(self, callback: Callable[[], None]) -> None:
128 if callback is not None:
129 self._callbacks.append(callback)
131 def _notify_release(self) -> None:
132 callbacks, self._callbacks = self._callbacks[:], []
134 for cb in callbacks:
135 with suppress(Exception):
136 cb()
138 def close(self) -> None:
139 self._notify_release()
141 if self._protocol is not None:
142 self._connector._release(self._key, self._protocol, should_close=True)
143 self._protocol = None
145 def release(self) -> None:
146 self._notify_release()
148 if self._protocol is not None:
149 self._connector._release(
150 self._key, self._protocol, should_close=self._protocol.should_close
151 )
152 self._protocol = None
154 @property
155 def closed(self) -> bool:
156 return self._protocol is None or not self._protocol.is_connected()
159class _TransportPlaceholder:
160 """placeholder for BaseConnector.connect function"""
162 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
163 fut = loop.create_future()
164 fut.set_result(None)
165 self.closed: asyncio.Future[Optional[Exception]] = fut
167 def close(self) -> None:
168 pass
171class BaseConnector:
172 """Base connector class.
174 keepalive_timeout - (optional) Keep-alive timeout.
175 force_close - Set to True to force close and do reconnect
176 after each request (and between redirects).
177 limit - The total number of simultaneous connections.
178 limit_per_host - Number of simultaneous connections to one host.
179 enable_cleanup_closed - Enables clean-up closed ssl transports.
180 Disabled by default.
181 timeout_ceil_threshold - Trigger ceiling of timeout values when
182 it's above timeout_ceil_threshold.
183 loop - Optional event loop.
184 """
186 _closed = True # prevent AttributeError in __del__ if ctor was failed
187 _source_traceback = None
189 # abort transport after 2 seconds (cleanup broken connections)
190 _cleanup_closed_period = 2.0
192 def __init__(
193 self,
194 *,
195 keepalive_timeout: Union[_SENTINEL, None, float] = sentinel,
196 force_close: bool = False,
197 limit: int = 100,
198 limit_per_host: int = 0,
199 enable_cleanup_closed: bool = False,
200 timeout_ceil_threshold: float = 5,
201 ) -> None:
202 if force_close:
203 if keepalive_timeout is not None and keepalive_timeout is not sentinel:
204 raise ValueError(
205 "keepalive_timeout cannot " "be set if force_close is True"
206 )
207 else:
208 if keepalive_timeout is sentinel:
209 keepalive_timeout = 15.0
211 self._timeout_ceil_threshold = timeout_ceil_threshold
213 loop = asyncio.get_running_loop()
215 self._closed = False
216 if loop.get_debug():
217 self._source_traceback = traceback.extract_stack(sys._getframe(1))
219 self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
220 self._limit = limit
221 self._limit_per_host = limit_per_host
222 self._acquired: Set[ResponseHandler] = set()
223 self._acquired_per_host: DefaultDict[
224 ConnectionKey, Set[ResponseHandler]
225 ] = defaultdict(set)
226 self._keepalive_timeout = cast(float, keepalive_timeout)
227 self._force_close = force_close
229 # {host_key: FIFO list of waiters}
230 self._waiters = defaultdict(deque) # type: ignore[var-annotated]
232 self._loop = loop
233 self._factory = functools.partial(ResponseHandler, loop=loop)
235 self.cookies = SimpleCookie()
237 # start keep-alive connection cleanup task
238 self._cleanup_handle: Optional[asyncio.TimerHandle] = None
240 # start cleanup closed transports task
241 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
242 self._cleanup_closed_disabled = not enable_cleanup_closed
243 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
244 self._cleanup_closed()
246 def __del__(self, _warnings: Any = warnings) -> None:
247 if self._closed:
248 return
249 if not self._conns:
250 return
252 conns = [repr(c) for c in self._conns.values()]
254 self._close_immediately()
256 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
257 context = {
258 "connector": self,
259 "connections": conns,
260 "message": "Unclosed connector",
261 }
262 if self._source_traceback is not None:
263 context["source_traceback"] = self._source_traceback
264 self._loop.call_exception_handler(context)
266 async def __aenter__(self) -> "BaseConnector":
267 return self
269 async def __aexit__(
270 self,
271 exc_type: Optional[Type[BaseException]] = None,
272 exc_value: Optional[BaseException] = None,
273 exc_traceback: Optional[TracebackType] = None,
274 ) -> None:
275 await self.close()
277 @property
278 def force_close(self) -> bool:
279 """Ultimately close connection on releasing if True."""
280 return self._force_close
282 @property
283 def limit(self) -> int:
284 """The total number for simultaneous connections.
286 If limit is 0 the connector has no limit.
287 The default limit size is 100.
288 """
289 return self._limit
291 @property
292 def limit_per_host(self) -> int:
293 """The limit for simultaneous connections to the same endpoint.
295 Endpoints are the same if they are have equal
296 (host, port, is_ssl) triple.
297 """
298 return self._limit_per_host
300 def _cleanup(self) -> None:
301 """Cleanup unused transports."""
302 if self._cleanup_handle:
303 self._cleanup_handle.cancel()
304 # _cleanup_handle should be unset, otherwise _release() will not
305 # recreate it ever!
306 self._cleanup_handle = None
308 now = self._loop.time()
309 timeout = self._keepalive_timeout
311 if self._conns:
312 connections = {}
313 deadline = now - timeout
314 for key, conns in self._conns.items():
315 alive = []
316 for proto, use_time in conns:
317 if proto.is_connected():
318 if use_time - deadline < 0:
319 transport = proto.transport
320 proto.close()
321 if key.is_ssl and not self._cleanup_closed_disabled:
322 self._cleanup_closed_transports.append(transport)
323 else:
324 alive.append((proto, use_time))
325 else:
326 transport = proto.transport
327 proto.close()
328 if key.is_ssl and not self._cleanup_closed_disabled:
329 self._cleanup_closed_transports.append(transport)
331 if alive:
332 connections[key] = alive
334 self._conns = connections
336 if self._conns:
337 self._cleanup_handle = helpers.weakref_handle(
338 self,
339 "_cleanup",
340 timeout,
341 self._loop,
342 timeout_ceil_threshold=self._timeout_ceil_threshold,
343 )
345 def _drop_acquired_per_host(
346 self, key: "ConnectionKey", val: ResponseHandler
347 ) -> None:
348 acquired_per_host = self._acquired_per_host
349 if key not in acquired_per_host:
350 return
351 conns = acquired_per_host[key]
352 conns.remove(val)
353 if not conns:
354 del self._acquired_per_host[key]
356 def _cleanup_closed(self) -> None:
357 """Double confirmation for transport close.
359 Some broken ssl servers may leave socket open without proper close.
360 """
361 if self._cleanup_closed_handle:
362 self._cleanup_closed_handle.cancel()
364 for transport in self._cleanup_closed_transports:
365 if transport is not None:
366 transport.abort()
368 self._cleanup_closed_transports = []
370 if not self._cleanup_closed_disabled:
371 self._cleanup_closed_handle = helpers.weakref_handle(
372 self,
373 "_cleanup_closed",
374 self._cleanup_closed_period,
375 self._loop,
376 timeout_ceil_threshold=self._timeout_ceil_threshold,
377 )
379 async def close(self) -> None:
380 """Close all opened transports."""
381 waiters = self._close_immediately()
382 if waiters:
383 results = await asyncio.gather(*waiters, return_exceptions=True)
384 for res in results:
385 if isinstance(res, Exception):
386 err_msg = "Error while closing connector: " + repr(res)
387 logging.error(err_msg)
389 def _close_immediately(self) -> List["asyncio.Future[None]"]:
390 waiters: List["asyncio.Future[None]"] = []
392 if self._closed:
393 return waiters
395 self._closed = True
397 try:
398 if self._loop.is_closed():
399 return waiters
401 # cancel cleanup task
402 if self._cleanup_handle:
403 self._cleanup_handle.cancel()
405 # cancel cleanup close task
406 if self._cleanup_closed_handle:
407 self._cleanup_closed_handle.cancel()
409 for data in self._conns.values():
410 for proto, t0 in data:
411 proto.close()
412 waiters.append(proto.closed)
414 for proto in self._acquired:
415 proto.close()
416 waiters.append(proto.closed)
418 # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
419 for transport in self._cleanup_closed_transports:
420 if transport is not None:
421 transport.abort()
423 return waiters
425 finally:
426 self._conns.clear()
427 self._acquired.clear()
428 self._waiters.clear()
429 self._cleanup_handle = None
430 self._cleanup_closed_transports.clear()
431 self._cleanup_closed_handle = None
433 @property
434 def closed(self) -> bool:
435 """Is connector closed.
437 A readonly property.
438 """
439 return self._closed
441 def _available_connections(self, key: "ConnectionKey") -> int:
442 """
443 Return number of available connections.
445 The limit, limit_per_host and the connection key are taken into account.
447 If it returns less than 1 means that there are no connections
448 available.
449 """
450 if self._limit:
451 # total calc available connections
452 available = self._limit - len(self._acquired)
454 # check limit per host
455 if (
456 self._limit_per_host
457 and available > 0
458 and key in self._acquired_per_host
459 ):
460 acquired = self._acquired_per_host.get(key)
461 assert acquired is not None
462 available = self._limit_per_host - len(acquired)
464 elif self._limit_per_host and key in self._acquired_per_host:
465 # check limit per host
466 acquired = self._acquired_per_host.get(key)
467 assert acquired is not None
468 available = self._limit_per_host - len(acquired)
469 else:
470 available = 1
472 return available
474 async def connect(
475 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
476 ) -> Connection:
477 """Get from pool or create new connection."""
478 key = req.connection_key
479 available = self._available_connections(key)
481 # Wait if there are no available connections or if there are/were
482 # waiters (i.e. don't steal connection from a waiter about to wake up)
483 if available <= 0 or key in self._waiters:
484 fut = self._loop.create_future()
486 # This connection will now count towards the limit.
487 self._waiters[key].append(fut)
489 if traces:
490 for trace in traces:
491 await trace.send_connection_queued_start()
493 try:
494 await fut
495 except BaseException as e:
496 if key in self._waiters:
497 # remove a waiter even if it was cancelled, normally it's
498 # removed when it's notified
499 try:
500 self._waiters[key].remove(fut)
501 except ValueError: # fut may no longer be in list
502 pass
504 raise e
505 finally:
506 if key in self._waiters and not self._waiters[key]:
507 del self._waiters[key]
509 if traces:
510 for trace in traces:
511 await trace.send_connection_queued_end()
513 proto = self._get(key)
514 if proto is None:
515 placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
516 self._acquired.add(placeholder)
517 self._acquired_per_host[key].add(placeholder)
519 if traces:
520 for trace in traces:
521 await trace.send_connection_create_start()
523 try:
524 proto = await self._create_connection(req, traces, timeout)
525 if self._closed:
526 proto.close()
527 raise ClientConnectionError("Connector is closed.")
528 except BaseException:
529 if not self._closed:
530 self._acquired.remove(placeholder)
531 self._drop_acquired_per_host(key, placeholder)
532 self._release_waiter()
533 raise
534 else:
535 if not self._closed:
536 self._acquired.remove(placeholder)
537 self._drop_acquired_per_host(key, placeholder)
539 if traces:
540 for trace in traces:
541 await trace.send_connection_create_end()
542 else:
543 if traces:
544 # Acquire the connection to prevent race conditions with limits
545 placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
546 self._acquired.add(placeholder)
547 self._acquired_per_host[key].add(placeholder)
548 for trace in traces:
549 await trace.send_connection_reuseconn()
550 self._acquired.remove(placeholder)
551 self._drop_acquired_per_host(key, placeholder)
553 self._acquired.add(proto)
554 self._acquired_per_host[key].add(proto)
555 return Connection(self, key, proto, self._loop)
557 def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
558 try:
559 conns = self._conns[key]
560 except KeyError:
561 return None
563 t1 = self._loop.time()
564 while conns:
565 proto, t0 = conns.pop()
566 if proto.is_connected():
567 if t1 - t0 > self._keepalive_timeout:
568 transport = proto.transport
569 proto.close()
570 # only for SSL transports
571 if key.is_ssl and not self._cleanup_closed_disabled:
572 self._cleanup_closed_transports.append(transport)
573 else:
574 if not conns:
575 # The very last connection was reclaimed: drop the key
576 del self._conns[key]
577 return proto
578 else:
579 transport = proto.transport
580 proto.close()
581 if key.is_ssl and not self._cleanup_closed_disabled:
582 self._cleanup_closed_transports.append(transport)
584 # No more connections: drop the key
585 del self._conns[key]
586 return None
588 def _release_waiter(self) -> None:
589 """
590 Iterates over all waiters until one to be released is found.
592 The one to be released is not finished and
593 belongs to a host that has available connections.
594 """
595 if not self._waiters:
596 return
598 # Having the dict keys ordered this avoids to iterate
599 # at the same order at each call.
600 queues = list(self._waiters.keys())
601 random.shuffle(queues)
603 for key in queues:
604 if self._available_connections(key) < 1:
605 continue
607 waiters = self._waiters[key]
608 while waiters:
609 waiter = waiters.popleft()
610 if not waiter.done():
611 waiter.set_result(None)
612 return
614 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
615 if self._closed:
616 # acquired connection is already released on connector closing
617 return
619 try:
620 self._acquired.remove(proto)
621 self._drop_acquired_per_host(key, proto)
622 except KeyError: # pragma: no cover
623 # this may be result of undetermenistic order of objects
624 # finalization due garbage collection.
625 pass
626 else:
627 self._release_waiter()
629 def _release(
630 self,
631 key: "ConnectionKey",
632 protocol: ResponseHandler,
633 *,
634 should_close: bool = False,
635 ) -> None:
636 if self._closed:
637 # acquired connection is already released on connector closing
638 return
640 self._release_acquired(key, protocol)
642 if self._force_close:
643 should_close = True
645 if should_close or protocol.should_close:
646 transport = protocol.transport
647 protocol.close()
648 # TODO: Remove once fixed: https://bugs.python.org/issue39951
649 # See PR #6321
650 set_result(protocol.closed, None)
652 if key.is_ssl and not self._cleanup_closed_disabled:
653 self._cleanup_closed_transports.append(transport)
654 else:
655 conns = self._conns.get(key)
656 if conns is None:
657 conns = self._conns[key] = []
658 conns.append((protocol, self._loop.time()))
660 if self._cleanup_handle is None:
661 self._cleanup_handle = helpers.weakref_handle(
662 self,
663 "_cleanup",
664 self._keepalive_timeout,
665 self._loop,
666 timeout_ceil_threshold=self._timeout_ceil_threshold,
667 )
669 async def _create_connection(
670 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
671 ) -> ResponseHandler:
672 raise NotImplementedError()
675class _DNSCacheTable:
676 def __init__(self, ttl: Optional[float] = None) -> None:
677 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {}
678 self._timestamps: Dict[Tuple[str, int], float] = {}
679 self._ttl = ttl
681 def __contains__(self, host: object) -> bool:
682 return host in self._addrs_rr
684 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
685 self._addrs_rr[key] = (cycle(addrs), len(addrs))
687 if self._ttl is not None:
688 self._timestamps[key] = monotonic()
690 def remove(self, key: Tuple[str, int]) -> None:
691 self._addrs_rr.pop(key, None)
693 if self._ttl is not None:
694 self._timestamps.pop(key, None)
696 def clear(self) -> None:
697 self._addrs_rr.clear()
698 self._timestamps.clear()
700 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
701 loop, length = self._addrs_rr[key]
702 addrs = list(islice(loop, length))
703 # Consume one more element to shift internal state of `cycle`
704 next(loop)
705 return addrs
707 def expired(self, key: Tuple[str, int]) -> bool:
708 if self._ttl is None:
709 return False
711 return self._timestamps[key] + self._ttl < monotonic()
714class TCPConnector(BaseConnector):
715 """TCP connector.
717 verify_ssl - Set to True to check ssl certifications.
718 fingerprint - Pass the binary sha256
719 digest of the expected certificate in DER format to verify
720 that the certificate the server presents matches. See also
721 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
722 resolver - Enable DNS lookups and use this
723 resolver
724 use_dns_cache - Use memory cache for DNS lookups.
725 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
726 family - socket address family
727 local_addr - local tuple of (host, port) to bind socket to
729 keepalive_timeout - (optional) Keep-alive timeout.
730 force_close - Set to True to force close and do reconnect
731 after each request (and between redirects).
732 limit - The total number of simultaneous connections.
733 limit_per_host - Number of simultaneous connections to one host.
734 enable_cleanup_closed - Enables clean-up closed ssl transports.
735 Disabled by default.
736 happy_eyeballs_delay - This is the “Connection Attempt Delay”
737 as defined in RFC 8305. To disable
738 the happy eyeballs algorithm, set to None.
739 interleave - “First Address Family Count” as defined in RFC 8305
740 loop - Optional event loop.
741 """
743 def __init__(
744 self,
745 *,
746 use_dns_cache: bool = True,
747 ttl_dns_cache: Optional[int] = 10,
748 family: int = 0,
749 ssl: Union[bool, Fingerprint, SSLContext] = True,
750 local_addr: Optional[Tuple[str, int]] = None,
751 resolver: Optional[AbstractResolver] = None,
752 keepalive_timeout: Union[None, float, _SENTINEL] = sentinel,
753 force_close: bool = False,
754 limit: int = 100,
755 limit_per_host: int = 0,
756 enable_cleanup_closed: bool = False,
757 timeout_ceil_threshold: float = 5,
758 happy_eyeballs_delay: Optional[float] = 0.25,
759 interleave: Optional[int] = None,
760 ) -> None:
761 super().__init__(
762 keepalive_timeout=keepalive_timeout,
763 force_close=force_close,
764 limit=limit,
765 limit_per_host=limit_per_host,
766 enable_cleanup_closed=enable_cleanup_closed,
767 timeout_ceil_threshold=timeout_ceil_threshold,
768 )
770 if not isinstance(ssl, SSL_ALLOWED_TYPES):
771 raise TypeError(
772 "ssl should be SSLContext, Fingerprint, or bool, "
773 "got {!r} instead.".format(ssl)
774 )
775 self._ssl = ssl
776 if resolver is None:
777 resolver = DefaultResolver()
778 self._resolver: AbstractResolver = resolver
780 self._use_dns_cache = use_dns_cache
781 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
782 self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}
783 self._family = family
784 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
785 self._happy_eyeballs_delay = happy_eyeballs_delay
786 self._interleave = interleave
788 def _close_immediately(self) -> List["asyncio.Future[None]"]:
789 for ev in self._throttle_dns_events.values():
790 ev.cancel()
791 return super()._close_immediately()
793 @property
794 def family(self) -> int:
795 """Socket family like AF_INET."""
796 return self._family
798 @property
799 def use_dns_cache(self) -> bool:
800 """True if local DNS caching is enabled."""
801 return self._use_dns_cache
803 def clear_dns_cache(
804 self, host: Optional[str] = None, port: Optional[int] = None
805 ) -> None:
806 """Remove specified host/port or clear all dns local cache."""
807 if host is not None and port is not None:
808 self._cached_hosts.remove((host, port))
809 elif host is not None or port is not None:
810 raise ValueError("either both host and port " "or none of them are allowed")
811 else:
812 self._cached_hosts.clear()
814 async def _resolve_host(
815 self, host: str, port: int, traces: Optional[List["Trace"]] = None
816 ) -> List[Dict[str, Any]]:
817 if is_ip_address(host):
818 return [
819 {
820 "hostname": host,
821 "host": host,
822 "port": port,
823 "family": self._family,
824 "proto": 0,
825 "flags": 0,
826 }
827 ]
829 if not self._use_dns_cache:
830 if traces:
831 for trace in traces:
832 await trace.send_dns_resolvehost_start(host)
834 res = await self._resolver.resolve(host, port, family=self._family)
836 if traces:
837 for trace in traces:
838 await trace.send_dns_resolvehost_end(host)
840 return res
842 key = (host, port)
844 if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
845 # get result early, before any await (#4014)
846 result = self._cached_hosts.next_addrs(key)
848 if traces:
849 for trace in traces:
850 await trace.send_dns_cache_hit(host)
851 return result
853 if key in self._throttle_dns_events:
854 # get event early, before any await (#4014)
855 event = self._throttle_dns_events[key]
856 if traces:
857 for trace in traces:
858 await trace.send_dns_cache_hit(host)
859 await event.wait()
860 else:
861 # update dict early, before any await (#4014)
862 self._throttle_dns_events[key] = EventResultOrError(self._loop)
863 if traces:
864 for trace in traces:
865 await trace.send_dns_cache_miss(host)
866 try:
867 if traces:
868 for trace in traces:
869 await trace.send_dns_resolvehost_start(host)
871 addrs = await self._resolver.resolve(host, port, family=self._family)
872 if traces:
873 for trace in traces:
874 await trace.send_dns_resolvehost_end(host)
876 self._cached_hosts.add(key, addrs)
877 self._throttle_dns_events[key].set()
878 except BaseException as e:
879 # any DNS exception, independently of the implementation
880 # is set for the waiters to raise the same exception.
881 self._throttle_dns_events[key].set(exc=e)
882 raise
883 finally:
884 self._throttle_dns_events.pop(key)
886 return self._cached_hosts.next_addrs(key)
888 async def _create_connection(
889 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
890 ) -> ResponseHandler:
891 """Create connection.
893 Has same keyword arguments as BaseEventLoop.create_connection.
894 """
895 if req.proxy:
896 _, proto = await self._create_proxy_connection(req, traces, timeout)
897 else:
898 _, proto = await self._create_direct_connection(req, traces, timeout)
900 return proto
902 @staticmethod
903 @functools.lru_cache(None)
904 def _make_ssl_context(verified: bool) -> SSLContext:
905 if verified:
906 return ssl.create_default_context()
907 else:
908 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
909 sslcontext.options |= ssl.OP_NO_SSLv2
910 sslcontext.options |= ssl.OP_NO_SSLv3
911 sslcontext.check_hostname = False
912 sslcontext.verify_mode = ssl.CERT_NONE
913 try:
914 sslcontext.options |= ssl.OP_NO_COMPRESSION
915 except AttributeError as attr_err:
916 warnings.warn(
917 "{!s}: The Python interpreter is compiled "
918 "against OpenSSL < 1.0.0. Ref: "
919 "https://docs.python.org/3/library/ssl.html"
920 "#ssl.OP_NO_COMPRESSION".format(attr_err),
921 )
922 sslcontext.set_default_verify_paths()
923 return sslcontext
925 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
926 """Logic to get the correct SSL context
928 0. if req.ssl is false, return None
930 1. if ssl_context is specified in req, use it
931 2. if _ssl_context is specified in self, use it
932 3. otherwise:
933 1. if verify_ssl is not specified in req, use self.ssl_context
934 (will generate a default context according to self.verify_ssl)
935 2. if verify_ssl is True in req, generate a default SSL context
936 3. if verify_ssl is False in req, generate a SSL context that
937 won't verify
938 """
939 if req.is_ssl():
940 if ssl is None: # pragma: no cover
941 raise RuntimeError("SSL is not supported.")
942 sslcontext = req.ssl
943 if isinstance(sslcontext, ssl.SSLContext):
944 return sslcontext
945 if sslcontext is not True:
946 # not verified or fingerprinted
947 return self._make_ssl_context(False)
948 sslcontext = self._ssl
949 if isinstance(sslcontext, ssl.SSLContext):
950 return sslcontext
951 if sslcontext is not True:
952 # not verified or fingerprinted
953 return self._make_ssl_context(False)
954 return self._make_ssl_context(True)
955 else:
956 return None
958 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
959 ret = req.ssl
960 if isinstance(ret, Fingerprint):
961 return ret
962 ret = self._ssl
963 if isinstance(ret, Fingerprint):
964 return ret
965 return None
967 async def _wrap_create_connection(
968 self,
969 *args: Any,
970 addr_infos: List[aiohappyeyeballs.AddrInfoType],
971 req: ClientRequest,
972 timeout: "ClientTimeout",
973 client_error: Type[Exception] = ClientConnectorError,
974 **kwargs: Any,
975 ) -> Tuple[asyncio.Transport, ResponseHandler]:
976 try:
977 async with ceil_timeout(
978 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
979 ):
980 sock = await aiohappyeyeballs.start_connection(
981 addr_infos=addr_infos,
982 local_addr_infos=self._local_addr_infos,
983 happy_eyeballs_delay=self._happy_eyeballs_delay,
984 interleave=self._interleave,
985 loop=self._loop,
986 )
987 return await self._loop.create_connection(*args, **kwargs, sock=sock)
988 except cert_errors as exc:
989 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
990 except ssl_errors as exc:
991 raise ClientConnectorSSLError(req.connection_key, exc) from exc
992 except OSError as exc:
993 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
994 raise
995 raise client_error(req.connection_key, exc) from exc
997 def _warn_about_tls_in_tls(
998 self,
999 underlying_transport: asyncio.Transport,
1000 req: ClientRequest,
1001 ) -> None:
1002 """Issue a warning if the requested URL has HTTPS scheme."""
1003 if req.request_info.url.scheme != "https":
1004 return
1006 asyncio_supports_tls_in_tls = getattr(
1007 underlying_transport,
1008 "_start_tls_compatible",
1009 False,
1010 )
1012 if asyncio_supports_tls_in_tls:
1013 return
1015 warnings.warn(
1016 "An HTTPS request is being sent through an HTTPS proxy. "
1017 "This support for TLS in TLS is known to be disabled "
1018 "in the stdlib asyncio. This is why you'll probably see "
1019 "an error in the log below.\n\n"
1020 "It is possible to enable it via monkeypatching. "
1021 "For more details, see:\n"
1022 "* https://bugs.python.org/issue37179\n"
1023 "* https://github.com/python/cpython/pull/28073\n\n"
1024 "You can temporarily patch this as follows:\n"
1025 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1026 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1027 RuntimeWarning,
1028 source=self,
1029 # Why `4`? At least 3 of the calls in the stack originate
1030 # from the methods in this class.
1031 stacklevel=3,
1032 )
1034 async def _start_tls_connection(
1035 self,
1036 underlying_transport: asyncio.Transport,
1037 req: ClientRequest,
1038 timeout: "ClientTimeout",
1039 client_error: Type[Exception] = ClientConnectorError,
1040 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1041 """Wrap the raw TCP transport with TLS."""
1042 tls_proto = self._factory() # Create a brand new proto for TLS
1044 # Safety of the `cast()` call here is based on the fact that
1045 # internally `_get_ssl_context()` only returns `None` when
1046 # `req.is_ssl()` evaluates to `False` which is never gonna happen
1047 # in this code path. Of course, it's rather fragile
1048 # maintainability-wise but this is to be solved separately.
1049 sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))
1051 try:
1052 async with ceil_timeout(
1053 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1054 ):
1055 try:
1056 tls_transport = await self._loop.start_tls(
1057 underlying_transport,
1058 tls_proto,
1059 sslcontext,
1060 server_hostname=req.server_hostname or req.host,
1061 ssl_handshake_timeout=timeout.total,
1062 )
1063 except BaseException:
1064 # We need to close the underlying transport since
1065 # `start_tls()` probably failed before it had a
1066 # chance to do this:
1067 underlying_transport.close()
1068 raise
1069 except cert_errors as exc:
1070 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1071 except ssl_errors as exc:
1072 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1073 except OSError as exc:
1074 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1075 raise
1076 raise client_error(req.connection_key, exc) from exc
1077 except TypeError as type_err:
1078 # Example cause looks like this:
1079 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1080 # object at 0x7f760615e460> is not supported by start_tls()
1082 raise ClientConnectionError(
1083 "Cannot initialize a TLS-in-TLS connection to host "
1084 f"{req.host!s}:{req.port:d} through an underlying connection "
1085 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1086 f"[{type_err!s}]"
1087 ) from type_err
1088 else:
1089 if tls_transport is None:
1090 msg = "Failed to start TLS (possibly caused by closing transport)"
1091 raise client_error(req.connection_key, OSError(msg))
1092 tls_proto.connection_made(
1093 tls_transport
1094 ) # Kick the state machine of the new TLS protocol
1096 return tls_transport, tls_proto
1098 def _convert_hosts_to_addr_infos(
1099 self, hosts: List[Dict[str, Any]]
1100 ) -> List[aiohappyeyeballs.AddrInfoType]:
1101 """Converts the list of hosts to a list of addr_infos.
1103 The list of hosts is the result of a DNS lookup. The list of
1104 addr_infos is the result of a call to `socket.getaddrinfo()`.
1105 """
1106 addr_infos: List[aiohappyeyeballs.AddrInfoType] = []
1107 for hinfo in hosts:
1108 host = hinfo["host"]
1109 is_ipv6 = ":" in host
1110 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1111 if self._family and self._family != family:
1112 continue
1113 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1114 addr_infos.append(
1115 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1116 )
1117 return addr_infos
1119 async def _create_direct_connection(
1120 self,
1121 req: ClientRequest,
1122 traces: List["Trace"],
1123 timeout: "ClientTimeout",
1124 *,
1125 client_error: Type[Exception] = ClientConnectorError,
1126 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1127 sslcontext = self._get_ssl_context(req)
1128 fingerprint = self._get_fingerprint(req)
1130 host = req.url.raw_host
1131 assert host is not None
1132 # Replace multiple trailing dots with a single one.
1133 # A trailing dot is only present for fully-qualified domain names.
1134 # See https://github.com/aio-libs/aiohttp/pull/7364.
1135 if host.endswith(".."):
1136 host = host.rstrip(".") + "."
1137 port = req.port
1138 assert port is not None
1139 host_resolved = asyncio.ensure_future(
1140 self._resolve_host(host, port, traces=traces), loop=self._loop
1141 )
1142 try:
1143 # Cancelling this lookup should not cancel the underlying lookup
1144 # or else the cancel event will get broadcast to all the waiters
1145 # across all connections.
1146 hosts = await asyncio.shield(host_resolved)
1147 except asyncio.CancelledError:
1149 def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
1150 with suppress(Exception, asyncio.CancelledError):
1151 fut.result()
1153 host_resolved.add_done_callback(drop_exception)
1154 raise
1155 except OSError as exc:
1156 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1157 raise
1158 # in case of proxy it is not ClientProxyConnectionError
1159 # it is problem of resolving proxy ip itself
1160 raise ClientConnectorError(req.connection_key, exc) from exc
1162 last_exc: Optional[Exception] = None
1163 addr_infos = self._convert_hosts_to_addr_infos(hosts)
1164 while addr_infos:
1165 # Strip trailing dots, certificates contain FQDN without dots.
1166 # See https://github.com/aio-libs/aiohttp/issues/3636
1167 server_hostname = (
1168 (req.server_hostname or host).rstrip(".") if sslcontext else None
1169 )
1171 try:
1172 transp, proto = await self._wrap_create_connection(
1173 self._factory,
1174 timeout=timeout,
1175 ssl=sslcontext,
1176 addr_infos=addr_infos,
1177 server_hostname=server_hostname,
1178 req=req,
1179 client_error=client_error,
1180 )
1181 except ClientConnectorError as exc:
1182 last_exc = exc
1183 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
1184 continue
1186 if req.is_ssl() and fingerprint:
1187 try:
1188 fingerprint.check(transp)
1189 except ServerFingerprintMismatch as exc:
1190 transp.close()
1191 if not self._cleanup_closed_disabled:
1192 self._cleanup_closed_transports.append(transp)
1193 last_exc = exc
1194 # Remove the bad peer from the list of addr_infos
1195 sock: socket.socket = transp.get_extra_info("socket")
1196 bad_peer = sock.getpeername()
1197 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
1198 continue
1200 return transp, proto
1201 assert last_exc is not None
1202 raise last_exc
1204 async def _create_proxy_connection(
1205 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1206 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1207 headers: Dict[str, str] = {}
1208 if req.proxy_headers is not None:
1209 headers = req.proxy_headers # type: ignore[assignment]
1210 headers[hdrs.HOST] = req.headers[hdrs.HOST]
1212 url = req.proxy
1213 assert url is not None
1214 proxy_req = ClientRequest(
1215 hdrs.METH_GET,
1216 url,
1217 headers=headers,
1218 auth=req.proxy_auth,
1219 loop=self._loop,
1220 ssl=req.ssl,
1221 )
1223 # create connection to proxy server
1224 transport, proto = await self._create_direct_connection(
1225 proxy_req, [], timeout, client_error=ClientProxyConnectionError
1226 )
1228 # Many HTTP proxies has buggy keepalive support. Let's not
1229 # reuse connection but close it after processing every
1230 # response.
1231 proto.force_close()
1233 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
1234 if auth is not None:
1235 if not req.is_ssl():
1236 req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1237 else:
1238 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1240 if req.is_ssl():
1241 self._warn_about_tls_in_tls(transport, req)
1243 # For HTTPS requests over HTTP proxy
1244 # we must notify proxy to tunnel connection
1245 # so we send CONNECT command:
1246 # CONNECT www.python.org:443 HTTP/1.1
1247 # Host: www.python.org
1248 #
1249 # next we must do TLS handshake and so on
1250 # to do this we must wrap raw socket into secure one
1251 # asyncio handles this perfectly
1252 proxy_req.method = hdrs.METH_CONNECT
1253 proxy_req.url = req.url
1254 key = dataclasses.replace(
1255 req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
1256 )
1257 conn = Connection(self, key, proto, self._loop)
1258 proxy_resp = await proxy_req.send(conn)
1259 try:
1260 protocol = conn._protocol
1261 assert protocol is not None
1263 # read_until_eof=True will ensure the connection isn't closed
1264 # once the response is received and processed allowing
1265 # START_TLS to work on the connection below.
1266 protocol.set_response_params(
1267 read_until_eof=True,
1268 timeout_ceil_threshold=self._timeout_ceil_threshold,
1269 )
1270 resp = await proxy_resp.start(conn)
1271 except BaseException:
1272 proxy_resp.close()
1273 conn.close()
1274 raise
1275 else:
1276 conn._protocol = None
1277 conn._transport = None
1278 try:
1279 if resp.status != 200:
1280 message = resp.reason
1281 if message is None:
1282 message = HTTPStatus(resp.status).phrase
1283 raise ClientHttpProxyError(
1284 proxy_resp.request_info,
1285 resp.history,
1286 status=resp.status,
1287 message=message,
1288 headers=resp.headers,
1289 )
1290 except BaseException:
1291 # It shouldn't be closed in `finally` because it's fed to
1292 # `loop.start_tls()` and the docs say not to touch it after
1293 # passing there.
1294 transport.close()
1295 raise
1297 return await self._start_tls_connection(
1298 # Access the old transport for the last time before it's
1299 # closed and forgotten forever:
1300 transport,
1301 req=req,
1302 timeout=timeout,
1303 )
1304 finally:
1305 proxy_resp.close()
1307 return transport, proto
1310class UnixConnector(BaseConnector):
1311 """Unix socket connector.
1313 path - Unix socket path.
1314 keepalive_timeout - (optional) Keep-alive timeout.
1315 force_close - Set to True to force close and do reconnect
1316 after each request (and between redirects).
1317 limit - The total number of simultaneous connections.
1318 limit_per_host - Number of simultaneous connections to one host.
1319 loop - Optional event loop.
1320 """
1322 def __init__(
1323 self,
1324 path: str,
1325 force_close: bool = False,
1326 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
1327 limit: int = 100,
1328 limit_per_host: int = 0,
1329 ) -> None:
1330 super().__init__(
1331 force_close=force_close,
1332 keepalive_timeout=keepalive_timeout,
1333 limit=limit,
1334 limit_per_host=limit_per_host,
1335 )
1336 self._path = path
1338 @property
1339 def path(self) -> str:
1340 """Path to unix socket."""
1341 return self._path
1343 async def _create_connection(
1344 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1345 ) -> ResponseHandler:
1346 try:
1347 async with ceil_timeout(
1348 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1349 ):
1350 _, proto = await self._loop.create_unix_connection(
1351 self._factory, self._path
1352 )
1353 except OSError as exc:
1354 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1355 raise
1356 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1358 return proto
1361class NamedPipeConnector(BaseConnector):
1362 """Named pipe connector.
1364 Only supported by the proactor event loop.
1365 See also: https://docs.python.org/3/library/asyncio-eventloop.html
1367 path - Windows named pipe path.
1368 keepalive_timeout - (optional) Keep-alive timeout.
1369 force_close - Set to True to force close and do reconnect
1370 after each request (and between redirects).
1371 limit - The total number of simultaneous connections.
1372 limit_per_host - Number of simultaneous connections to one host.
1373 loop - Optional event loop.
1374 """
1376 def __init__(
1377 self,
1378 path: str,
1379 force_close: bool = False,
1380 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
1381 limit: int = 100,
1382 limit_per_host: int = 0,
1383 ) -> None:
1384 super().__init__(
1385 force_close=force_close,
1386 keepalive_timeout=keepalive_timeout,
1387 limit=limit,
1388 limit_per_host=limit_per_host,
1389 )
1390 if not isinstance(
1391 self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]
1392 ):
1393 raise RuntimeError(
1394 "Named Pipes only available in proactor " "loop under windows"
1395 )
1396 self._path = path
1398 @property
1399 def path(self) -> str:
1400 """Path to the named pipe."""
1401 return self._path
1403 async def _create_connection(
1404 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1405 ) -> ResponseHandler:
1406 try:
1407 async with ceil_timeout(
1408 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1409 ):
1410 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
1411 self._factory, self._path
1412 )
1413 # the drain is required so that the connection_made is called
1414 # and transport is set otherwise it is not set before the
1415 # `assert conn.transport is not None`
1416 # in client.py's _request method
1417 await asyncio.sleep(0)
1418 # other option is to manually set transport like
1419 # `proto.transport = trans`
1420 except OSError as exc:
1421 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1422 raise
1423 raise ClientConnectorError(req.connection_key, exc) from exc
1425 return cast(ResponseHandler, proto)