Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 17%
670 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import dataclasses
3import functools
4import logging
5import random
6import sys
7import traceback
8import warnings
9from collections import defaultdict, deque
10from contextlib import suppress
11from http import HTTPStatus
12from http.cookies import SimpleCookie
13from itertools import cycle, islice
14from time import monotonic
15from types import TracebackType
16from typing import ( # noqa
17 TYPE_CHECKING,
18 Any,
19 Awaitable,
20 Callable,
21 DefaultDict,
22 Dict,
23 Iterator,
24 List,
25 Optional,
26 Set,
27 Tuple,
28 Type,
29 Union,
30 cast,
31)
33from . import hdrs, helpers
34from .abc import AbstractResolver
35from .client_exceptions import (
36 ClientConnectionError,
37 ClientConnectorCertificateError,
38 ClientConnectorError,
39 ClientConnectorSSLError,
40 ClientHttpProxyError,
41 ClientProxyConnectionError,
42 ServerFingerprintMismatch,
43 UnixClientConnectorError,
44 cert_errors,
45 ssl_errors,
46)
47from .client_proto import ResponseHandler
48from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint
49from .helpers import _SENTINEL, ceil_timeout, is_ip_address, sentinel, set_result
50from .locks import EventResultOrError
51from .resolver import DefaultResolver
53try:
54 import ssl
56 SSLContext = ssl.SSLContext
57except ImportError: # pragma: no cover
58 ssl = None # type: ignore[assignment]
59 SSLContext = object # type: ignore[misc,assignment]
62__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
65if TYPE_CHECKING: # pragma: no cover
66 from .client import ClientTimeout
67 from .client_reqrep import ConnectionKey
68 from .tracing import Trace
class Connection:
    """A single client connection checked out from a :class:`BaseConnector`.

    Wraps a :class:`ResponseHandler` protocol and knows how to hand it
    back to the owning connector, either for keep-alive reuse
    (:meth:`release`) or for closing (:meth:`close`).
    """

    # Creation stack, captured only when the event loop runs in debug mode.
    _source_traceback = None
    _transport = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        # Set to None once the connection has been released or closed.
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # Finalizer safety net: a protocol still attached here means the
        # user forgot to release/close the connection.  Warn, force-close
        # it via the connector, and report through the loop's handler.
        if self._protocol is not None:
            _warnings.warn(
                f"Unclosed connection {self!r}", ResourceWarning, source=self
            )
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Underlying transport, or None when already released/closed."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """The wrapped protocol, or None when already released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        # Callbacks fire exactly once, on release or close
        # (see _notify_release).
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap out the list first so re-entrant add_callback() calls made
        # from inside a callback are not lost and are not re-run here.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            # Best-effort notification: one failing callback must not
            # prevent the others from running.
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Hand the connection back to the connector, forcing it closed."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Hand the connection back for potential keep-alive reuse.

        The protocol itself may still veto reuse via ``should_close``.
        """
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(
                self._key, self._protocol, should_close=self._protocol.should_close
            )
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True once released/closed or when the transport dropped."""
        return self._protocol is None or not self._protocol.is_connected()
