Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import functools
3import logging
4import random
5import socket
6import sys
7import traceback
8import warnings
9from collections import OrderedDict, defaultdict, deque
10from contextlib import suppress
11from http import HTTPStatus
12from itertools import chain, cycle, islice
13from time import monotonic
14from types import TracebackType
15from typing import (
16 TYPE_CHECKING,
17 Any,
18 Awaitable,
19 Callable,
20 DefaultDict,
21 Deque,
22 Dict,
23 Iterator,
24 List,
25 Literal,
26 Optional,
27 Sequence,
28 Set,
29 Tuple,
30 Type,
31 Union,
32 cast,
33)
35import aiohappyeyeballs
36from aiohappyeyeballs import AddrInfoType, SocketFactoryType
38from . import hdrs, helpers
39from .abc import AbstractResolver, ResolveResult
40from .client_exceptions import (
41 ClientConnectionError,
42 ClientConnectorCertificateError,
43 ClientConnectorDNSError,
44 ClientConnectorError,
45 ClientConnectorSSLError,
46 ClientHttpProxyError,
47 ClientProxyConnectionError,
48 ServerFingerprintMismatch,
49 UnixClientConnectorError,
50 cert_errors,
51 ssl_errors,
52)
53from .client_proto import ResponseHandler
54from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint
55from .helpers import (
56 _SENTINEL,
57 ceil_timeout,
58 is_ip_address,
59 sentinel,
60 set_exception,
61 set_result,
62)
63from .resolver import DefaultResolver
65if sys.version_info >= (3, 12):
66 from collections.abc import Buffer
67else:
68 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
70if TYPE_CHECKING:
71 import ssl
73 SSLContext = ssl.SSLContext
74else:
75 try:
76 import ssl
78 SSLContext = ssl.SSLContext
79 except ImportError: # pragma: no cover
80 ssl = None # type: ignore[assignment]
81 SSLContext = object # type: ignore[misc,assignment]
# Scheme sets used to validate which URL schemes a given connector class
# is allowed to handle (see ``allowed_protocol_schema_set`` on connectors).
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# True when the running interpreter still needs the "cleanup closed" workaround.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1

__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)
109if TYPE_CHECKING:
110 from .client import ClientTimeout
111 from .client_reqrep import ConnectionKey
112 from .tracing import Trace
class Connection:
    """Represents a single connection.

    Thin handle pairing a pooled protocol with the connector that owns it.
    ``release()`` hands the protocol back for reuse; ``close()`` tears it
    down.  After either call the handle is inert (``_protocol`` is None).
    """

    __slots__ = (
        "_key",
        "_connector",
        "_loop",
        "_protocol",
        "_callbacks",
        "_source_traceback",
    )

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        # Becomes None once the connection has been released or closed.
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []
        # Captured only in debug mode; reported by __del__ via the loop's
        # exception handler to help locate the leaking call site.
        self._source_traceback = (
            traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None
        )

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # Garbage-collecting a connection that was never released/closed is a
        # caller bug: warn, force-close the protocol, and report the leak on
        # the event loop's exception handler.
        if self._protocol is not None:
            _warnings.warn(
                f"Unclosed connection {self!r}", ResourceWarning, source=self
            )
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Underlying transport, or None once released/closed."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """Underlying response handler, or None once released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        # Callbacks fire (with exceptions suppressed) when the connection is
        # released or closed.
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap out the callback list first so a callback that re-registers
        # itself does not run again in this pass.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Hard close: tear down the underlying protocol/transport."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Soft release: return the protocol to the connector's pool.

        The connector decides whether it may be kept alive for reuse.
        """
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True once released/closed or the transport has dropped."""
        return self._protocol is None or not self._protocol.is_connected()
