1"""Utility functions for aiohappyeyeballs."""
2
3import ipaddress
4import socket
5
6from .types import AddrInfoType
7
8
9def addr_to_addr_infos(
10 addr: tuple[str, int, int, int] | tuple[str, int, int] | tuple[str, int] | None,
11) -> list[AddrInfoType] | None:
12 """Convert an address tuple to a list of addr_info tuples."""
13 if addr is None:
14 return None
15 host = addr[0]
16 port = addr[1]
17 is_ipv6 = ":" in host
18 if is_ipv6:
19 flowinfo = 0
20 scopeid = 0
21 addr_len = len(addr)
22 if addr_len >= 4:
23 scopeid = addr[3] # type: ignore[misc]
24 if addr_len >= 3:
25 flowinfo = addr[2] # type: ignore[misc]
26 addr = (host, port, flowinfo, scopeid)
27 family = socket.AF_INET6
28 else:
29 addr = (host, port)
30 family = socket.AF_INET
31 return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)]
32
33
34def pop_addr_infos_interleave(
35 addr_infos: list[AddrInfoType], interleave: int | None = None
36) -> None:
37 """
38 Pop addr_info from the list of addr_infos by family up to interleave times.
39
40 The interleave parameter is used to know how many addr_infos for
41 each family should be popped of the top of the list.
42 """
43 seen: dict[int, int] = {}
44 if interleave is None:
45 interleave = 1
46 to_remove: list[AddrInfoType] = []
47 for addr_info in addr_infos:
48 family = addr_info[0]
49 if family not in seen:
50 seen[family] = 0
51 if seen[family] < interleave:
52 to_remove.append(addr_info)
53 seen[family] += 1
54 for addr_info in to_remove:
55 addr_infos.remove(addr_info)
56
57
58def _addr_tuple_to_ip_address(
59 addr: tuple[str, int] | tuple[str, int, int, int],
60) -> tuple[ipaddress.IPv4Address, int] | tuple[ipaddress.IPv6Address, int, int, int]:
61 """Convert an address tuple to an IPv4Address."""
62 return (ipaddress.ip_address(addr[0]), *addr[1:])
63
64
65def remove_addr_infos(
66 addr_infos: list[AddrInfoType],
67 addr: tuple[str, int] | tuple[str, int, int, int],
68) -> None:
69 """
70 Remove an address from the list of addr_infos.
71
72 The addr value is typically the return value of
73 sock.getpeername().
74 """
75 bad_addrs_infos: list[AddrInfoType] = []
76 for addr_info in addr_infos:
77 if addr_info[-1] == addr:
78 bad_addrs_infos.append(addr_info)
79 if bad_addrs_infos:
80 for bad_addr_info in bad_addrs_infos:
81 addr_infos.remove(bad_addr_info)
82 return
83 # Slow path in case addr is formatted differently
84 match_addr = _addr_tuple_to_ip_address(addr)
85 for addr_info in addr_infos:
86 if match_addr == _addr_tuple_to_ip_address(addr_info[-1]):
87 bad_addrs_infos.append(addr_info)
88 if bad_addrs_infos:
89 for bad_addr_info in bad_addrs_infos:
90 addr_infos.remove(bad_addr_info)
91 return
92 raise ValueError(f"Address {addr} not found in addr_infos")