151class _TransportPlaceholder:
152 """placeholder for BaseConnector.connect function"""
154 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
155 fut = loop.create_future()
156 fut.set_result(None)
157 self.closed: asyncio.Future[Optional[Exception]] = fut
159 def close(self) -> None:
160 pass
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    def __init__(
        self,
        *,
        keepalive_timeout: Union[_SENTINEL, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # force_close and an explicit keep-alive timeout are mutually
        # exclusive; without force_close the timeout defaults to 15s.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot " "be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        self._timeout_ceil_threshold = timeout_ceil_threshold

        loop = asyncio.get_running_loop()

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Idle (keep-alive) pool: key -> [(protocol, last-use timestamp)].
        self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
        self._limit = limit
        self._limit_per_host = limit_per_host
        # Protocols currently checked out by callers (incl. placeholders
        # reserving a slot while a connection is being created).
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[
            ConnectionKey, Set[ResponseHandler]
        ] = defaultdict(set)
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        self._waiters = defaultdict(deque)  # type: ignore[var-annotated]

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        self.cookies: SimpleCookie[str] = SimpleCookie()

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        # Finalizer safety net: warn when a connector is garbage collected
        # while it still holds pooled connections, and close them.
        if self._closed:
            return
        if not self._conns:
            return

        # Capture reprs before _close_immediately() clears the pool.
        conns = [repr(c) for c in self._conns.values()]

        self._close_immediately()

        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = self._loop.time()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = {}
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive = []
                for proto, use_time in conns:
                    if proto.is_connected():
                        # Connected but idle past the keep-alive deadline:
                        # close; otherwise keep it in the pool.
                        if use_time - deadline < 0:
                            transport = proto.transport
                            proto.close()
                            if key.is_ssl and not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(transport)
                        else:
                            alive.append((proto, use_time))
                    else:
                        # Peer already dropped the connection: discard it.
                        transport = proto.transport
                        proto.close()
                        if key.is_ssl and not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # Reschedule while anything remains pooled.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _drop_acquired_per_host(
        self, key: "ConnectionKey", val: ResponseHandler
    ) -> None:
        # Remove one acquired protocol for *key*, dropping the key entirely
        # once its set becomes empty.
        acquired_per_host = self._acquired_per_host
        if key not in acquired_per_host:
            return
        conns = acquired_per_host[key]
        conns.remove(val)
        if not conns:
            del self._acquired_per_host[key]

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            # Re-arm the periodic abort sweep.
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def close(self) -> None:
        """Close all opened transports."""
        waiters = self._close_immediately()
        if waiters:
            results = await asyncio.gather(*waiters, return_exceptions=True)
            for res in results:
                if isinstance(res, Exception):
                    err_msg = "Error while closing connector: " + repr(res)
                    logging.error(err_msg)

    def _close_immediately(self) -> List["asyncio.Future[None]"]:
        # Synchronously close everything; returns the protocols' "closed"
        # futures so close() can await full teardown.  Idempotent.
        waiters: List["asyncio.Future[None]"] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()
                    waiters.append(proto.closed)

            for proto in self._acquired:
                proto.close()
                waiters.append(proto.closed)

            # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Always reset bookkeeping, even if teardown raised.
            self._conns.clear()
            self._acquired.clear()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        if self._limit:
            # total calc available connections
            available = self._limit - len(self._acquired)

            # check limit per host
            if (
                self._limit_per_host
                and available > 0
                and key in self._acquired_per_host
            ):
                acquired = self._acquired_per_host.get(key)
                assert acquired is not None
                available = self._limit_per_host - len(acquired)

        elif self._limit_per_host and key in self._acquired_per_host:
            # check limit per host
            acquired = self._acquired_per_host.get(key)
            assert acquired is not None
            available = self._limit_per_host - len(acquired)
        else:
            # No limits configured: always report one slot available.
            available = 1

        return available

    async def connect(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        available = self._available_connections(key)

        # Wait if there are no available connections or if there are/were
        # waiters (i.e. don't steal connection from a waiter about to wake up)
        if available <= 0 or key in self._waiters:
            fut = self._loop.create_future()

            # This connection will now count towards the limit.
            self._waiters[key].append(fut)

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()

            try:
                await fut
            except BaseException as e:
                if key in self._waiters:
                    # remove a waiter even if it was cancelled, normally it's
                    # removed when it's notified
                    try:
                        self._waiters[key].remove(fut)
                    except ValueError:  # fut may no longer be in list
                        pass

                raise e
            finally:
                # Drop the (now empty) waiter queue for this key.
                if key in self._waiters and not self._waiters[key]:
                    del self._waiters[key]

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()

        proto = self._get(key)
        if proto is None:
            # Reserve a slot with a placeholder so concurrent connect()
            # calls respect the limits while we create a real connection.
            placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
            self._acquired.add(placeholder)
            self._acquired_per_host[key].add(placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_start()

            try:
                proto = await self._create_connection(req, traces, timeout)
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")
            except BaseException:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)
                    # The reserved slot is free again: wake a waiter.
                    self._release_waiter()
                raise
            else:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_end()
        else:
            if traces:
                # Acquire the connection to prevent race conditions with limits
                placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
                self._acquired.add(placeholder)
                self._acquired_per_host[key].add(placeholder)
                for trace in traces:
                    await trace.send_connection_reuseconn()
                self._acquired.remove(placeholder)
                self._drop_acquired_per_host(key, placeholder)

        self._acquired.add(proto)
        self._acquired_per_host[key].add(proto)
        return Connection(self, key, proto, self._loop)

    def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
        # Pop a live, not-yet-expired pooled protocol for *key*, discarding
        # dead/stale ones along the way.  Returns None when nothing usable
        # remains (and drops the key from the pool).
        try:
            conns = self._conns[key]
        except KeyError:
            return None

        t1 = self._loop.time()
        while conns:
            proto, t0 = conns.pop()
            if proto.is_connected():
                if t1 - t0 > self._keepalive_timeout:
                    # Idle longer than keep-alive allows: close and keep
                    # scanning.
                    transport = proto.transport
                    proto.close()
                    # only for SSL transports
                    if key.is_ssl and not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transport)
                else:
                    if not conns:
                        # The very last connection was reclaimed: drop the key
                        del self._conns[key]
                    return proto
            else:
                transport = proto.transport
                proto.close()
                if key.is_ssl and not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters.keys())
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter = waiters.popleft()
                if not waiter.done():
                    # Wake exactly one pending waiter and stop.
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        # Remove *proto* from the acquired bookkeeping and, on success,
        # give a queued waiter a chance to proceed.
        if self._closed:
            # acquired connection is already released on connector closing
            return

        try:
            self._acquired.remove(proto)
            self._drop_acquired_per_host(key, proto)
        except KeyError:  # pragma: no cover
            # this may be result of undetermenistic order of objects
            # finalization due garbage collection.
            pass
        else:
            self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        # Hand a protocol back: either close it or return it to the
        # keep-alive pool, re-arming the cleanup timer as needed.
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close:
            should_close = True

        if should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()
            # TODO: Remove once fixed: https://bugs.python.org/issue39951
            # See PR #6321
            set_result(protocol.closed, None)

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
        else:
            conns = self._conns.get(key)
            if conns is None:
                conns = self._conns[key] = []
            conns.append((protocol, self._loop.time()))

            if self._cleanup_handle is None:
                self._cleanup_handle = helpers.weakref_handle(
                    self,
                    "_cleanup",
                    self._keepalive_timeout,
                    self._loop,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        # Subclasses (TCPConnector, UnixConnector, ...) implement the
        # actual transport establishment.
        raise NotImplementedError()
667class _DNSCacheTable:
668 def __init__(self, ttl: Optional[float] = None) -> None:
669 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {}
670 self._timestamps: Dict[Tuple[str, int], float] = {}
671 self._ttl = ttl
673 def __contains__(self, host: object) -> bool:
674 return host in self._addrs_rr
676 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
677 self._addrs_rr[key] = (cycle(addrs), len(addrs))
679 if self._ttl is not None:
680 self._timestamps[key] = monotonic()
682 def remove(self, key: Tuple[str, int]) -> None:
683 self._addrs_rr.pop(key, None)
685 if self._ttl is not None:
686 self._timestamps.pop(key, None)
688 def clear(self) -> None:
689 self._addrs_rr.clear()
690 self._timestamps.clear()
692 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
693 loop, length = self._addrs_rr[key]
694 addrs = list(islice(loop, length))
695 # Consume one more element to shift internal state of `cycle`
696 next(loop)
697 return addrs
699 def expired(self, key: Tuple[str, int]) -> bool:
700 if self._ttl is None:
701 return False
703 return self._timestamps[key] + self._ttl < monotonic()
706class TCPConnector(BaseConnector):
707 """TCP connector.
709 verify_ssl - Set to True to check ssl certifications.
710 fingerprint - Pass the binary sha256
711 digest of the expected certificate in DER format to verify
712 that the certificate the server presents matches. See also
713 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
714 resolver - Enable DNS lookups and use this
715 resolver
716 use_dns_cache - Use memory cache for DNS lookups.
717 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
718 family - socket address family
719 local_addr - local tuple of (host, port) to bind socket to
721 keepalive_timeout - (optional) Keep-alive timeout.
722 force_close - Set to True to force close and do reconnect
723 after each request (and between redirects).
724 limit - The total number of simultaneous connections.
725 limit_per_host - Number of simultaneous connections to one host.
726 enable_cleanup_closed - Enables clean-up closed ssl transports.
727 Disabled by default.
728 loop - Optional event loop.
729 """
    def __init__(
        self,
        *,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: int = 0,
        ssl: Union[None, bool, Fingerprint, SSLContext] = None,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, _SENTINEL] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # Pool/limit/keep-alive options are handled by BaseConnector.
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Reject unsupported ``ssl`` arguments early with a clear error.
        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, bool, Fingerprint, "
                "or None, got {!r} instead.".format(ssl)
            )
        self._ssl = ssl
        if resolver is None:
            resolver = DefaultResolver()
        self._resolver: AbstractResolver = resolver

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # In-flight DNS lookups, used by _resolve_host() to coalesce
        # concurrent resolutions of the same (host, port).
        self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}
        self._family = family
        self._local_addr = local_addr