205class _TransportPlaceholder:
206 """placeholder for BaseConnector.connect function"""
208 __slots__ = ("closed",)
210 def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
211 """Initialize a placeholder for a transport."""
212 self.closed = closed_future
214 def close(self) -> None:
215 """Close the placeholder."""
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    # URL schemes this connector accepts; subclasses may widen the set.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[_SENTINEL, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # keepalive is meaningless with force_close; reject the combination
        # rather than silently ignoring one of the two settings.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        self._timeout_ceil_threshold = timeout_ceil_threshold

        loop = asyncio.get_running_loop()

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        # Protocols currently checked out of the pool (plus placeholders for
        # connections still being established).
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []

        # Shared, already-completed future handed to _TransportPlaceholder
        # instances so their ``closed`` attribute is always done.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        # Report a connector that was garbage-collected with live pooled
        # connections: close everything and surface the leak.
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close_immediately()

        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            # Rebuild the pool keeping only connections that are still open
            # and were used after the keepalive deadline.
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # More idle connections remain: schedule the next sweep.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            # Re-arm the periodic abort sweep (weakref so the handle does not
            # keep the connector alive).
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def close(self) -> None:
        """Close all opened transports."""
        waiters = self._close_immediately()
        if waiters:
            results = await asyncio.gather(*waiters, return_exceptions=True)
            for res in results:
                if isinstance(res, Exception):
                    err_msg = "Error while closing connector: " + repr(res)
                    logging.error(err_msg)

    def _close_immediately(self) -> List[Awaitable[object]]:
        """Synchronously close everything; return awaitables for completion.

        Idempotent: a second call returns an empty list.
        """
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()
                    waiters.append(proto.closed)

            for proto in self._acquired:
                proto.close()
                waiters.append(proto.closed)

            # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Always reset bookkeeping, even if the loop was already closed.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    return conn

            # Reserve a slot with a placeholder so the limits above account
            # for this connection while it is still being established.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that
                # the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it is connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        # Return a protocol to the pool, or close it when keepalive is
        # disabled, the caller requested it, or the protocol itself says so.
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()
            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        # Subclasses implement the actual transport establishment.
        raise NotImplementedError()
class _DNSCacheTable:
    """Round-robin cache of resolved addresses with optional TTL expiry."""

    def __init__(self, ttl: Optional[float] = None) -> None:
        # key -> (cycling iterator over the resolved addresses, entry count)
        self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
        """Store (or replace) the addresses for *key*, stamping the entry."""
        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        """Drop a single entry; missing keys are ignored."""
        self._addrs_rr.pop(key, None)

        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        """Forget every cached entry."""
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
        """Return all addresses for *key*, rotated by one on each call."""
        rr, count = self._addrs_rr[key]
        result = list(islice(rr, count))
        # Advance the cycle one extra step so the next caller starts
        # from a different address (round-robin behaviour).
        next(rr)
        return result

    def expired(self, key: Tuple[str, int]) -> bool:
        """True when the entry for *key* is older than the configured TTL."""
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()
def _make_ssl_context(verified: bool) -> SSLContext:
    """Build a client-side SSL context.

    Blocking: loads certificates from disk and performs other I/O, so it
    should be invoked from a thread rather than from the event loop.
    """
    if ssl is None:
        # Interpreter was built without ssl support.
        return None  # type: ignore[unreachable]
    if verified:
        sslcontext = ssl.create_default_context()
    else:
        # Permissive context: certificate and hostname checks disabled.
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        sslcontext.check_hostname = False
        sslcontext.verify_mode = ssl.CERT_NONE
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.options |= ssl.OP_NO_SSLv3
        sslcontext.options |= ssl.OP_NO_COMPRESSION
        sslcontext.set_default_verify_paths()
    sslcontext.set_alpn_protocols(("http/1.1",))
    return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
