Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/connector.py: 34%
715 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2import functools
3import random
4import sys
5import traceback
6import warnings
7from collections import defaultdict, deque
8from contextlib import suppress
9from http import HTTPStatus
10from http.cookies import SimpleCookie
11from itertools import cycle, islice
12from time import monotonic
13from types import TracebackType
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Awaitable,
18 Callable,
19 DefaultDict,
20 Dict,
21 Iterator,
22 List,
23 Literal,
24 Optional,
25 Set,
26 Tuple,
27 Type,
28 Union,
29 cast,
30)
32import attr
34from . import hdrs, helpers
35from .abc import AbstractResolver
36from .client_exceptions import (
37 ClientConnectionError,
38 ClientConnectorCertificateError,
39 ClientConnectorError,
40 ClientConnectorSSLError,
41 ClientHttpProxyError,
42 ClientProxyConnectionError,
43 ServerFingerprintMismatch,
44 UnixClientConnectorError,
45 cert_errors,
46 ssl_errors,
47)
48from .client_proto import ResponseHandler
49from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
50from .helpers import ceil_timeout, get_running_loop, is_ip_address, noop, sentinel
51from .locks import EventResultOrError
52from .resolver import DefaultResolver
54try:
55 import ssl
57 SSLContext = ssl.SSLContext
58except ImportError: # pragma: no cover
59 ssl = None # type: ignore[assignment]
60 SSLContext = object # type: ignore[misc,assignment]
63__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
66if TYPE_CHECKING: # pragma: no cover
67 from .client import ClientTimeout
68 from .client_reqrep import ConnectionKey
69 from .tracing import Trace
72class _DeprecationWaiter:
73 __slots__ = ("_awaitable", "_awaited")
75 def __init__(self, awaitable: Awaitable[Any]) -> None:
76 self._awaitable = awaitable
77 self._awaited = False
79 def __await__(self) -> Any:
80 self._awaited = True
81 return self._awaitable.__await__()
83 def __del__(self) -> None:
84 if not self._awaited:
85 warnings.warn(
86 "Connector.close() is a coroutine, "
87 "please use await connector.close()",
88 DeprecationWarning,
89 )
92class Connection:
94 _source_traceback = None
95 _transport = None
97 def __init__(
98 self,
99 connector: "BaseConnector",
100 key: "ConnectionKey",
101 protocol: ResponseHandler,
102 loop: asyncio.AbstractEventLoop,
103 ) -> None:
104 self._key = key
105 self._connector = connector
106 self._loop = loop
107 self._protocol: Optional[ResponseHandler] = protocol
108 self._callbacks: List[Callable[[], None]] = []
110 if loop.get_debug():
111 self._source_traceback = traceback.extract_stack(sys._getframe(1))
113 def __repr__(self) -> str:
114 return f"Connection<{self._key}>"
116 def __del__(self, _warnings: Any = warnings) -> None:
117 if self._protocol is not None:
118 kwargs = {"source": self}
119 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
120 if self._loop.is_closed():
121 return
123 self._connector._release(self._key, self._protocol, should_close=True)
125 context = {"client_connection": self, "message": "Unclosed connection"}
126 if self._source_traceback is not None:
127 context["source_traceback"] = self._source_traceback
128 self._loop.call_exception_handler(context)
130 def __bool__(self) -> Literal[True]:
131 """Force subclasses to not be falsy, to make checks simpler."""
132 return True
134 @property
135 def loop(self) -> asyncio.AbstractEventLoop:
136 warnings.warn(
137 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
138 )
139 return self._loop
141 @property
142 def transport(self) -> Optional[asyncio.Transport]:
143 if self._protocol is None:
144 return None
145 return self._protocol.transport
147 @property
148 def protocol(self) -> Optional[ResponseHandler]:
149 return self._protocol
151 def add_callback(self, callback: Callable[[], None]) -> None:
152 if callback is not None:
153 self._callbacks.append(callback)
155 def _notify_release(self) -> None:
156 callbacks, self._callbacks = self._callbacks[:], []
158 for cb in callbacks:
159 with suppress(Exception):
160 cb()
162 def close(self) -> None:
163 self._notify_release()
165 if self._protocol is not None:
166 self._connector._release(self._key, self._protocol, should_close=True)
167 self._protocol = None
169 def release(self) -> None:
170 self._notify_release()
172 if self._protocol is not None:
173 self._connector._release(
174 self._key, self._protocol, should_close=self._protocol.should_close
175 )
176 self._protocol = None
178 @property
179 def closed(self) -> bool:
180 return self._protocol is None or not self._protocol.is_connected()
183class _TransportPlaceholder:
184 """placeholder for BaseConnector.connect function"""
186 def close(self) -> None:
187 pass
190class BaseConnector:
191 """Base connector class.
193 keepalive_timeout - (optional) Keep-alive timeout.
194 force_close - Set to True to force close and do reconnect
195 after each request (and between redirects).
196 limit - The total number of simultaneous connections.
197 limit_per_host - Number of simultaneous connections to one host.
198 enable_cleanup_closed - Enables clean-up closed ssl transports.
199 Disabled by default.
200 timeout_ceil_threshold - Trigger ceiling of timeout values when
201 it's above timeout_ceil_threshold.
202 loop - Optional event loop.
203 """
205 _closed = True # prevent AttributeError in __del__ if ctor was failed
206 _source_traceback = None
208 # abort transport after 2 seconds (cleanup broken connections)
209 _cleanup_closed_period = 2.0
211 def __init__(
212 self,
213 *,
214 keepalive_timeout: Union[object, None, float] = sentinel,
215 force_close: bool = False,
216 limit: int = 100,
217 limit_per_host: int = 0,
218 enable_cleanup_closed: bool = False,
219 loop: Optional[asyncio.AbstractEventLoop] = None,
220 timeout_ceil_threshold: float = 5,
221 ) -> None:
223 if force_close:
224 if keepalive_timeout is not None and keepalive_timeout is not sentinel:
225 raise ValueError(
226 "keepalive_timeout cannot " "be set if force_close is True"
227 )
228 else:
229 if keepalive_timeout is sentinel:
230 keepalive_timeout = 15.0
232 loop = get_running_loop(loop)
233 self._timeout_ceil_threshold = timeout_ceil_threshold
235 self._closed = False
236 if loop.get_debug():
237 self._source_traceback = traceback.extract_stack(sys._getframe(1))
239 self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
240 self._limit = limit
241 self._limit_per_host = limit_per_host
242 self._acquired: Set[ResponseHandler] = set()
243 self._acquired_per_host: DefaultDict[
244 ConnectionKey, Set[ResponseHandler]
245 ] = defaultdict(set)
246 self._keepalive_timeout = cast(float, keepalive_timeout)
247 self._force_close = force_close
249 # {host_key: FIFO list of waiters}
250 self._waiters = defaultdict(deque) # type: ignore[var-annotated]
252 self._loop = loop
253 self._factory = functools.partial(ResponseHandler, loop=loop)
255 self.cookies = SimpleCookie()
257 # start keep-alive connection cleanup task
258 self._cleanup_handle: Optional[asyncio.TimerHandle] = None
260 # start cleanup closed transports task
261 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
262 self._cleanup_closed_disabled = not enable_cleanup_closed
263 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
264 self._cleanup_closed()
266 def __del__(self, _warnings: Any = warnings) -> None:
267 if self._closed:
268 return
269 if not self._conns:
270 return
272 conns = [repr(c) for c in self._conns.values()]
274 self._close()
276 kwargs = {"source": self}
277 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
278 context = {
279 "connector": self,
280 "connections": conns,
281 "message": "Unclosed connector",
282 }
283 if self._source_traceback is not None:
284 context["source_traceback"] = self._source_traceback
285 self._loop.call_exception_handler(context)
287 def __enter__(self) -> "BaseConnector":
288 warnings.warn(
289 '"with Connector():" is deprecated, '
290 'use "async with Connector():" instead',
291 DeprecationWarning,
292 )
293 return self
295 def __exit__(self, *exc: Any) -> None:
296 self._close()
298 async def __aenter__(self) -> "BaseConnector":
299 return self
301 async def __aexit__(
302 self,
303 exc_type: Optional[Type[BaseException]] = None,
304 exc_value: Optional[BaseException] = None,
305 exc_traceback: Optional[TracebackType] = None,
306 ) -> None:
307 await self.close()
309 @property
310 def force_close(self) -> bool:
311 """Ultimately close connection on releasing if True."""
312 return self._force_close
314 @property
315 def limit(self) -> int:
316 """The total number for simultaneous connections.
318 If limit is 0 the connector has no limit.
319 The default limit size is 100.
320 """
321 return self._limit
323 @property
324 def limit_per_host(self) -> int:
325 """The limit for simultaneous connections to the same endpoint.
327 Endpoints are the same if they are have equal
328 (host, port, is_ssl) triple.
329 """
330 return self._limit_per_host
332 def _cleanup(self) -> None:
333 """Cleanup unused transports."""
334 if self._cleanup_handle:
335 self._cleanup_handle.cancel()
336 # _cleanup_handle should be unset, otherwise _release() will not
337 # recreate it ever!
338 self._cleanup_handle = None
340 now = self._loop.time()
341 timeout = self._keepalive_timeout
343 if self._conns:
344 connections = {}
345 deadline = now - timeout
346 for key, conns in self._conns.items():
347 alive = []
348 for proto, use_time in conns:
349 if proto.is_connected():
350 if use_time - deadline < 0:
351 transport = proto.transport
352 proto.close()
353 if key.is_ssl and not self._cleanup_closed_disabled:
354 self._cleanup_closed_transports.append(transport)
355 else:
356 alive.append((proto, use_time))
357 else:
358 transport = proto.transport
359 proto.close()
360 if key.is_ssl and not self._cleanup_closed_disabled:
361 self._cleanup_closed_transports.append(transport)
363 if alive:
364 connections[key] = alive
366 self._conns = connections
368 if self._conns:
369 self._cleanup_handle = helpers.weakref_handle(
370 self,
371 "_cleanup",
372 timeout,
373 self._loop,
374 timeout_ceil_threshold=self._timeout_ceil_threshold,
375 )
377 def _drop_acquired_per_host(
378 self, key: "ConnectionKey", val: ResponseHandler
379 ) -> None:
380 acquired_per_host = self._acquired_per_host
381 if key not in acquired_per_host:
382 return
383 conns = acquired_per_host[key]
384 conns.remove(val)
385 if not conns:
386 del self._acquired_per_host[key]
388 def _cleanup_closed(self) -> None:
389 """Double confirmation for transport close.
391 Some broken ssl servers may leave socket open without proper close.
392 """
393 if self._cleanup_closed_handle:
394 self._cleanup_closed_handle.cancel()
396 for transport in self._cleanup_closed_transports:
397 if transport is not None:
398 transport.abort()
400 self._cleanup_closed_transports = []
402 if not self._cleanup_closed_disabled:
403 self._cleanup_closed_handle = helpers.weakref_handle(
404 self,
405 "_cleanup_closed",
406 self._cleanup_closed_period,
407 self._loop,
408 timeout_ceil_threshold=self._timeout_ceil_threshold,
409 )
411 def close(self) -> Awaitable[None]:
412 """Close all opened transports."""
413 self._close()
414 return _DeprecationWaiter(noop())
416 def _close(self) -> None:
417 if self._closed:
418 return
420 self._closed = True
422 try:
423 if self._loop.is_closed():
424 return
426 # cancel cleanup task
427 if self._cleanup_handle:
428 self._cleanup_handle.cancel()
430 # cancel cleanup close task
431 if self._cleanup_closed_handle:
432 self._cleanup_closed_handle.cancel()
434 for data in self._conns.values():
435 for proto, t0 in data:
436 proto.close()
438 for proto in self._acquired:
439 proto.close()
441 for transport in self._cleanup_closed_transports:
442 if transport is not None:
443 transport.abort()
445 finally:
446 self._conns.clear()
447 self._acquired.clear()
448 self._waiters.clear()
449 self._cleanup_handle = None
450 self._cleanup_closed_transports.clear()
451 self._cleanup_closed_handle = None
453 @property
454 def closed(self) -> bool:
455 """Is connector closed.
457 A readonly property.
458 """
459 return self._closed
461 def _available_connections(self, key: "ConnectionKey") -> int:
462 """
463 Return number of available connections.
465 The limit, limit_per_host and the connection key are taken into account.
467 If it returns less than 1 means that there are no connections
468 available.
469 """
470 if self._limit:
471 # total calc available connections
472 available = self._limit - len(self._acquired)
474 # check limit per host
475 if (
476 self._limit_per_host
477 and available > 0
478 and key in self._acquired_per_host
479 ):
480 acquired = self._acquired_per_host.get(key)
481 assert acquired is not None
482 available = self._limit_per_host - len(acquired)
484 elif self._limit_per_host and key in self._acquired_per_host:
485 # check limit per host
486 acquired = self._acquired_per_host.get(key)
487 assert acquired is not None
488 available = self._limit_per_host - len(acquired)
489 else:
490 available = 1
492 return available
494 async def connect(
495 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
496 ) -> Connection:
497 """Get from pool or create new connection."""
498 key = req.connection_key
499 available = self._available_connections(key)
501 # Wait if there are no available connections or if there are/were
502 # waiters (i.e. don't steal connection from a waiter about to wake up)
503 if available <= 0 or key in self._waiters:
504 fut = self._loop.create_future()
506 # This connection will now count towards the limit.
507 self._waiters[key].append(fut)
509 if traces:
510 for trace in traces:
511 await trace.send_connection_queued_start()
513 try:
514 await fut
515 except BaseException as e:
516 if key in self._waiters:
517 # remove a waiter even if it was cancelled, normally it's
518 # removed when it's notified
519 try:
520 self._waiters[key].remove(fut)
521 except ValueError: # fut may no longer be in list
522 pass
524 raise e
525 finally:
526 if key in self._waiters and not self._waiters[key]:
527 del self._waiters[key]
529 if traces:
530 for trace in traces:
531 await trace.send_connection_queued_end()
533 proto = self._get(key)
534 if proto is None:
535 placeholder = cast(ResponseHandler, _TransportPlaceholder())
536 self._acquired.add(placeholder)
537 self._acquired_per_host[key].add(placeholder)
539 if traces:
540 for trace in traces:
541 await trace.send_connection_create_start()
543 try:
544 proto = await self._create_connection(req, traces, timeout)
545 if self._closed:
546 proto.close()
547 raise ClientConnectionError("Connector is closed.")
548 except BaseException:
549 if not self._closed:
550 self._acquired.remove(placeholder)
551 self._drop_acquired_per_host(key, placeholder)
552 self._release_waiter()
553 raise
554 else:
555 if not self._closed:
556 self._acquired.remove(placeholder)
557 self._drop_acquired_per_host(key, placeholder)
559 if traces:
560 for trace in traces:
561 await trace.send_connection_create_end()
562 else:
563 if traces:
564 # Acquire the connection to prevent race conditions with limits
565 placeholder = cast(ResponseHandler, _TransportPlaceholder())
566 self._acquired.add(placeholder)
567 self._acquired_per_host[key].add(placeholder)
568 for trace in traces:
569 await trace.send_connection_reuseconn()
570 self._acquired.remove(placeholder)
571 self._drop_acquired_per_host(key, placeholder)
573 self._acquired.add(proto)
574 self._acquired_per_host[key].add(proto)
575 return Connection(self, key, proto, self._loop)
577 def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
578 try:
579 conns = self._conns[key]
580 except KeyError:
581 return None
583 t1 = self._loop.time()
584 while conns:
585 proto, t0 = conns.pop()
586 if proto.is_connected():
587 if t1 - t0 > self._keepalive_timeout:
588 transport = proto.transport
589 proto.close()
590 # only for SSL transports
591 if key.is_ssl and not self._cleanup_closed_disabled:
592 self._cleanup_closed_transports.append(transport)
593 else:
594 if not conns:
595 # The very last connection was reclaimed: drop the key
596 del self._conns[key]
597 return proto
598 else:
599 transport = proto.transport
600 proto.close()
601 if key.is_ssl and not self._cleanup_closed_disabled:
602 self._cleanup_closed_transports.append(transport)
604 # No more connections: drop the key
605 del self._conns[key]
606 return None
608 def _release_waiter(self) -> None:
609 """
610 Iterates over all waiters until one to be released is found.
612 The one to be released is not finished and
613 belongs to a host that has available connections.
614 """
615 if not self._waiters:
616 return
618 # Having the dict keys ordered this avoids to iterate
619 # at the same order at each call.
620 queues = list(self._waiters.keys())
621 random.shuffle(queues)
623 for key in queues:
624 if self._available_connections(key) < 1:
625 continue
627 waiters = self._waiters[key]
628 while waiters:
629 waiter = waiters.popleft()
630 if not waiter.done():
631 waiter.set_result(None)
632 return
634 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
635 if self._closed:
636 # acquired connection is already released on connector closing
637 return
639 try:
640 self._acquired.remove(proto)
641 self._drop_acquired_per_host(key, proto)
642 except KeyError: # pragma: no cover
643 # this may be result of undetermenistic order of objects
644 # finalization due garbage collection.
645 pass
646 else:
647 self._release_waiter()
649 def _release(
650 self,
651 key: "ConnectionKey",
652 protocol: ResponseHandler,
653 *,
654 should_close: bool = False,
655 ) -> None:
656 if self._closed:
657 # acquired connection is already released on connector closing
658 return
660 self._release_acquired(key, protocol)
662 if self._force_close:
663 should_close = True
665 if should_close or protocol.should_close:
666 transport = protocol.transport
667 protocol.close()
669 if key.is_ssl and not self._cleanup_closed_disabled:
670 self._cleanup_closed_transports.append(transport)
671 else:
672 conns = self._conns.get(key)
673 if conns is None:
674 conns = self._conns[key] = []
675 conns.append((protocol, self._loop.time()))
677 if self._cleanup_handle is None:
678 self._cleanup_handle = helpers.weakref_handle(
679 self,
680 "_cleanup",
681 self._keepalive_timeout,
682 self._loop,
683 timeout_ceil_threshold=self._timeout_ceil_threshold,
684 )
686 async def _create_connection(
687 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
688 ) -> ResponseHandler:
689 raise NotImplementedError()
692class _DNSCacheTable:
693 def __init__(self, ttl: Optional[float] = None) -> None:
694 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {}
695 self._timestamps: Dict[Tuple[str, int], float] = {}
696 self._ttl = ttl
698 def __contains__(self, host: object) -> bool:
699 return host in self._addrs_rr
701 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
702 self._addrs_rr[key] = (cycle(addrs), len(addrs))
704 if self._ttl is not None:
705 self._timestamps[key] = monotonic()
707 def remove(self, key: Tuple[str, int]) -> None:
708 self._addrs_rr.pop(key, None)
710 if self._ttl is not None:
711 self._timestamps.pop(key, None)
713 def clear(self) -> None:
714 self._addrs_rr.clear()
715 self._timestamps.clear()
717 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
718 loop, length = self._addrs_rr[key]
719 addrs = list(islice(loop, length))
720 # Consume one more element to shift internal state of `cycle`
721 next(loop)
722 return addrs
724 def expired(self, key: Tuple[str, int]) -> bool:
725 if self._ttl is None:
726 return False
728 return self._timestamps[key] + self._ttl < monotonic()
731class TCPConnector(BaseConnector):
732 """TCP connector.
734 verify_ssl - Set to True to check ssl certifications.
735 fingerprint - Pass the binary sha256
736 digest of the expected certificate in DER format to verify
737 that the certificate the server presents matches. See also
738 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
739 resolver - Enable DNS lookups and use this
740 resolver
741 use_dns_cache - Use memory cache for DNS lookups.
742 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
743 family - socket address family
744 local_addr - local tuple of (host, port) to bind socket to
746 keepalive_timeout - (optional) Keep-alive timeout.
747 force_close - Set to True to force close and do reconnect
748 after each request (and between redirects).
749 limit - The total number of simultaneous connections.
750 limit_per_host - Number of simultaneous connections to one host.
751 enable_cleanup_closed - Enables clean-up closed ssl transports.
752 Disabled by default.
753 loop - Optional event loop.
754 """
756 def __init__(
757 self,
758 *,
759 verify_ssl: bool = True,
760 fingerprint: Optional[bytes] = None,
761 use_dns_cache: bool = True,
762 ttl_dns_cache: Optional[int] = 10,
763 family: int = 0,
764 ssl_context: Optional[SSLContext] = None,
765 ssl: Union[None, Literal[False], Fingerprint, SSLContext] = None,
766 local_addr: Optional[Tuple[str, int]] = None,
767 resolver: Optional[AbstractResolver] = None,
768 keepalive_timeout: Union[None, float, object] = sentinel,
769 force_close: bool = False,
770 limit: int = 100,
771 limit_per_host: int = 0,
772 enable_cleanup_closed: bool = False,
773 loop: Optional[asyncio.AbstractEventLoop] = None,
774 timeout_ceil_threshold: float = 5,
775 ):
776 super().__init__(
777 keepalive_timeout=keepalive_timeout,
778 force_close=force_close,
779 limit=limit,
780 limit_per_host=limit_per_host,
781 enable_cleanup_closed=enable_cleanup_closed,
782 loop=loop,
783 timeout_ceil_threshold=timeout_ceil_threshold,
784 )
786 self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
787 if resolver is None:
788 resolver = DefaultResolver(loop=self._loop)
789 self._resolver = resolver
791 self._use_dns_cache = use_dns_cache
792 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
793 self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}
794 self._family = family
795 self._local_addr = local_addr
797 def close(self) -> Awaitable[None]:
798 """Close all ongoing DNS calls."""
799 for ev in self._throttle_dns_events.values():
800 ev.cancel()
802 return super().close()
804 @property
805 def family(self) -> int:
806 """Socket family like AF_INET."""
807 return self._family
809 @property
810 def use_dns_cache(self) -> bool:
811 """True if local DNS caching is enabled."""
812 return self._use_dns_cache
814 def clear_dns_cache(
815 self, host: Optional[str] = None, port: Optional[int] = None
816 ) -> None:
817 """Remove specified host/port or clear all dns local cache."""
818 if host is not None and port is not None:
819 self._cached_hosts.remove((host, port))
820 elif host is not None or port is not None:
821 raise ValueError("either both host and port " "or none of them are allowed")
822 else:
823 self._cached_hosts.clear()
825 async def _resolve_host(
826 self, host: str, port: int, traces: Optional[List["Trace"]] = None
827 ) -> List[Dict[str, Any]]:
828 if is_ip_address(host):
829 return [
830 {
831 "hostname": host,
832 "host": host,
833 "port": port,
834 "family": self._family,
835 "proto": 0,
836 "flags": 0,
837 }
838 ]
840 if not self._use_dns_cache:
842 if traces:
843 for trace in traces:
844 await trace.send_dns_resolvehost_start(host)
846 res = await self._resolver.resolve(host, port, family=self._family)
848 if traces:
849 for trace in traces:
850 await trace.send_dns_resolvehost_end(host)
852 return res
854 key = (host, port)
856 if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
857 # get result early, before any await (#4014)
858 result = self._cached_hosts.next_addrs(key)
860 if traces:
861 for trace in traces:
862 await trace.send_dns_cache_hit(host)
863 return result
865 if key in self._throttle_dns_events:
866 # get event early, before any await (#4014)
867 event = self._throttle_dns_events[key]
868 if traces:
869 for trace in traces:
870 await trace.send_dns_cache_hit(host)
871 await event.wait()
872 else:
873 # update dict early, before any await (#4014)
874 self._throttle_dns_events[key] = EventResultOrError(self._loop)
875 if traces:
876 for trace in traces:
877 await trace.send_dns_cache_miss(host)
878 try:
880 if traces:
881 for trace in traces:
882 await trace.send_dns_resolvehost_start(host)
884 addrs = await self._resolver.resolve(host, port, family=self._family)
885 if traces:
886 for trace in traces:
887 await trace.send_dns_resolvehost_end(host)
889 self._cached_hosts.add(key, addrs)
890 self._throttle_dns_events[key].set()
891 except BaseException as e:
892 # any DNS exception, independently of the implementation
893 # is set for the waiters to raise the same exception.
894 self._throttle_dns_events[key].set(exc=e)
895 raise
896 finally:
897 self._throttle_dns_events.pop(key)
899 return self._cached_hosts.next_addrs(key)
901 async def _create_connection(
902 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
903 ) -> ResponseHandler:
904 """Create connection.
906 Has same keyword arguments as BaseEventLoop.create_connection.
907 """
908 if req.proxy:
909 _, proto = await self._create_proxy_connection(req, traces, timeout)
910 else:
911 _, proto = await self._create_direct_connection(req, traces, timeout)
913 return proto
915 @staticmethod
916 @functools.lru_cache(None)
917 def _make_ssl_context(verified: bool) -> SSLContext:
918 if verified:
919 return ssl.create_default_context()
920 else:
921 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
922 sslcontext.options |= ssl.OP_NO_SSLv2
923 sslcontext.options |= ssl.OP_NO_SSLv3
924 sslcontext.check_hostname = False
925 sslcontext.verify_mode = ssl.CERT_NONE
926 try:
927 sslcontext.options |= ssl.OP_NO_COMPRESSION
928 except AttributeError as attr_err:
929 warnings.warn(
930 "{!s}: The Python interpreter is compiled "
931 "against OpenSSL < 1.0.0. Ref: "
932 "https://docs.python.org/3/library/ssl.html"
933 "#ssl.OP_NO_COMPRESSION".format(attr_err),
934 )
935 sslcontext.set_default_verify_paths()
936 return sslcontext
938 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
939 """Logic to get the correct SSL context
941 0. if req.ssl is false, return None
943 1. if ssl_context is specified in req, use it
944 2. if _ssl_context is specified in self, use it
945 3. otherwise:
946 1. if verify_ssl is not specified in req, use self.ssl_context
947 (will generate a default context according to self.verify_ssl)
948 2. if verify_ssl is True in req, generate a default SSL context
949 3. if verify_ssl is False in req, generate a SSL context that
950 won't verify
951 """
952 if req.is_ssl():
953 if ssl is None: # pragma: no cover
954 raise RuntimeError("SSL is not supported.")
955 sslcontext = req.ssl
956 if isinstance(sslcontext, ssl.SSLContext):
957 return sslcontext
958 if sslcontext is not None:
959 # not verified or fingerprinted
960 return self._make_ssl_context(False)
961 sslcontext = self._ssl
962 if isinstance(sslcontext, ssl.SSLContext):
963 return sslcontext
964 if sslcontext is not None:
965 # not verified or fingerprinted
966 return self._make_ssl_context(False)
967 return self._make_ssl_context(True)
968 else:
969 return None
971 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
972 ret = req.ssl
973 if isinstance(ret, Fingerprint):
974 return ret
975 ret = self._ssl
976 if isinstance(ret, Fingerprint):
977 return ret
978 return None
980 async def _wrap_create_connection(
981 self,
982 *args: Any,
983 req: ClientRequest,
984 timeout: "ClientTimeout",
985 client_error: Type[Exception] = ClientConnectorError,
986 **kwargs: Any,
987 ) -> Tuple[asyncio.Transport, ResponseHandler]:
988 try:
989 async with ceil_timeout(
990 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
991 ):
992 return await self._loop.create_connection(*args, **kwargs)
993 except cert_errors as exc:
994 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
995 except ssl_errors as exc:
996 raise ClientConnectorSSLError(req.connection_key, exc) from exc
997 except OSError as exc:
998 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
999 raise
1000 raise client_error(req.connection_key, exc) from exc
1002 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
1003 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
1005 It is necessary for TLS-in-TLS so that it is possible to
1006 send HTTPS queries through HTTPS proxies.
1008 This doesn't affect regular HTTP requests, though.
1009 """
1010 if not req.is_ssl():
1011 return
1013 proxy_url = req.proxy
1014 assert proxy_url is not None
1015 if proxy_url.scheme != "https":
1016 return
1018 self._check_loop_for_start_tls()
1020 def _check_loop_for_start_tls(self) -> None:
1021 try:
1022 self._loop.start_tls
1023 except AttributeError as attr_exc:
1024 raise RuntimeError(
1025 "An HTTPS request is being sent through an HTTPS proxy. "
1026 "This needs support for TLS in TLS but it is not implemented "
1027 "in your runtime for the stdlib asyncio.\n\n"
1028 "Please upgrade to Python 3.11 or higher. For more details, "
1029 "please see:\n"
1030 "* https://bugs.python.org/issue37179\n"
1031 "* https://github.com/python/cpython/pull/28073\n"
1032 "* https://docs.aiohttp.org/en/stable/"
1033 "client_advanced.html#proxy-support\n"
1034 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1035 ) from attr_exc
1037 def _loop_supports_start_tls(self) -> bool:
1038 try:
1039 self._check_loop_for_start_tls()
1040 except RuntimeError:
1041 return False
1042 else:
1043 return True
1045 def _warn_about_tls_in_tls(
1046 self,
1047 underlying_transport: asyncio.Transport,
1048 req: ClientRequest,
1049 ) -> None:
1050 """Issue a warning if the requested URL has HTTPS scheme."""
1051 if req.request_info.url.scheme != "https":
1052 return
1054 asyncio_supports_tls_in_tls = getattr(
1055 underlying_transport,
1056 "_start_tls_compatible",
1057 False,
1058 )
1060 if asyncio_supports_tls_in_tls:
1061 return
1063 warnings.warn(
1064 "An HTTPS request is being sent through an HTTPS proxy. "
1065 "This support for TLS in TLS is known to be disabled "
1066 "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
1067 "an error in the log below.\n\n"
1068 "It is possible to enable it via monkeypatching. "
1069 "For more details, see:\n"
1070 "* https://bugs.python.org/issue37179\n"
1071 "* https://github.com/python/cpython/pull/28073\n\n"
1072 "You can temporarily patch this as follows:\n"
1073 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1074 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1075 RuntimeWarning,
1076 source=self,
1077 # Why `4`? At least 3 of the calls in the stack originate
1078 # from the methods in this class.
1079 stacklevel=3,
1080 )
1082 async def _start_tls_connection(
1083 self,
1084 underlying_transport: asyncio.Transport,
1085 req: ClientRequest,
1086 timeout: "ClientTimeout",
1087 client_error: Type[Exception] = ClientConnectorError,
1088 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1089 """Wrap the raw TCP transport with TLS."""
1090 tls_proto = self._factory() # Create a brand new proto for TLS
1092 # Safety of the `cast()` call here is based on the fact that
1093 # internally `_get_ssl_context()` only returns `None` when
1094 # `req.is_ssl()` evaluates to `False` which is never gonna happen
1095 # in this code path. Of course, it's rather fragile
1096 # maintainability-wise but this is to be solved separately.
1097 sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))
1099 try:
1100 async with ceil_timeout(
1101 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1102 ):
1103 try:
1104 tls_transport = await self._loop.start_tls(
1105 underlying_transport,
1106 tls_proto,
1107 sslcontext,
1108 server_hostname=req.server_hostname or req.host,
1109 ssl_handshake_timeout=timeout.total,
1110 )
1111 except BaseException:
1112 # We need to close the underlying transport since
1113 # `start_tls()` probably failed before it had a
1114 # chance to do this:
1115 underlying_transport.close()
1116 raise
1117 except cert_errors as exc:
1118 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1119 except ssl_errors as exc:
1120 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1121 except OSError as exc:
1122 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1123 raise
1124 raise client_error(req.connection_key, exc) from exc
1125 except TypeError as type_err:
1126 # Example cause looks like this:
1127 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1128 # object at 0x7f760615e460> is not supported by start_tls()
1130 raise ClientConnectionError(
1131 "Cannot initialize a TLS-in-TLS connection to host "
1132 f"{req.host!s}:{req.port:d} through an underlying connection "
1133 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1134 f"[{type_err!s}]"
1135 ) from type_err
1136 else:
1137 if tls_transport is None:
1138 msg = "Failed to start TLS (possibly caused by closing transport)"
1139 raise client_error(req.connection_key, OSError(msg))
1140 tls_proto.connection_made(
1141 tls_transport
1142 ) # Kick the state machine of the new TLS protocol
1144 return tls_transport, tls_proto
1146 async def _create_direct_connection(
1147 self,
1148 req: ClientRequest,
1149 traces: List["Trace"],
1150 timeout: "ClientTimeout",
1151 *,
1152 client_error: Type[Exception] = ClientConnectorError,
1153 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1154 sslcontext = self._get_ssl_context(req)
1155 fingerprint = self._get_fingerprint(req)
1157 host = req.url.raw_host
1158 assert host is not None
1159 # Replace multiple trailing dots with a single one.
1160 # A trailing dot is only present for fully-qualified domain names.
1161 # See https://github.com/aio-libs/aiohttp/pull/7364.
1162 if host.endswith(".."):
1163 host = host.rstrip(".") + "."
1164 port = req.port
1165 assert port is not None
1166 host_resolved = asyncio.ensure_future(
1167 self._resolve_host(host, port, traces=traces), loop=self._loop
1168 )
1169 try:
1170 # Cancelling this lookup should not cancel the underlying lookup
1171 # or else the cancel event will get broadcast to all the waiters
1172 # across all connections.
1173 hosts = await asyncio.shield(host_resolved)
1174 except asyncio.CancelledError:
1176 def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
1177 with suppress(Exception, asyncio.CancelledError):
1178 fut.result()
1180 host_resolved.add_done_callback(drop_exception)
1181 raise
1182 except OSError as exc:
1183 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1184 raise
1185 # in case of proxy it is not ClientProxyConnectionError
1186 # it is problem of resolving proxy ip itself
1187 raise ClientConnectorError(req.connection_key, exc) from exc
1189 last_exc: Optional[Exception] = None
1191 for hinfo in hosts:
1192 host = hinfo["host"]
1193 port = hinfo["port"]
1195 # Strip trailing dots, certificates contain FQDN without dots.
1196 # See https://github.com/aio-libs/aiohttp/issues/3636
1197 server_hostname = (
1198 (req.server_hostname or hinfo["hostname"]).rstrip(".")
1199 if sslcontext
1200 else None
1201 )
1203 try:
1204 transp, proto = await self._wrap_create_connection(
1205 self._factory,
1206 host,
1207 port,
1208 timeout=timeout,
1209 ssl=sslcontext,
1210 family=hinfo["family"],
1211 proto=hinfo["proto"],
1212 flags=hinfo["flags"],
1213 server_hostname=server_hostname,
1214 local_addr=self._local_addr,
1215 req=req,
1216 client_error=client_error,
1217 )
1218 except ClientConnectorError as exc:
1219 last_exc = exc
1220 continue
1222 if req.is_ssl() and fingerprint:
1223 try:
1224 fingerprint.check(transp)
1225 except ServerFingerprintMismatch as exc:
1226 transp.close()
1227 if not self._cleanup_closed_disabled:
1228 self._cleanup_closed_transports.append(transp)
1229 last_exc = exc
1230 continue
1232 return transp, proto
1233 else:
1234 assert last_exc is not None
1235 raise last_exc
1237 async def _create_proxy_connection(
1238 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1239 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1240 self._fail_on_no_start_tls(req)
1241 runtime_has_start_tls = self._loop_supports_start_tls()
1243 headers: Dict[str, str] = {}
1244 if req.proxy_headers is not None:
1245 headers = req.proxy_headers # type: ignore[assignment]
1246 headers[hdrs.HOST] = req.headers[hdrs.HOST]
1248 url = req.proxy
1249 assert url is not None
1250 proxy_req = ClientRequest(
1251 hdrs.METH_GET,
1252 url,
1253 headers=headers,
1254 auth=req.proxy_auth,
1255 loop=self._loop,
1256 ssl=req.ssl,
1257 )
1259 # create connection to proxy server
1260 transport, proto = await self._create_direct_connection(
1261 proxy_req, [], timeout, client_error=ClientProxyConnectionError
1262 )
1264 # Many HTTP proxies has buggy keepalive support. Let's not
1265 # reuse connection but close it after processing every
1266 # response.
1267 proto.force_close()
1269 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
1270 if auth is not None:
1271 if not req.is_ssl():
1272 req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1273 else:
1274 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1276 if req.is_ssl():
1277 if runtime_has_start_tls:
1278 self._warn_about_tls_in_tls(transport, req)
1280 # For HTTPS requests over HTTP proxy
1281 # we must notify proxy to tunnel connection
1282 # so we send CONNECT command:
1283 # CONNECT www.python.org:443 HTTP/1.1
1284 # Host: www.python.org
1285 #
1286 # next we must do TLS handshake and so on
1287 # to do this we must wrap raw socket into secure one
1288 # asyncio handles this perfectly
1289 proxy_req.method = hdrs.METH_CONNECT
1290 proxy_req.url = req.url
1291 key = attr.evolve(
1292 req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
1293 )
1294 conn = Connection(self, key, proto, self._loop)
1295 proxy_resp = await proxy_req.send(conn)
1296 try:
1297 protocol = conn._protocol
1298 assert protocol is not None
1300 # read_until_eof=True will ensure the connection isn't closed
1301 # once the response is received and processed allowing
1302 # START_TLS to work on the connection below.
1303 protocol.set_response_params(
1304 read_until_eof=runtime_has_start_tls,
1305 timeout_ceil_threshold=self._timeout_ceil_threshold,
1306 )
1307 resp = await proxy_resp.start(conn)
1308 except BaseException:
1309 proxy_resp.close()
1310 conn.close()
1311 raise
1312 else:
1313 conn._protocol = None
1314 conn._transport = None
1315 try:
1316 if resp.status != 200:
1317 message = resp.reason
1318 if message is None:
1319 message = HTTPStatus(resp.status).phrase
1320 raise ClientHttpProxyError(
1321 proxy_resp.request_info,
1322 resp.history,
1323 status=resp.status,
1324 message=message,
1325 headers=resp.headers,
1326 )
1327 if not runtime_has_start_tls:
1328 rawsock = transport.get_extra_info("socket", default=None)
1329 if rawsock is None:
1330 raise RuntimeError(
1331 "Transport does not expose socket instance"
1332 )
1333 # Duplicate the socket, so now we can close proxy transport
1334 rawsock = rawsock.dup()
1335 except BaseException:
1336 # It shouldn't be closed in `finally` because it's fed to
1337 # `loop.start_tls()` and the docs say not to touch it after
1338 # passing there.
1339 transport.close()
1340 raise
1341 finally:
1342 if not runtime_has_start_tls:
1343 transport.close()
1345 if not runtime_has_start_tls:
1346 # HTTP proxy with support for upgrade to HTTPS
1347 sslcontext = self._get_ssl_context(req)
1348 return await self._wrap_create_connection(
1349 self._factory,
1350 timeout=timeout,
1351 ssl=sslcontext,
1352 sock=rawsock,
1353 server_hostname=req.host,
1354 req=req,
1355 )
1357 return await self._start_tls_connection(
1358 # Access the old transport for the last time before it's
1359 # closed and forgotten forever:
1360 transport,
1361 req=req,
1362 timeout=timeout,
1363 )
1364 finally:
1365 proxy_resp.close()
1367 return transport, proto
1370class UnixConnector(BaseConnector):
1371 """Unix socket connector.
1373 path - Unix socket path.
1374 keepalive_timeout - (optional) Keep-alive timeout.
1375 force_close - Set to True to force close and do reconnect
1376 after each request (and between redirects).
1377 limit - The total number of simultaneous connections.
1378 limit_per_host - Number of simultaneous connections to one host.
1379 loop - Optional event loop.
1380 """
1382 def __init__(
1383 self,
1384 path: str,
1385 force_close: bool = False,
1386 keepalive_timeout: Union[object, float, None] = sentinel,
1387 limit: int = 100,
1388 limit_per_host: int = 0,
1389 loop: Optional[asyncio.AbstractEventLoop] = None,
1390 ) -> None:
1391 super().__init__(
1392 force_close=force_close,
1393 keepalive_timeout=keepalive_timeout,
1394 limit=limit,
1395 limit_per_host=limit_per_host,
1396 loop=loop,
1397 )
1398 self._path = path
1400 @property
1401 def path(self) -> str:
1402 """Path to unix socket."""
1403 return self._path
1405 async def _create_connection(
1406 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1407 ) -> ResponseHandler:
1408 try:
1409 async with ceil_timeout(
1410 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1411 ):
1412 _, proto = await self._loop.create_unix_connection(
1413 self._factory, self._path
1414 )
1415 except OSError as exc:
1416 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1417 raise
1418 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1420 return proto
1423class NamedPipeConnector(BaseConnector):
1424 """Named pipe connector.
1426 Only supported by the proactor event loop.
1427 See also: https://docs.python.org/3/library/asyncio-eventloop.html
1429 path - Windows named pipe path.
1430 keepalive_timeout - (optional) Keep-alive timeout.
1431 force_close - Set to True to force close and do reconnect
1432 after each request (and between redirects).
1433 limit - The total number of simultaneous connections.
1434 limit_per_host - Number of simultaneous connections to one host.
1435 loop - Optional event loop.
1436 """
1438 def __init__(
1439 self,
1440 path: str,
1441 force_close: bool = False,
1442 keepalive_timeout: Union[object, float, None] = sentinel,
1443 limit: int = 100,
1444 limit_per_host: int = 0,
1445 loop: Optional[asyncio.AbstractEventLoop] = None,
1446 ) -> None:
1447 super().__init__(
1448 force_close=force_close,
1449 keepalive_timeout=keepalive_timeout,
1450 limit=limit,
1451 limit_per_host=limit_per_host,
1452 loop=loop,
1453 )
1454 if not isinstance(
1455 self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]
1456 ):
1457 raise RuntimeError(
1458 "Named Pipes only available in proactor " "loop under windows"
1459 )
1460 self._path = path
1462 @property
1463 def path(self) -> str:
1464 """Path to the named pipe."""
1465 return self._path
1467 async def _create_connection(
1468 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1469 ) -> ResponseHandler:
1470 try:
1471 async with ceil_timeout(
1472 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
1473 ):
1474 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
1475 self._factory, self._path
1476 )
1477 # the drain is required so that the connection_made is called
1478 # and transport is set otherwise it is not set before the
1479 # `assert conn.transport is not None`
1480 # in client.py's _request method
1481 await asyncio.sleep(0)
1482 # other option is to manually set transport like
1483 # `proto.transport = trans`
1484 except OSError as exc:
1485 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1486 raise
1487 raise ClientConnectorError(req.connection_key, exc) from exc
1489 return cast(ResponseHandler, proto)