772 def _close_immediately(self) -> List["asyncio.Future[None]"]:
773 for ev in self._throttle_dns_events.values():
774 ev.cancel()
775 return super()._close_immediately()
777 @property
778 def family(self) -> int:
779 """Socket family like AF_INET."""
780 return self._family
782 @property
783 def use_dns_cache(self) -> bool:
784 """True if local DNS caching is enabled."""
785 return self._use_dns_cache
787 def clear_dns_cache(
788 self, host: Optional[str] = None, port: Optional[int] = None
789 ) -> None:
790 """Remove specified host/port or clear all dns local cache."""
791 if host is not None and port is not None:
792 self._cached_hosts.remove((host, port))
793 elif host is not None or port is not None:
794 raise ValueError("either both host and port " "or none of them are allowed")
795 else:
796 self._cached_hosts.clear()
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[List["Trace"]] = None
    ) -> List[Dict[str, Any]]:
        """Resolve *host*/*port* into a list of addrinfo-style dicts.

        When DNS caching is enabled, results are cached and rotated via
        ``_DNSCacheTable`` and concurrent lookups of the same key are
        coalesced behind a shared ``EventResultOrError``.
        """
        # Literal IP addresses need no resolution at all.
        if is_ip_address(host):
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            # Caching disabled: every call hits the resolver directly.
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)

        if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        if key in self._throttle_dns_events:
            # Another task is already resolving this key: wait for it.
            # get event early, before any await (#4014)
            event = self._throttle_dns_events[key]
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            await event.wait()
        else:
            # We are the first resolver for this key.
            # update dict early, before any await (#4014)
            self._throttle_dns_events[key] = EventResultOrError(self._loop)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)
            try:
                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_start(host)

                addrs = await self._resolver.resolve(host, port, family=self._family)
                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_end(host)

                self._cached_hosts.add(key, addrs)
                self._throttle_dns_events[key].set()
            except BaseException as e:
                # any DNS exception, independently of the implementation
                # is set for the waiters to raise the same exception.
                self._throttle_dns_events[key].set(exc=e)
                raise
            finally:
                self._throttle_dns_events.pop(key)

        return self._cached_hosts.next_addrs(key)