# _get_ssl_context() below picks one of these two based on the
# request's and connector's ssl settings.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
809class TCPConnector(BaseConnector):
810 """TCP connector.
812 verify_ssl - Set to True to check ssl certifications.
813 fingerprint - Pass the binary sha256
814 digest of the expected certificate in DER format to verify
815 that the certificate the server presents matches. See also
816 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
817 resolver - Enable DNS lookups and use this
818 resolver
819 use_dns_cache - Use memory cache for DNS lookups.
820 ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
821 family - socket address family
822 local_addr - local tuple of (host, port) to bind socket to
824 keepalive_timeout - (optional) Keep-alive timeout.
825 force_close - Set to True to force close and do reconnect
826 after each request (and between redirects).
827 limit - The total number of simultaneous connections.
828 limit_per_host - Number of simultaneous connections to one host.
829 enable_cleanup_closed - Enables clean-up closed ssl transports.
830 Disabled by default.
831 happy_eyeballs_delay - This is the “Connection Attempt Delay”
832 as defined in RFC 8305. To disable
833 the happy eyeballs algorithm, set to None.
834 interleave - “First Address Family Count” as defined in RFC 8305
835 loop - Optional event loop.
836 socket_factory - A SocketFactoryType function that, if supplied,
837 will be used to create sockets given an
838 AddrInfoType.
839 """
841 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})
    def __init__(
        self,
        *,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, _SENTINEL] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
    ):
        """Initialize the TCP connector; parameters are documented on the class."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # ``ssl`` may be a bool (verification on/off), a Fingerprint pin, or
        # a ready-made SSLContext; anything else is rejected up front.
        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, Fingerprint, or bool, "
                "got {!r} instead.".format(ssl)
            )
        self._ssl = ssl
        if resolver is None:
            resolver = DefaultResolver()
        self._resolver: AbstractResolver = resolver

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # (host, port) -> waiter futures for an in-flight DNS lookup of that
        # key; used to deduplicate concurrent lookups (see _resolve_host).
        self._throttle_dns_futures: Dict[Tuple[str, int], Set[asyncio.Future[None]]] = (
            {}
        )
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # Shielded resolver tasks are kept here so they are not garbage
        # collected mid-flight and can be cancelled on close.
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
893 def _close_immediately(self) -> List[Awaitable[object]]:
894 for fut in chain.from_iterable(self._throttle_dns_futures.values()):
895 fut.cancel()
897 waiters = super()._close_immediately()
899 for t in self._resolve_host_tasks:
900 t.cancel()
901 waiters.append(t)
903 return waiters
    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family
    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache
915 def clear_dns_cache(
916 self, host: Optional[str] = None, port: Optional[int] = None
917 ) -> None:
918 """Remove specified host/port or clear all dns local cache."""
919 if host is not None and port is not None:
920 self._cached_hosts.remove((host, port))
921 elif host is not None or port is not None:
922 raise ValueError("either both host and port or none of them are allowed")
923 else:
924 self._cached_hosts.clear()
    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        Literal IPs short-circuit; otherwise the result comes from the DNS
        cache, an in-flight throttled lookup, or a fresh resolver call.
        """
        if is_ip_address(host):
            # Already an IP literal: no resolution needed.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            # Caching disabled: always hit the resolver directly.
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set[asyncio.Future[None]]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                # Wait until the in-flight lookup publishes its result.
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Keep a strong reference so the task survives until done and
            # can be cancelled by _close_immediately().
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:
            # Our caller was cancelled but the shielded lookup continues for
            # the other waiters; make sure its eventual exception (if any)
            # is retrieved so it is not logged as unhandled.
            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise
    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set[asyncio.Future[None]],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            # Publish to the cache first, then wake the waiters; they read
            # the result via next_addrs() on the cache.
            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # Lookup finished (either way): stop throttling this key.
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)
1057 async def _create_connection(
1058 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
1059 ) -> ResponseHandler:
1060 """Create connection.
1062 Has same keyword arguments as BaseEventLoop.create_connection.
1063 """
1064 if req.proxy:
1065 _, proto = await self._create_proxy_connection(req, traces, timeout)
1066 else:
1067 _, proto = await self._create_direct_connection(req, traces, timeout)
1069 return proto
1071 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
1072 """Logic to get the correct SSL context
1074 0. if req.ssl is false, return None
1076 1. if ssl_context is specified in req, use it
1077 2. if _ssl_context is specified in self, use it
1078 3. otherwise:
1079 1. if verify_ssl is not specified in req, use self.ssl_context
1080 (will generate a default context according to self.verify_ssl)
1081 2. if verify_ssl is True in req, generate a default SSL context
1082 3. if verify_ssl is False in req, generate a SSL context that
1083 won't verify
1084 """
1085 if not req.is_ssl():
1086 return None
1088 if ssl is None: # pragma: no cover
1089 raise RuntimeError("SSL is not supported.")
1090 sslcontext = req.ssl
1091 if isinstance(sslcontext, ssl.SSLContext):
1092 return sslcontext
1093 if sslcontext is not True:
1094 # not verified or fingerprinted
1095 return _SSL_CONTEXT_UNVERIFIED
1096 sslcontext = self._ssl
1097 if isinstance(sslcontext, ssl.SSLContext):
1098 return sslcontext
1099 if sslcontext is not True:
1100 # not verified or fingerprinted
1101 return _SSL_CONTEXT_UNVERIFIED
1102 return _SSL_CONTEXT_VERIFIED
1104 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
1105 ret = req.ssl
1106 if isinstance(ret, Fingerprint):
1107 return ret
1108 ret = self._ssl
1109 if isinstance(ret, Fingerprint):
1110 return ret
1111 return None
    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Open a socket to one of ``addr_infos`` and wrap errors in client exceptions.

        The socket is established via aiohappyeyeballs under the
        ``sock_connect`` timeout; the extra positional/keyword arguments are
        forwarded to ``loop.create_connection()``.

        Raises ClientConnectorCertificateError on certificate errors,
        ClientConnectorSSLError on other SSL errors, and ``client_error``
        for OS-level connection failures.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A bare asyncio.TimeoutError (errno is None) must propagate
            # unchanged so callers can distinguish timeouts from failures.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
