Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohappyeyeballs/impl.py: 13%
86 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1"""Base implementation."""
2import asyncio
3import collections
4import functools
5import itertools
6import socket
7from asyncio import staggered
8from typing import List, Optional, Sequence
10from .types import AddrInfoType
async def start_connection(
    addr_infos: Sequence[AddrInfoType],
    *,
    local_addr_infos: Optional[Sequence[AddrInfoType]] = None,
    happy_eyeballs_delay: Optional[float] = None,
    interleave: Optional[int] = None,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> socket.socket:
    """
    Connect to a TCP server.

    Create a socket connection to a specified destination.  The
    destination is specified as a list of AddrInfoType tuples as
    returned from getaddrinfo().

    Each AddrInfoType tuple holds, in order:

    * ``family``: the address family, e.g. ``socket.AF_INET`` or
        ``socket.AF_INET6``.
    * ``type``: the socket type, e.g. ``socket.SOCK_STREAM`` or
        ``socket.SOCK_DGRAM``.
    * ``proto``: the protocol, e.g. ``socket.IPPROTO_TCP`` or
        ``socket.IPPROTO_UDP``.
    * ``canonname``: the canonical name of the address, e.g.
        ``"www.python.org"``.
    * ``sockaddr``: the socket address

    Keyword arguments:

    * ``local_addr_infos``: optional AddrInfoType tuples; each new socket
        is bound to the first entry of the same family whose ``bind()``
        succeeds before it is connected.
    * ``happy_eyeballs_delay``: when ``None``, or when only one address is
        given, the addresses are tried one after another.  Otherwise the
        attempts are run through ``asyncio.staggered.staggered_race`` with
        this value as the delay.
    * ``interleave``: when truthy and more than one address is given, the
        addresses are reordered by family, with this value used as the
        count of leading addresses from the first family.  Defaults to 1
        when ``happy_eyeballs_delay`` is set.
    * ``loop``: the event loop to use; defaults to the running loop.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    socket.

    Raises ``OSError`` when ``addr_infos`` is empty or when no connection
    could be established.  A single recorded error (or several errors with
    the same message) is re-raised as is; otherwise a combined ``OSError``
    listing every message is raised.

    The expected use case is to use this method in conjunction with
    loop.create_connection() to establish a connection to a server::

            socket = await start_connection(addr_infos)
            transport, protocol = await loop.create_connection(
                MyProtocol, sock=socket, ...)
    """
    current_loop = loop or asyncio.get_running_loop()

    if not addr_infos:
        # Without this guard the failure path below indexes an empty list
        # and the caller sees an unrelated IndexError.
        raise OSError("addr_infos is empty; there is no address to connect to")

    single_addr_info = len(addr_infos) == 1

    if happy_eyeballs_delay is not None and interleave is None:
        # If using happy eyeballs, default to interleave addresses by family
        interleave = 1

    if interleave and not single_addr_info:
        addr_infos = _interleave_addrinfos(addr_infos, interleave)

    sock: Optional[socket.socket] = None
    # One inner list per connection attempt, filled in by _connect_sock.
    exceptions: List[List[Exception]] = []
    if happy_eyeballs_delay is None or single_addr_info:
        # not using happy eyeballs
        for addrinfo in addr_infos:
            try:
                sock = await _connect_sock(
                    current_loop, exceptions, addrinfo, local_addr_infos
                )
                break
            except OSError:
                # Already recorded in ``exceptions``; try the next address.
                continue
    else:  # using happy eyeballs
        sock, _, _ = await staggered.staggered_race(
            (
                functools.partial(
                    _connect_sock, current_loop, exceptions, addrinfo, local_addr_infos
                )
                for addrinfo in addr_infos
            ),
            happy_eyeballs_delay,
            loop=current_loop,
        )

    if sock is None:
        all_exceptions = [exc for sub in exceptions for exc in sub]
        try:
            if not all_exceptions:
                # Every attempt failed without recording an OSError;
                # raise a meaningful error instead of an IndexError.
                raise OSError(
                    "could not establish a connection and no error was recorded"
                )
            if len(all_exceptions) == 1:
                raise all_exceptions[0]
            # If they all have the same str(), raise one.
            model = str(all_exceptions[0])
            if all(str(exc) == model for exc in all_exceptions):
                raise all_exceptions[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError(
                "Multiple exceptions: {}".format(
                    ", ".join(str(exc) for exc in all_exceptions)
                )
            )
        finally:
            # Drop the local references to the exceptions so this frame
            # does not keep them alive while one of them propagates.
            all_exceptions = None  # type: ignore[assignment]
            exceptions = None  # type: ignore[assignment]

    return sock
async def _connect_sock(
    loop: asyncio.AbstractEventLoop,
    exceptions: List[List[Exception]],
    addr_info: AddrInfoType,
    local_addr_infos: Optional[Sequence[AddrInfoType]] = None,
) -> socket.socket:
    """Create, bind and connect one socket.

    A new list is appended to ``exceptions`` and every ``OSError`` raised
    while binding or connecting is recorded in it, so the caller can
    report all failures once every attempt has finished.

    * ``loop``: event loop whose ``sock_connect()`` performs the connect.
    * ``exceptions``: shared list that receives this attempt's error list.
    * ``addr_info``: AddrInfoType tuple describing the destination.
    * ``local_addr_infos``: optional AddrInfoType tuples; the socket is
        bound to the first entry of the destination's family whose
        ``bind()`` succeeds.

    Returns the connected, non-blocking socket.  Raises ``OSError`` when
    binding or connecting fails, including when ``local_addr_infos`` is
    given but has no entry of the destination's family.  The socket is
    closed before any exception propagates.
    """
    my_exceptions: List[Exception] = []
    exceptions.append(my_exceptions)
    family, type_, proto, _, address = addr_info
    sock = None
    try:
        sock = socket.socket(family=family, type=type_, proto=proto)
        sock.setblocking(False)
        if local_addr_infos is not None:
            for lfamily, _, _, _, laddr in local_addr_infos:
                # skip local addresses of different family
                if lfamily != family:
                    continue
                try:
                    sock.bind(laddr)
                    break
                except OSError as exc:
                    # ``strerror`` is None for an OSError built without an
                    # (errno, strerror) pair; fall back to an empty string
                    # so the bind failure is not masked by AttributeError.
                    msg = (
                        f"error while attempting to bind on "
                        f"address {laddr!r}: "
                        f"{(exc.strerror or '').lower()}"
                    )
                    my_exceptions.append(OSError(exc.errno, msg))
            else:  # all bind attempts failed
                if my_exceptions:
                    # Re-recorded by the outer OSError handler below.
                    raise my_exceptions.pop()
                else:
                    raise OSError(f"no matching local address with {family=} found")
        await loop.sock_connect(sock, address)
        return sock
    except OSError as exc:
        my_exceptions.append(exc)
        if sock is not None:
            sock.close()
        raise
    except BaseException:
        # Any other failure: close the socket so it does not leak, then
        # let the exception propagate.
        if sock is not None:
            sock.close()
        raise
    finally:
        # Drop the local references to the recorded exceptions so this
        # frame does not keep them alive.
        exceptions = my_exceptions = None  # type: ignore[assignment]
def _interleave_addrinfos(
    addrinfos: Sequence[AddrInfoType], first_address_family_count: int = 1
) -> List[AddrInfoType]:
    """Interleave list of addrinfo tuples by family.

    Addresses are grouped by their first tuple element (the family),
    keeping the order in which each family first appears.  The result
    takes one address from each family in turn until all are used.

    When ``first_address_family_count`` is greater than 1, that many
    minus one addresses of the first family are placed at the front
    before the round-robin starts.

    Returns a new list; an empty input yields an empty list.
    """
    # Group addresses by family
    addrinfos_by_family: collections.OrderedDict[
        int, List[AddrInfoType]
    ] = collections.OrderedDict()
    for addr in addrinfos:
        addrinfos_by_family.setdefault(addr[0], []).append(addr)
    addrinfos_lists = list(addrinfos_by_family.values())

    reordered: List[AddrInfoType] = []
    if not addrinfos_lists:
        # Nothing to interleave; also avoids indexing the first family below.
        return reordered

    if first_address_family_count > 1:
        head_count = first_address_family_count - 1
        reordered.extend(addrinfos_lists[0][:head_count])
        del addrinfos_lists[0][:head_count]
    # zip_longest pads the shorter families with None; drop the padding.
    reordered.extend(
        a
        for a in itertools.chain.from_iterable(itertools.zip_longest(*addrinfos_lists))
        if a is not None
    )
    return reordered