872 async def _create_connection(
873 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
874 ) -> ResponseHandler:
875 """Create connection.
877 Has same keyword arguments as BaseEventLoop.create_connection.
878 """
879 if req.proxy:
880 _, proto = await self._create_proxy_connection(req, traces, timeout)
881 else:
882 _, proto = await self._create_direct_connection(req, traces, timeout)
884 return proto
886 @staticmethod
887 @functools.lru_cache(None)
888 def _make_ssl_context(verified: bool) -> SSLContext:
889 if verified:
890 return ssl.create_default_context()
891 else:
892 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
893 sslcontext.options |= ssl.OP_NO_SSLv2
894 sslcontext.options |= ssl.OP_NO_SSLv3
895 sslcontext.check_hostname = False
896 sslcontext.verify_mode = ssl.CERT_NONE
897 try:
898 sslcontext.options |= ssl.OP_NO_COMPRESSION
899 except AttributeError as attr_err:
900 warnings.warn(
901 "{!s}: The Python interpreter is compiled "
902 "against OpenSSL < 1.0.0. Ref: "
903 "https://docs.python.org/3/library/ssl.html"
904 "#ssl.OP_NO_COMPRESSION".format(attr_err),
905 )
906 sslcontext.set_default_verify_paths()
907 return sslcontext
    def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]:
        """Logic to get the correct SSL context

        0. if req.ssl is false, return None

        1. if ssl_context is specified in req, use it
        2. if _ssl_context is specified in self, use it
        3. otherwise:
            1. if verify_ssl is not specified in req, use self.ssl_context
               (will generate a default context according to self.verify_ssl)
            2. if verify_ssl is True in req, generate a default SSL context
            3. if verify_ssl is False in req, generate a SSL context that
               won't verify
        """
        if req.is_ssl():
            if ssl is None:  # pragma: no cover
                raise RuntimeError("SSL is not supported.")
            # Request-level setting takes precedence over the connector's.
            sslcontext = req.ssl
            if isinstance(sslcontext, ssl.SSLContext):
                return sslcontext
            if sslcontext is not None:
                # not verified or fingerprinted
                return self._make_ssl_context(False)
            sslcontext = self._ssl
            if isinstance(sslcontext, ssl.SSLContext):
                return sslcontext
            if sslcontext is not None:
                # not verified or fingerprinted
                return self._make_ssl_context(False)
            # Neither request nor connector configured anything: default
            # to a fully verifying context.
            return self._make_ssl_context(True)
        else:
            return None
