Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/connector.py: 33%
710 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1import asyncio
2import functools
3import random
4import sys
5import traceback
6import warnings
7from collections import defaultdict, deque
8from contextlib import suppress
9from http.cookies import SimpleCookie
10from itertools import cycle, islice
11from time import monotonic
12from types import TracebackType
13from typing import (
14 TYPE_CHECKING,
15 Any,
16 Awaitable,
17 Callable,
18 DefaultDict,
19 Dict,
20 Iterator,
21 List,
22 Optional,
23 Set,
24 Tuple,
25 Type,
26 Union,
27 cast,
28)
30import attr
32from . import hdrs, helpers
33from .abc import AbstractResolver
34from .client_exceptions import (
35 ClientConnectionError,
36 ClientConnectorCertificateError,
37 ClientConnectorError,
38 ClientConnectorSSLError,
39 ClientHttpProxyError,
40 ClientProxyConnectionError,
41 ServerFingerprintMismatch,
42 UnixClientConnectorError,
43 cert_errors,
44 ssl_errors,
45)
46from .client_proto import ResponseHandler
47from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
48from .helpers import (
49 PY_36,
50 ceil_timeout,
51 get_running_loop,
52 is_ip_address,
53 noop,
54 sentinel,
55)
56from .http import RESPONSES
57from .locks import EventResultOrError
58from .resolver import DefaultResolver
60try:
61 import ssl
63 SSLContext = ssl.SSLContext
64except ImportError: # pragma: no cover
65 ssl = None # type: ignore[assignment]
66 SSLContext = object # type: ignore[misc,assignment]
69__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")
72if TYPE_CHECKING: # pragma: no cover
73 from .client import ClientTimeout
74 from .client_reqrep import ConnectionKey
75 from .tracing import Trace
78class _DeprecationWaiter:
79 __slots__ = ("_awaitable", "_awaited")
81 def __init__(self, awaitable: Awaitable[Any]) -> None:
82 self._awaitable = awaitable
83 self._awaited = False
85 def __await__(self) -> Any:
86 self._awaited = True
87 return self._awaitable.__await__()
89 def __del__(self) -> None:
90 if not self._awaited:
91 warnings.warn(
92 "Connector.close() is a coroutine, "
93 "please use await connector.close()",
94 DeprecationWarning,
95 )
98class Connection:
100 _source_traceback = None
101 _transport = None
103 def __init__(
104 self,
105 connector: "BaseConnector",
106 key: "ConnectionKey",
107 protocol: ResponseHandler,
108 loop: asyncio.AbstractEventLoop,
109 ) -> None:
110 self._key = key
111 self._connector = connector
112 self._loop = loop
113 self._protocol: Optional[ResponseHandler] = protocol
114 self._callbacks: List[Callable[[], None]] = []
116 if loop.get_debug():
117 self._source_traceback = traceback.extract_stack(sys._getframe(1))
119 def __repr__(self) -> str:
120 return f"Connection<{self._key}>"
122 def __del__(self, _warnings: Any = warnings) -> None:
123 if self._protocol is not None:
124 if PY_36:
125 kwargs = {"source": self}
126 else:
127 kwargs = {}
128 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
129 if self._loop.is_closed():
130 return
132 self._connector._release(self._key, self._protocol, should_close=True)
134 context = {"client_connection": self, "message": "Unclosed connection"}
135 if self._source_traceback is not None:
136 context["source_traceback"] = self._source_traceback
137 self._loop.call_exception_handler(context)
139 @property
140 def loop(self) -> asyncio.AbstractEventLoop:
141 warnings.warn(
142 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
143 )
144 return self._loop
146 @property
147 def transport(self) -> Optional[asyncio.Transport]:
148 if self._protocol is None:
149 return None
150 return self._protocol.transport
152 @property
153 def protocol(self) -> Optional[ResponseHandler]:
154 return self._protocol
156 def add_callback(self, callback: Callable[[], None]) -> None:
157 if callback is not None:
158 self._callbacks.append(callback)
160 def _notify_release(self) -> None:
161 callbacks, self._callbacks = self._callbacks[:], []
163 for cb in callbacks:
164 with suppress(Exception):
165 cb()
167 def close(self) -> None:
168 self._notify_release()
170 if self._protocol is not None:
171 self._connector._release(self._key, self._protocol, should_close=True)
172 self._protocol = None
174 def release(self) -> None:
175 self._notify_release()
177 if self._protocol is not None:
178 self._connector._release(
179 self._key, self._protocol, should_close=self._protocol.should_close
180 )
181 self._protocol = None
183 @property
184 def closed(self) -> bool:
185 return self._protocol is None or not self._protocol.is_connected()
188class _TransportPlaceholder:
189 """placeholder for BaseConnector.connect function"""
191 def close(self) -> None:
192 pass
195class BaseConnector:
196 """Base connector class.
198 keepalive_timeout - (optional) Keep-alive timeout.
199 force_close - Set to True to force close and do reconnect
200 after each request (and between redirects).
201 limit - The total number of simultaneous connections.
202 limit_per_host - Number of simultaneous connections to one host.
203 enable_cleanup_closed - Enables clean-up closed ssl transports.
204 Disabled by default.
205 loop - Optional event loop.
206 """
208 _closed = True # prevent AttributeError in __del__ if ctor was failed
209 _source_traceback = None
211 # abort transport after 2 seconds (cleanup broken connections)
212 _cleanup_closed_period = 2.0
214 def __init__(
215 self,
216 *,
217 keepalive_timeout: Union[object, None, float] = sentinel,
218 force_close: bool = False,
219 limit: int = 100,
220 limit_per_host: int = 0,
221 enable_cleanup_closed: bool = False,
222 loop: Optional[asyncio.AbstractEventLoop] = None,
223 ) -> None:
225 if force_close:
226 if keepalive_timeout is not None and keepalive_timeout is not sentinel:
227 raise ValueError(
228 "keepalive_timeout cannot " "be set if force_close is True"
229 )
230 else:
231 if keepalive_timeout is sentinel:
232 keepalive_timeout = 15.0
234 loop = get_running_loop(loop)
236 self._closed = False
237 if loop.get_debug():
238 self._source_traceback = traceback.extract_stack(sys._getframe(1))
240 self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
241 self._limit = limit
242 self._limit_per_host = limit_per_host
243 self._acquired: Set[ResponseHandler] = set()
244 self._acquired_per_host: DefaultDict[
245 ConnectionKey, Set[ResponseHandler]
246 ] = defaultdict(set)
247 self._keepalive_timeout = cast(float, keepalive_timeout)
248 self._force_close = force_close
250 # {host_key: FIFO list of waiters}
251 self._waiters = defaultdict(deque) # type: ignore[var-annotated]
253 self._loop = loop
254 self._factory = functools.partial(ResponseHandler, loop=loop)
256 self.cookies: SimpleCookie[str] = SimpleCookie()
258 # start keep-alive connection cleanup task
259 self._cleanup_handle: Optional[asyncio.TimerHandle] = None
261 # start cleanup closed transports task
262 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
263 self._cleanup_closed_disabled = not enable_cleanup_closed
264 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
265 self._cleanup_closed()
267 def __del__(self, _warnings: Any = warnings) -> None:
268 if self._closed:
269 return
270 if not self._conns:
271 return
273 conns = [repr(c) for c in self._conns.values()]
275 self._close()
277 if PY_36:
278 kwargs = {"source": self}
279 else:
280 kwargs = {}
281 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
282 context = {
283 "connector": self,
284 "connections": conns,
285 "message": "Unclosed connector",
286 }
287 if self._source_traceback is not None:
288 context["source_traceback"] = self._source_traceback
289 self._loop.call_exception_handler(context)
291 def __enter__(self) -> "BaseConnector":
292 warnings.warn(
293 '"with Connector():" is deprecated, '
294 'use "async with Connector():" instead',
295 DeprecationWarning,
296 )
297 return self
299 def __exit__(self, *exc: Any) -> None:
300 self._close()
302 async def __aenter__(self) -> "BaseConnector":
303 return self
305 async def __aexit__(
306 self,
307 exc_type: Optional[Type[BaseException]] = None,
308 exc_value: Optional[BaseException] = None,
309 exc_traceback: Optional[TracebackType] = None,
310 ) -> None:
311 await self.close()
313 @property
314 def force_close(self) -> bool:
315 """Ultimately close connection on releasing if True."""
316 return self._force_close
318 @property
319 def limit(self) -> int:
320 """The total number for simultaneous connections.
322 If limit is 0 the connector has no limit.
323 The default limit size is 100.
324 """
325 return self._limit
327 @property
328 def limit_per_host(self) -> int:
329 """The limit for simultaneous connections to the same endpoint.
331 Endpoints are the same if they are have equal
332 (host, port, is_ssl) triple.
333 """
334 return self._limit_per_host
336 def _cleanup(self) -> None:
337 """Cleanup unused transports."""
338 if self._cleanup_handle:
339 self._cleanup_handle.cancel()
340 # _cleanup_handle should be unset, otherwise _release() will not
341 # recreate it ever!
342 self._cleanup_handle = None
344 now = self._loop.time()
345 timeout = self._keepalive_timeout
347 if self._conns:
348 connections = {}
349 deadline = now - timeout
350 for key, conns in self._conns.items():
351 alive = []
352 for proto, use_time in conns:
353 if proto.is_connected():
354 if use_time - deadline < 0:
355 transport = proto.transport
356 proto.close()
357 if key.is_ssl and not self._cleanup_closed_disabled:
358 self._cleanup_closed_transports.append(transport)
359 else:
360 alive.append((proto, use_time))
361 else:
362 transport = proto.transport
363 proto.close()
364 if key.is_ssl and not self._cleanup_closed_disabled:
365 self._cleanup_closed_transports.append(transport)
367 if alive:
368 connections[key] = alive
370 self._conns = connections
372 if self._conns:
373 self._cleanup_handle = helpers.weakref_handle(
374 self, "_cleanup", timeout, self._loop
375 )
377 def _drop_acquired_per_host(
378 self, key: "ConnectionKey", val: ResponseHandler
379 ) -> None:
380 acquired_per_host = self._acquired_per_host
381 if key not in acquired_per_host:
382 return
383 conns = acquired_per_host[key]
384 conns.remove(val)
385 if not conns:
386 del self._acquired_per_host[key]
388 def _cleanup_closed(self) -> None:
389 """Double confirmation for transport close.
391 Some broken ssl servers may leave socket open without proper close.
392 """
393 if self._cleanup_closed_handle:
394 self._cleanup_closed_handle.cancel()
396 for transport in self._cleanup_closed_transports:
397 if transport is not None:
398 transport.abort()
400 self._cleanup_closed_transports = []
402 if not self._cleanup_closed_disabled:
403 self._cleanup_closed_handle = helpers.weakref_handle(
404 self, "_cleanup_closed", self._cleanup_closed_period, self._loop
405 )
407 def close(self) -> Awaitable[None]:
408 """Close all opened transports."""
409 self._close()
410 return _DeprecationWaiter(noop())
412 def _close(self) -> None:
413 if self._closed:
414 return
416 self._closed = True
418 try:
419 if self._loop.is_closed():
420 return
422 # cancel cleanup task
423 if self._cleanup_handle:
424 self._cleanup_handle.cancel()
426 # cancel cleanup close task
427 if self._cleanup_closed_handle:
428 self._cleanup_closed_handle.cancel()
430 for data in self._conns.values():
431 for proto, t0 in data:
432 proto.close()
434 for proto in self._acquired:
435 proto.close()
437 for transport in self._cleanup_closed_transports:
438 if transport is not None:
439 transport.abort()
441 finally:
442 self._conns.clear()
443 self._acquired.clear()
444 self._waiters.clear()
445 self._cleanup_handle = None
446 self._cleanup_closed_transports.clear()
447 self._cleanup_closed_handle = None
449 @property
450 def closed(self) -> bool:
451 """Is connector closed.
453 A readonly property.
454 """
455 return self._closed
457 def _available_connections(self, key: "ConnectionKey") -> int:
458 """
459 Return number of available connections.
461 The limit, limit_per_host and the connection key are taken into account.
463 If it returns less than 1 means that there are no connections
464 available.
465 """
466 if self._limit:
467 # total calc available connections
468 available = self._limit - len(self._acquired)
470 # check limit per host
471 if (
472 self._limit_per_host
473 and available > 0
474 and key in self._acquired_per_host
475 ):
476 acquired = self._acquired_per_host.get(key)
477 assert acquired is not None
478 available = self._limit_per_host - len(acquired)
480 elif self._limit_per_host and key in self._acquired_per_host:
481 # check limit per host
482 acquired = self._acquired_per_host.get(key)
483 assert acquired is not None
484 available = self._limit_per_host - len(acquired)
485 else:
486 available = 1
488 return available
490 async def connect(
491 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
492 ) -> Connection:
493 """Get from pool or create new connection."""
494 key = req.connection_key
495 available = self._available_connections(key)
497 # Wait if there are no available connections or if there are/were
498 # waiters (i.e. don't steal connection from a waiter about to wake up)
499 if available <= 0 or key in self._waiters:
500 fut = self._loop.create_future()
502 # This connection will now count towards the limit.
503 self._waiters[key].append(fut)
505 if traces:
506 for trace in traces:
507 await trace.send_connection_queued_start()
509 try:
510 await fut
511 except BaseException as e:
512 if key in self._waiters:
513 # remove a waiter even if it was cancelled, normally it's
514 # removed when it's notified
515 try:
516 self._waiters[key].remove(fut)
517 except ValueError: # fut may no longer be in list
518 pass
520 raise e
521 finally:
522 if key in self._waiters and not self._waiters[key]:
523 del self._waiters[key]
525 if traces:
526 for trace in traces:
527 await trace.send_connection_queued_end()
529 proto = self._get(key)
530 if proto is None:
531 placeholder = cast(ResponseHandler, _TransportPlaceholder())
532 self._acquired.add(placeholder)
533 self._acquired_per_host[key].add(placeholder)
535 if traces:
536 for trace in traces:
537 await trace.send_connection_create_start()
539 try:
540 proto = await self._create_connection(req, traces, timeout)
541 if self._closed:
542 proto.close()
543 raise ClientConnectionError("Connector is closed.")
544 except BaseException:
545 if not self._closed:
546 self._acquired.remove(placeholder)
547 self._drop_acquired_per_host(key, placeholder)
548 self._release_waiter()
549 raise
550 else:
551 if not self._closed:
552 self._acquired.remove(placeholder)
553 self._drop_acquired_per_host(key, placeholder)
555 if traces:
556 for trace in traces:
557 await trace.send_connection_create_end()
558 else:
559 if traces:
560 # Acquire the connection to prevent race conditions with limits
561 placeholder = cast(ResponseHandler, _TransportPlaceholder())
562 self._acquired.add(placeholder)
563 self._acquired_per_host[key].add(placeholder)
564 for trace in traces:
565 await trace.send_connection_reuseconn()
566 self._acquired.remove(placeholder)
567 self._drop_acquired_per_host(key, placeholder)
569 self._acquired.add(proto)
570 self._acquired_per_host[key].add(proto)
571 return Connection(self, key, proto, self._loop)
573 def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
574 try:
575 conns = self._conns[key]
576 except KeyError:
577 return None
579 t1 = self._loop.time()
580 while conns:
581 proto, t0 = conns.pop()
582 if proto.is_connected():
583 if t1 - t0 > self._keepalive_timeout:
584 transport = proto.transport
585 proto.close()
586 # only for SSL transports
587 if key.is_ssl and not self._cleanup_closed_disabled:
588 self._cleanup_closed_transports.append(transport)
589 else:
590 if not conns:
591 # The very last connection was reclaimed: drop the key
592 del self._conns[key]
593 return proto
594 else:
595 transport = proto.transport
596 proto.close()
597 if key.is_ssl and not self._cleanup_closed_disabled:
598 self._cleanup_closed_transports.append(transport)
600 # No more connections: drop the key
601 del self._conns[key]
602 return None
604 def _release_waiter(self) -> None:
605 """
606 Iterates over all waiters until one to be released is found.
608 The one to be released is not finsihed and
609 belongs to a host that has available connections.
610 """
611 if not self._waiters:
612 return
614 # Having the dict keys ordered this avoids to iterate
615 # at the same order at each call.
616 queues = list(self._waiters.keys())
617 random.shuffle(queues)
619 for key in queues:
620 if self._available_connections(key) < 1:
621 continue
623 waiters = self._waiters[key]
624 while waiters:
625 waiter = waiters.popleft()
626 if not waiter.done():
627 waiter.set_result(None)
628 return
630 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
631 if self._closed:
632 # acquired connection is already released on connector closing
633 return
635 try:
636 self._acquired.remove(proto)
637 self._drop_acquired_per_host(key, proto)
638 except KeyError: # pragma: no cover
639 # this may be result of undetermenistic order of objects
640 # finalization due garbage collection.
641 pass
642 else:
643 self._release_waiter()
645 def _release(
646 self,
647 key: "ConnectionKey",
648 protocol: ResponseHandler,
649 *,
650 should_close: bool = False,
651 ) -> None:
652 if self._closed:
653 # acquired connection is already released on connector closing
654 return
656 self._release_acquired(key, protocol)
658 if self._force_close:
659 should_close = True
661 if should_close or protocol.should_close:
662 transport = protocol.transport
663 protocol.close()
665 if key.is_ssl and not self._cleanup_closed_disabled:
666 self._cleanup_closed_transports.append(transport)
667 else:
668 conns = self._conns.get(key)
669 if conns is None:
670 conns = self._conns[key] = []
671 conns.append((protocol, self._loop.time()))
673 if self._cleanup_handle is None:
674 self._cleanup_handle = helpers.weakref_handle(
675 self, "_cleanup", self._keepalive_timeout, self._loop
676 )
678 async def _create_connection(
679 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
680 ) -> ResponseHandler:
681 raise NotImplementedError()
684class _DNSCacheTable:
685 def __init__(self, ttl: Optional[float] = None) -> None:
686 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {}
687 self._timestamps: Dict[Tuple[str, int], float] = {}
688 self._ttl = ttl
690 def __contains__(self, host: object) -> bool:
691 return host in self._addrs_rr
693 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None:
694 self._addrs_rr[key] = (cycle(addrs), len(addrs))
696 if self._ttl:
697 self._timestamps[key] = monotonic()
699 def remove(self, key: Tuple[str, int]) -> None:
700 self._addrs_rr.pop(key, None)
702 if self._ttl:
703 self._timestamps.pop(key, None)
705 def clear(self) -> None:
706 self._addrs_rr.clear()
707 self._timestamps.clear()
709 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]:
710 loop, length = self._addrs_rr[key]
711 addrs = list(islice(loop, length))
712 # Consume one more element to shift internal state of `cycle`
713 next(loop)
714 return addrs
716 def expired(self, key: Tuple[str, int]) -> bool:
717 if self._ttl is None:
718 return False
720 return self._timestamps[key] + self._ttl < monotonic()
723class TCPConnector(BaseConnector):
724 """TCP connector.
726 verify_ssl - Set to True to check ssl certifications.
727 fingerprint - Pass the binary sha256
728 digest of the expected certificate in DER format to verify
729 that the certificate the server presents matches. See also
730 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning
731 resolver - Enable DNS lookups and use this
732 resolver
733 use_dns_cache - Use memory cache for DNS lookups.
734 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
735 family - socket address family
736 local_addr - local tuple of (host, port) to bind socket to
738 keepalive_timeout - (optional) Keep-alive timeout.
739 force_close - Set to True to force close and do reconnect
740 after each request (and between redirects).
741 limit - The total number of simultaneous connections.
742 limit_per_host - Number of simultaneous connections to one host.
743 enable_cleanup_closed - Enables clean-up closed ssl transports.
744 Disabled by default.
745 loop - Optional event loop.
746 """
748 def __init__(
749 self,
750 *,
751 verify_ssl: bool = True,
752 fingerprint: Optional[bytes] = None,
753 use_dns_cache: bool = True,
754 ttl_dns_cache: Optional[int] = 10,
755 family: int = 0,
756 ssl_context: Optional[SSLContext] = None,
757 ssl: Union[None, bool, Fingerprint, SSLContext] = None,
758 local_addr: Optional[Tuple[str, int]] = None,
759 resolver: Optional[AbstractResolver] = None,
760 keepalive_timeout: Union[None, float, object] = sentinel,
761 force_close: bool = False,
762 limit: int = 100,
763 limit_per_host: int = 0,
764 enable_cleanup_closed: bool = False,
765 loop: Optional[asyncio.AbstractEventLoop] = None,
766 ):
767 super().__init__(
768 keepalive_timeout=keepalive_timeout,
769 force_close=force_close,
770 limit=limit,
771 limit_per_host=limit_per_host,
772 enable_cleanup_closed=enable_cleanup_closed,
773 loop=loop,
774 )
776 self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
777 if resolver is None:
778 resolver = DefaultResolver(loop=self._loop)
779 self._resolver = resolver
781 self._use_dns_cache = use_dns_cache
782 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
783 self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}
784 self._family = family
785 self._local_addr = local_addr
787 def close(self) -> Awaitable[None]:
788 """Close all ongoing DNS calls."""
789 for ev in self._throttle_dns_events.values():
790 ev.cancel()
792 return super().close()
794 @property
795 def family(self) -> int:
796 """Socket family like AF_INET."""
797 return self._family
799 @property
800 def use_dns_cache(self) -> bool:
801 """True if local DNS caching is enabled."""
802 return self._use_dns_cache
804 def clear_dns_cache(
805 self, host: Optional[str] = None, port: Optional[int] = None
806 ) -> None:
807 """Remove specified host/port or clear all dns local cache."""
808 if host is not None and port is not None:
809 self._cached_hosts.remove((host, port))
810 elif host is not None or port is not None:
811 raise ValueError("either both host and port " "or none of them are allowed")
812 else:
813 self._cached_hosts.clear()
815 async def _resolve_host(
816 self, host: str, port: int, traces: Optional[List["Trace"]] = None
817 ) -> List[Dict[str, Any]]:
818 if is_ip_address(host):
819 return [
820 {
821 "hostname": host,
822 "host": host,
823 "port": port,
824 "family": self._family,
825 "proto": 0,
826 "flags": 0,
827 }
828 ]
830 if not self._use_dns_cache:
832 if traces:
833 for trace in traces:
834 await trace.send_dns_resolvehost_start(host)
836 res = await self._resolver.resolve(host, port, family=self._family)
838 if traces:
839 for trace in traces:
840 await trace.send_dns_resolvehost_end(host)
842 return res
844 key = (host, port)
846 if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
847 # get result early, before any await (#4014)
848 result = self._cached_hosts.next_addrs(key)
850 if traces:
851 for trace in traces:
852 await trace.send_dns_cache_hit(host)
853 return result
855 if key in self._throttle_dns_events:
856 # get event early, before any await (#4014)
857 event = self._throttle_dns_events[key]
858 if traces:
859 for trace in traces:
860 await trace.send_dns_cache_hit(host)
861 await event.wait()
862 else:
863 # update dict early, before any await (#4014)
864 self._throttle_dns_events[key] = EventResultOrError(self._loop)
865 if traces:
866 for trace in traces:
867 await trace.send_dns_cache_miss(host)
868 try:
870 if traces:
871 for trace in traces:
872 await trace.send_dns_resolvehost_start(host)
874 addrs = await self._resolver.resolve(host, port, family=self._family)
875 if traces:
876 for trace in traces:
877 await trace.send_dns_resolvehost_end(host)
879 self._cached_hosts.add(key, addrs)
880 self._throttle_dns_events[key].set()
881 except BaseException as e:
882 # any DNS exception, independently of the implementation
883 # is set for the waiters to raise the same exception.
884 self._throttle_dns_events[key].set(exc=e)
885 raise
886 finally:
887 self._throttle_dns_events.pop(key)
889 return self._cached_hosts.next_addrs(key)
891 async def _create_connection(
892 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
893 ) -> ResponseHandler:
894 """Create connection.
896 Has same keyword arguments as BaseEventLoop.create_connection.
897 """
898 if req.proxy:
899 _, proto = await self._create_proxy_connection(req, traces, timeout)
900 else:
901 _, proto = await self._create_direct_connection(req, traces, timeout)
903 return proto
905 @staticmethod
906 @functools.lru_cache(None)
907 def _make_ssl_context(verified: bool) -> SSLContext:
908 if verified:
909 return ssl.create_default_context()
910 else:
911 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
912 sslcontext.options |= ssl.OP_NO_SSLv2
913 sslcontext.options |= ssl.OP_NO_SSLv3
914 sslcontext.check_hostname = False
915 sslcontext.verify_mode = ssl.CERT_NONE
916 try:
917 sslcontext.options |= ssl.OP_NO_COMPRESSION
918 except AttributeError as attr_err:
919 warnings.warn(
920 "{!s}: The Python interpreter is compiled "
921 "against OpenSSL < 1.0.0. Ref: "
922 "https://docs.python.org/3/library/ssl.html"
923 "#ssl.OP_NO_COMPRESSION".format(attr_err),
924 )
925 sslcontext.set_default_verify_paths()
926 return sslcontext
928 def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]:
929 """Logic to get the correct SSL context
931 0. if req.ssl is false, return None
933 1. if ssl_context is specified in req, use it
934 2. if _ssl_context is specified in self, use it
935 3. otherwise:
936 1. if verify_ssl is not specified in req, use self.ssl_context
937 (will generate a default context according to self.verify_ssl)
938 2. if verify_ssl is True in req, generate a default SSL context
939 3. if verify_ssl is False in req, generate a SSL context that
940 won't verify
941 """
942 if req.is_ssl():
943 if ssl is None: # pragma: no cover
944 raise RuntimeError("SSL is not supported.")
945 sslcontext = req.ssl
946 if isinstance(sslcontext, ssl.SSLContext):
947 return sslcontext
948 if sslcontext is not None:
949 # not verified or fingerprinted
950 return self._make_ssl_context(False)
951 sslcontext = self._ssl
952 if isinstance(sslcontext, ssl.SSLContext):
953 return sslcontext
954 if sslcontext is not None:
955 # not verified or fingerprinted
956 return self._make_ssl_context(False)
957 return self._make_ssl_context(True)
958 else:
959 return None
961 def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:
962 ret = req.ssl
963 if isinstance(ret, Fingerprint):
964 return ret
965 ret = self._ssl
966 if isinstance(ret, Fingerprint):
967 return ret
968 return None
970 async def _wrap_create_connection(
971 self,
972 *args: Any,
973 req: "ClientRequest",
974 timeout: "ClientTimeout",
975 client_error: Type[Exception] = ClientConnectorError,
976 **kwargs: Any,
977 ) -> Tuple[asyncio.Transport, ResponseHandler]:
978 try:
979 async with ceil_timeout(timeout.sock_connect):
980 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
981 except cert_errors as exc:
982 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
983 except ssl_errors as exc:
984 raise ClientConnectorSSLError(req.connection_key, exc) from exc
985 except OSError as exc:
986 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
987 raise
988 raise client_error(req.connection_key, exc) from exc
990 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
991 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
993 One case is that :py:meth:`asyncio.loop.start_tls` is not yet
994 implemented under Python 3.6. It is necessary for TLS-in-TLS so
995 that it is possible to send HTTPS queries through HTTPS proxies.
997 This doesn't affect regular HTTP requests, though.
998 """
999 if not req.is_ssl():
1000 return
1002 proxy_url = req.proxy
1003 assert proxy_url is not None
1004 if proxy_url.scheme != "https":
1005 return
1007 self._check_loop_for_start_tls()
1009 def _check_loop_for_start_tls(self) -> None:
1010 try:
1011 self._loop.start_tls
1012 except AttributeError as attr_exc:
1013 raise RuntimeError(
1014 "An HTTPS request is being sent through an HTTPS proxy. "
1015 "This needs support for TLS in TLS but it is not implemented "
1016 "in your runtime for the stdlib asyncio.\n\n"
1017 "Please upgrade to Python 3.7 or higher. For more details, "
1018 "please see:\n"
1019 "* https://bugs.python.org/issue37179\n"
1020 "* https://github.com/python/cpython/pull/28073\n"
1021 "* https://docs.aiohttp.org/en/stable/"
1022 "client_advanced.html#proxy-support\n"
1023 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1024 ) from attr_exc
1026 def _loop_supports_start_tls(self) -> bool:
1027 try:
1028 self._check_loop_for_start_tls()
1029 except RuntimeError:
1030 return False
1031 else:
1032 return True
1034 def _warn_about_tls_in_tls(
1035 self,
1036 underlying_transport: asyncio.Transport,
1037 req: "ClientRequest",
1038 ) -> None:
1039 """Issue a warning if the requested URL has HTTPS scheme."""
1040 if req.request_info.url.scheme != "https":
1041 return
1043 asyncio_supports_tls_in_tls = getattr(
1044 underlying_transport,
1045 "_start_tls_compatible",
1046 False,
1047 )
1049 if asyncio_supports_tls_in_tls:
1050 return
1052 warnings.warn(
1053 "An HTTPS request is being sent through an HTTPS proxy. "
1054 "This support for TLS in TLS is known to be disabled "
1055 "in the stdlib asyncio. This is why you'll probably see "
1056 "an error in the log below.\n\n"
1057 "It is possible to enable it via monkeypatching under "
1058 "Python 3.7 or higher. For more details, see:\n"
1059 "* https://bugs.python.org/issue37179\n"
1060 "* https://github.com/python/cpython/pull/28073\n\n"
1061 "You can temporarily patch this as follows:\n"
1062 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1063 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1064 RuntimeWarning,
1065 source=self,
1066 # Why `4`? At least 3 of the calls in the stack originate
1067 # from the methods in this class.
1068 stacklevel=3,
1069 )
1071 async def _start_tls_connection(
1072 self,
1073 underlying_transport: asyncio.Transport,
1074 req: "ClientRequest",
1075 timeout: "ClientTimeout",
1076 client_error: Type[Exception] = ClientConnectorError,
1077 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1078 """Wrap the raw TCP transport with TLS."""
1079 tls_proto = self._factory() # Create a brand new proto for TLS
1081 # Safety of the `cast()` call here is based on the fact that
1082 # internally `_get_ssl_context()` only returns `None` when
1083 # `req.is_ssl()` evaluates to `False` which is never gonna happen
1084 # in this code path. Of course, it's rather fragile
1085 # maintainability-wise but this is to be solved separately.
1086 sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))
1088 try:
1089 async with ceil_timeout(timeout.sock_connect):
1090 try:
1091 tls_transport = await self._loop.start_tls(
1092 underlying_transport,
1093 tls_proto,
1094 sslcontext,
1095 server_hostname=req.host,
1096 ssl_handshake_timeout=timeout.total,
1097 )
1098 except BaseException:
1099 # We need to close the underlying transport since
1100 # `start_tls()` probably failed before it had a
1101 # chance to do this:
1102 underlying_transport.close()
1103 raise
1104 except cert_errors as exc:
1105 raise ClientConnectorCertificateError(req.connection_key, exc) from exc
1106 except ssl_errors as exc:
1107 raise ClientConnectorSSLError(req.connection_key, exc) from exc
1108 except OSError as exc:
1109 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1110 raise
1111 raise client_error(req.connection_key, exc) from exc
1112 except TypeError as type_err:
1113 # Example cause looks like this:
1114 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
1115 # object at 0x7f760615e460> is not supported by start_tls()
1117 raise ClientConnectionError(
1118 "Cannot initialize a TLS-in-TLS connection to host "
1119 f"{req.host!s}:{req.port:d} through an underlying connection "
1120 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
1121 f"[{type_err!s}]"
1122 ) from type_err
1123 else:
1124 tls_proto.connection_made(
1125 tls_transport
1126 ) # Kick the state machine of the new TLS protocol
1128 return tls_transport, tls_proto
1130 async def _create_direct_connection(
1131 self,
1132 req: "ClientRequest",
1133 traces: List["Trace"],
1134 timeout: "ClientTimeout",
1135 *,
1136 client_error: Type[Exception] = ClientConnectorError,
1137 ) -> Tuple[asyncio.Transport, ResponseHandler]:
1138 sslcontext = self._get_ssl_context(req)
1139 fingerprint = self._get_fingerprint(req)
1141 host = req.url.raw_host
1142 assert host is not None
1143 port = req.port
1144 assert port is not None
1145 host_resolved = asyncio.ensure_future(
1146 self._resolve_host(host, port, traces=traces), loop=self._loop
1147 )
1148 try:
1149 # Cancelling this lookup should not cancel the underlying lookup
1150 # or else the cancel event will get broadcast to all the waiters
1151 # across all connections.
1152 hosts = await asyncio.shield(host_resolved)
1153 except asyncio.CancelledError:
1155 def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
1156 with suppress(Exception, asyncio.CancelledError):
1157 fut.result()
1159 host_resolved.add_done_callback(drop_exception)
1160 raise
1161 except OSError as exc:
1162 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1163 raise
1164 # in case of proxy it is not ClientProxyConnectionError
1165 # it is problem of resolving proxy ip itself
1166 raise ClientConnectorError(req.connection_key, exc) from exc
1168 last_exc: Optional[Exception] = None
1170 for hinfo in hosts:
1171 host = hinfo["host"]
1172 port = hinfo["port"]
1174 try:
1175 transp, proto = await self._wrap_create_connection(
1176 self._factory,
1177 host,
1178 port,
1179 timeout=timeout,
1180 ssl=sslcontext,
1181 family=hinfo["family"],
1182 proto=hinfo["proto"],
1183 flags=hinfo["flags"],
1184 server_hostname=hinfo["hostname"] if sslcontext else None,
1185 local_addr=self._local_addr,
1186 req=req,
1187 client_error=client_error,
1188 )
1189 except ClientConnectorError as exc:
1190 last_exc = exc
1191 continue
1193 if req.is_ssl() and fingerprint:
1194 try:
1195 fingerprint.check(transp)
1196 except ServerFingerprintMismatch as exc:
1197 transp.close()
1198 if not self._cleanup_closed_disabled:
1199 self._cleanup_closed_transports.append(transp)
1200 last_exc = exc
1201 continue
1203 return transp, proto
1204 else:
1205 assert last_exc is not None
1206 raise last_exc
1208 async def _create_proxy_connection(
1209 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1210 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
1211 self._fail_on_no_start_tls(req)
1212 runtime_has_start_tls = self._loop_supports_start_tls()
1214 headers: Dict[str, str] = {}
1215 if req.proxy_headers is not None:
1216 headers = req.proxy_headers # type: ignore[assignment]
1217 headers[hdrs.HOST] = req.headers[hdrs.HOST]
1219 url = req.proxy
1220 assert url is not None
1221 proxy_req = ClientRequest(
1222 hdrs.METH_GET,
1223 url,
1224 headers=headers,
1225 auth=req.proxy_auth,
1226 loop=self._loop,
1227 ssl=req.ssl,
1228 )
1230 # create connection to proxy server
1231 transport, proto = await self._create_direct_connection(
1232 proxy_req, [], timeout, client_error=ClientProxyConnectionError
1233 )
1235 # Many HTTP proxies has buggy keepalive support. Let's not
1236 # reuse connection but close it after processing every
1237 # response.
1238 proto.force_close()
1240 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
1241 if auth is not None:
1242 if not req.is_ssl():
1243 req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1244 else:
1245 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
1247 if req.is_ssl():
1248 if runtime_has_start_tls:
1249 self._warn_about_tls_in_tls(transport, req)
1251 # For HTTPS requests over HTTP proxy
1252 # we must notify proxy to tunnel connection
1253 # so we send CONNECT command:
1254 # CONNECT www.python.org:443 HTTP/1.1
1255 # Host: www.python.org
1256 #
1257 # next we must do TLS handshake and so on
1258 # to do this we must wrap raw socket into secure one
1259 # asyncio handles this perfectly
1260 proxy_req.method = hdrs.METH_CONNECT
1261 proxy_req.url = req.url
1262 key = attr.evolve(
1263 req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
1264 )
1265 conn = Connection(self, key, proto, self._loop)
1266 proxy_resp = await proxy_req.send(conn)
1267 try:
1268 protocol = conn._protocol
1269 assert protocol is not None
1271 # read_until_eof=True will ensure the connection isn't closed
1272 # once the response is received and processed allowing
1273 # START_TLS to work on the connection below.
1274 protocol.set_response_params(read_until_eof=runtime_has_start_tls)
1275 resp = await proxy_resp.start(conn)
1276 except BaseException:
1277 proxy_resp.close()
1278 conn.close()
1279 raise
1280 else:
1281 conn._protocol = None
1282 conn._transport = None
1283 try:
1284 if resp.status != 200:
1285 message = resp.reason
1286 if message is None:
1287 message = RESPONSES[resp.status][0]
1288 raise ClientHttpProxyError(
1289 proxy_resp.request_info,
1290 resp.history,
1291 status=resp.status,
1292 message=message,
1293 headers=resp.headers,
1294 )
1295 if not runtime_has_start_tls:
1296 rawsock = transport.get_extra_info("socket", default=None)
1297 if rawsock is None:
1298 raise RuntimeError(
1299 "Transport does not expose socket instance"
1300 )
1301 # Duplicate the socket, so now we can close proxy transport
1302 rawsock = rawsock.dup()
1303 except BaseException:
1304 # It shouldn't be closed in `finally` because it's fed to
1305 # `loop.start_tls()` and the docs say not to touch it after
1306 # passing there.
1307 transport.close()
1308 raise
1309 finally:
1310 if not runtime_has_start_tls:
1311 transport.close()
1313 if not runtime_has_start_tls:
1314 # HTTP proxy with support for upgrade to HTTPS
1315 sslcontext = self._get_ssl_context(req)
1316 return await self._wrap_create_connection(
1317 self._factory,
1318 timeout=timeout,
1319 ssl=sslcontext,
1320 sock=rawsock,
1321 server_hostname=req.host,
1322 req=req,
1323 )
1325 return await self._start_tls_connection(
1326 # Access the old transport for the last time before it's
1327 # closed and forgotten forever:
1328 transport,
1329 req=req,
1330 timeout=timeout,
1331 )
1332 finally:
1333 proxy_resp.close()
1335 return transport, proto
1338class UnixConnector(BaseConnector):
1339 """Unix socket connector.
1341 path - Unix socket path.
1342 keepalive_timeout - (optional) Keep-alive timeout.
1343 force_close - Set to True to force close and do reconnect
1344 after each request (and between redirects).
1345 limit - The total number of simultaneous connections.
1346 limit_per_host - Number of simultaneous connections to one host.
1347 loop - Optional event loop.
1348 """
1350 def __init__(
1351 self,
1352 path: str,
1353 force_close: bool = False,
1354 keepalive_timeout: Union[object, float, None] = sentinel,
1355 limit: int = 100,
1356 limit_per_host: int = 0,
1357 loop: Optional[asyncio.AbstractEventLoop] = None,
1358 ) -> None:
1359 super().__init__(
1360 force_close=force_close,
1361 keepalive_timeout=keepalive_timeout,
1362 limit=limit,
1363 limit_per_host=limit_per_host,
1364 loop=loop,
1365 )
1366 self._path = path
1368 @property
1369 def path(self) -> str:
1370 """Path to unix socket."""
1371 return self._path
1373 async def _create_connection(
1374 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1375 ) -> ResponseHandler:
1376 try:
1377 async with ceil_timeout(timeout.sock_connect):
1378 _, proto = await self._loop.create_unix_connection(
1379 self._factory, self._path
1380 )
1381 except OSError as exc:
1382 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1383 raise
1384 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
1386 return cast(ResponseHandler, proto)
1389class NamedPipeConnector(BaseConnector):
1390 """Named pipe connector.
1392 Only supported by the proactor event loop.
1393 See also: https://docs.python.org/3.7/library/asyncio-eventloop.html
1395 path - Windows named pipe path.
1396 keepalive_timeout - (optional) Keep-alive timeout.
1397 force_close - Set to True to force close and do reconnect
1398 after each request (and between redirects).
1399 limit - The total number of simultaneous connections.
1400 limit_per_host - Number of simultaneous connections to one host.
1401 loop - Optional event loop.
1402 """
1404 def __init__(
1405 self,
1406 path: str,
1407 force_close: bool = False,
1408 keepalive_timeout: Union[object, float, None] = sentinel,
1409 limit: int = 100,
1410 limit_per_host: int = 0,
1411 loop: Optional[asyncio.AbstractEventLoop] = None,
1412 ) -> None:
1413 super().__init__(
1414 force_close=force_close,
1415 keepalive_timeout=keepalive_timeout,
1416 limit=limit,
1417 limit_per_host=limit_per_host,
1418 loop=loop,
1419 )
1420 if not isinstance(
1421 self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]
1422 ):
1423 raise RuntimeError(
1424 "Named Pipes only available in proactor " "loop under windows"
1425 )
1426 self._path = path
1428 @property
1429 def path(self) -> str:
1430 """Path to the named pipe."""
1431 return self._path
1433 async def _create_connection(
1434 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
1435 ) -> ResponseHandler:
1436 try:
1437 async with ceil_timeout(timeout.sock_connect):
1438 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501
1439 self._factory, self._path
1440 )
1441 # the drain is required so that the connection_made is called
1442 # and transport is set otherwise it is not set before the
1443 # `assert conn.transport is not None`
1444 # in client.py's _request method
1445 await asyncio.sleep(0)
1446 # other option is to manually set transport like
1447 # `proto.transport = trans`
1448 except OSError as exc:
1449 if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
1450 raise
1451 raise ClientConnectorError(req.connection_key, exc) from exc
1453 return cast(ResponseHandler, proto)