1144 def _warn_about_tls_in_tls(
1145 self,
1146 underlying_transport: asyncio.Transport,
1147 req: ClientRequest,
1148 ) -> None:
1149 """Issue a warning if the requested URL has HTTPS scheme."""
1150 if req.request_info.url.scheme != "https":
1151 return
1153 # Check if uvloop is being used, which supports TLS in TLS,
1154 # otherwise assume that asyncio's native transport is being used.
1155 if type(underlying_transport).__module__.startswith("uvloop"):
1156 return
1158 # Support in asyncio was added in Python 3.11 (bpo-44011)
1159 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
1160 underlying_transport,
1161 "_start_tls_compatible",
1162 False,
1163 )
1165 if asyncio_supports_tls_in_tls:
1166 return
1168 warnings.warn(
1169 "An HTTPS request is being sent through an HTTPS proxy. "
1170 "This support for TLS in TLS is known to be disabled "
1171 "in the stdlib asyncio. This is why you'll probably see "
1172 "an error in the log below.\n\n"
1173 "It is possible to enable it via monkeypatching. "
1174 "For more details, see:\n"
1175 "* https://bugs.python.org/issue37179\n"
1176 "* https://github.com/python/cpython/pull/28073\n\n"
1177 "You can temporarily patch this as follows:\n"
1178 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
1179 "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
1180 RuntimeWarning,
1181 source=self,
1182 # Why `4`? At least 3 of the calls in the stack originate
1183 # from the methods in this class.
1184 stacklevel=3,
1185 )
    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Returns the upgraded transport together with a fresh protocol
        instance.  On handshake or fingerprint failure the relevant
        transport is closed and a client exception is raised.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total,
                    )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            # Close the bad connection; keep the transport
                            # around for the cleanup-closed machinery unless
                            # that is disabled.
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A bare asyncio.TimeoutError (errno is None) propagates as-is.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()
            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto
1259 def _convert_hosts_to_addr_infos(
1260 self, hosts: List[ResolveResult]
1261 ) -> List[AddrInfoType]:
1262 """Converts the list of hosts to a list of addr_infos.
1264 The list of hosts is the result of a DNS lookup. The list of
1265 addr_infos is the result of a call to `socket.getaddrinfo()`.
1266 """
1267 addr_infos: List[AddrInfoType] = []
1268 for hinfo in hosts:
1269 host = hinfo["host"]
1270 is_ipv6 = ":" in host
1271 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
1272 if self._family and self._family != family:
1273 continue
1274 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
1275 addr_infos.append(
1276 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
1277 )
1278 return addr_infos
    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target host and open a TCP (optionally TLS) connection.

        Candidate addresses are tried until one succeeds; when all have been
        exhausted the last failure is raised.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            # A bare asyncio.TimeoutError (errno is None) propagates as-is.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                # Drop the attempted address(es) and retry with the rest.
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        assert last_exc is not None
        raise last_exc
    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect to ``req.proxy`` and prepare the connection for *req*.

        Plain HTTP requests go through the proxy directly; HTTPS requests
        issue a CONNECT first and then upgrade the tunnel to TLS.
        """
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                # Plain HTTP through the proxy: credentials travel on the
                # request itself as Proxy-Authorization.
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                # HTTPS: credentials belong on the CONNECT request instead.
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            # CONNECT www.python.org:443 HTTP/1.1
            # Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=True,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol from the helper Connection so closing
                # it later does not tear down the tunnel we are building.
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto
class UnixConnector(BaseConnector):
    """Unix socket connector.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    # Unix sockets are addressed with the dedicated "unix" scheme.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open the unix-socket connection under the connect timeout."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            # Bare timeouts (errno is None) propagate unchanged; genuine OS
            # errors are wrapped so the caller sees the failing path.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc

        return proto
class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    # Named pipes are addressed with the dedicated "npipe" scheme.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        # Named pipes only exist on the Windows proactor loop; fail fast on
        # any other event loop implementation.
        if not isinstance(
            self._loop, asyncio.ProactorEventLoop  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open the named-pipe connection under the connect timeout."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # Yield to the loop once so connection_made() runs and the
                # transport is attached to the protocol before the caller's
                # `assert conn.transport is not None` in client.py's _request.
                # (The alternative would be assigning proto.transport by hand.)
                await asyncio.sleep(0)
        except OSError as exc:
            # Bare timeouts (errno is None) propagate unchanged; genuine OS
            # errors are wrapped for the caller.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)