942 def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:
943 ret = req.ssl
944 if isinstance(ret, Fingerprint):
945 return ret
946 ret = self._ssl
947 if isinstance(ret, Fingerprint):
948 return ret
949 return None
    async def _wrap_create_connection(
        self,
        *args: Any,
        req: "ClientRequest",
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Open a connection via ``loop.create_connection``, translating
        low-level errors into aiohttp client exceptions.

        Timeout errors (OSError with ``errno is None`` that is also an
        ``asyncio.TimeoutError``) are re-raised untouched so the caller's
        timeout machinery sees them.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: "ClientRequest",
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        # Only TLS-over-TLS (HTTPS target through an HTTPS proxy) needs
        # the warning.
        if req.request_info.url.scheme != "https":
            return

        asyncio_supports_tls_in_tls = getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio. This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching under "
            "Python 3.7 or higher. For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # At least 3 of the calls in the stack originate from the
            # methods in this class, so point the warning at the caller
            # above them.
            stacklevel=3,
        )
    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: "ClientRequest",
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS (HTTPS target through a CONNECT tunnel):
        returns the new TLS transport plus a fresh protocol driving it.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS

        # Safety of the `cast()` call here is based on the fact that
        # internally `_get_ssl_context()` only returns `None` when
        # `req.is_ssl()` evaluates to `False` which is never gonna happen
        # in this code path. Of course, it's rather fragile
        # maintainability-wise but this is to be solved separately.
        sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.host,
                        ssl_handshake_timeout=timeout.total,
                    )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    underlying_transport.close()
                    raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Timeouts pass through untouched (errno is None + TimeoutError).
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto
    async def _create_direct_connection(
        self,
        req: "ClientRequest",
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve ``req``'s host and open a direct TCP connection to it.

        Every address returned by the resolver is tried in order; the first
        successful connection (that also passes the optional TLS fingerprint
        check) wins.  If all candidates fail, the last error is re-raised.

        :param req: the client request being connected.
        :param traces: tracing hooks forwarded to the resolver.
        :param timeout: client timeout configuration (``sock_connect`` etc.).
        :param client_error: exception class raised on connect failure
            (callers connecting to a proxy pass ``ClientProxyConnectionError``).
        :return: the established transport and its ``ResponseHandler`` protocol.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        port = req.port
        assert port is not None
        # Run the DNS lookup as an independent task so it can be shielded below.
        host_resolved = asyncio.ensure_future(
            self._resolve_host(host, port, traces=traces), loop=self._loop
        )
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await asyncio.shield(host_resolved)
        except asyncio.CancelledError:
            # We were cancelled while the shielded lookup keeps running in the
            # background; attach a callback that retrieves (and discards) its
            # eventual exception so asyncio does not log it as unretrieved.
            def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            host_resolved.add_done_callback(drop_exception)
            raise
        except OSError as exc:
            # An errno-less asyncio.TimeoutError (an OSError subclass on
            # Python 3.11+) is a plain timeout and must propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None

        # Try each resolved address in turn; remember the last failure so it
        # can be re-raised if none of them succeeds.
        for hinfo in hosts:
            host = hinfo["host"]
            port = hinfo["port"]

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    host,
                    port,
                    timeout=timeout,
                    ssl=sslcontext,
                    family=hinfo["family"],
                    proto=hinfo["proto"],
                    flags=hinfo["flags"],
                    server_hostname=hinfo["hostname"] if sslcontext else None,
                    local_addr=self._local_addr,
                    req=req,
                    client_error=client_error,
                )
            except ClientConnectorError as exc:
                last_exc = exc
                continue

            if req.is_ssl() and fingerprint:
                # Verify the peer certificate digest; on mismatch, discard this
                # connection and fall through to the next resolved address.
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    continue

            return transp, proto
        # hosts is never empty here, so at least one attempt recorded an error.
        assert last_exc is not None
        raise last_exc
    async def _create_proxy_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect to ``req``'s target through its configured HTTP(S) proxy.

        For plain HTTP targets the proxy connection itself is returned.  For
        HTTPS targets a ``CONNECT`` tunnel is established first and the
        connection is then upgraded to TLS via ``_start_tls_connection()``.

        :param req: the client request carrying ``proxy``/``proxy_auth`` info.
        :param traces: tracing hooks (not forwarded to the proxy connect).
        :param timeout: client timeout configuration.
        :return: the (possibly TLS-wrapped) transport and its protocol.
        """
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            # NOTE(review): this aliases (does not copy) req.proxy_headers, so
            # the HOST assignment below mutates the caller's mapping — looks
            # intentional/long-standing; confirm before changing.
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        # Many HTTP proxies has buggy keepalive support.  Let's not
        # reuse connection but close it after processing every
        # response.
        proto.force_close()

        # Move the proxy credentials to the header the proxy actually reads:
        # on the tunneled (CONNECT) path they belong to the CONNECT request,
        # otherwise they ride along on the origin request itself.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            # Key the tunnel connection on the origin, stripping proxy fields.
            key = dataclasses.replace(
                req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=True,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach protocol/transport from the temporary Connection so
                # its close() won't tear down the transport we hand to TLS.
                conn._protocol = None
                conn._transport = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto
class UnixConnector(BaseConnector):
    """Connector that speaks HTTP over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        # All pooling behavior lives in BaseConnector; we only add the path.
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_err:
            # An errno-less asyncio.TimeoutError is a plain timeout; let it
            # propagate untouched instead of wrapping it.
            if os_err.errno is None and isinstance(os_err, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(
                self.path, req.connection_key, os_err
            ) from os_err

        return proto
class NamedPipeConnector(BaseConnector):
    """Connector that speaks HTTP over a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        # Pipe connections require the proactor loop; reject anything else
        # up front rather than failing obscurely at connect time.
        if not isinstance(
            self._loop, asyncio.ProactorEventLoop  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, pipe_proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined] # noqa: E501
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as os_err:
            # An errno-less asyncio.TimeoutError is a plain timeout; let it
            # propagate untouched instead of wrapping it.
            if os_err.errno is None and isinstance(os_err, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, os_err) from os_err

        return cast(ResponseHandler, pipe_proto)