1"""Utility functions for aiohappyeyeballs."""
2
3import ipaddress
4import socket
5from typing import Dict, List, Optional, Tuple, Union
6
7from .types import AddrInfoType
8
9
10def addr_to_addr_infos(
11 addr: Optional[
12 Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]]
13 ],
14) -> Optional[List[AddrInfoType]]:
15 """Convert an address tuple to a list of addr_info tuples."""
16 if addr is None:
17 return None
18 host = addr[0]
19 port = addr[1]
20 is_ipv6 = ":" in host
21 if is_ipv6:
22 flowinfo = 0
23 scopeid = 0
24 addr_len = len(addr)
25 if addr_len >= 4:
26 scopeid = addr[3] # type: ignore[misc]
27 if addr_len >= 3:
28 flowinfo = addr[2] # type: ignore[misc]
29 addr = (host, port, flowinfo, scopeid)
30 family = socket.AF_INET6
31 else:
32 addr = (host, port)
33 family = socket.AF_INET
34 return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)]
35
36
37def pop_addr_infos_interleave(
38 addr_infos: List[AddrInfoType], interleave: Optional[int] = None
39) -> None:
40 """
41 Pop addr_info from the list of addr_infos by family up to interleave times.
42
43 The interleave parameter is used to know how many addr_infos for
44 each family should be popped of the top of the list.
45 """
46 seen: Dict[int, int] = {}
47 if interleave is None:
48 interleave = 1
49 to_remove: List[AddrInfoType] = []
50 for addr_info in addr_infos:
51 family = addr_info[0]
52 if family not in seen:
53 seen[family] = 0
54 if seen[family] < interleave:
55 to_remove.append(addr_info)
56 seen[family] += 1
57 for addr_info in to_remove:
58 addr_infos.remove(addr_info)
59
60
61def _addr_tuple_to_ip_address(
62 addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
63) -> Union[
64 Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int]
65]:
66 """Convert an address tuple to an IPv4Address."""
67 return (ipaddress.ip_address(addr[0]), *addr[1:])
68
69
70def remove_addr_infos(
71 addr_infos: List[AddrInfoType],
72 addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
73) -> None:
74 """
75 Remove an address from the list of addr_infos.
76
77 The addr value is typically the return value of
78 sock.getpeername().
79 """
80 bad_addrs_infos: List[AddrInfoType] = []
81 for addr_info in addr_infos:
82 if addr_info[-1] == addr:
83 bad_addrs_infos.append(addr_info)
84 if bad_addrs_infos:
85 for bad_addr_info in bad_addrs_infos:
86 addr_infos.remove(bad_addr_info)
87 return
88 # Slow path in case addr is formatted differently
89 match_addr = _addr_tuple_to_ip_address(addr)
90 for addr_info in addr_infos:
91 if match_addr == _addr_tuple_to_ip_address(addr_info[-1]):
92 bad_addrs_infos.append(addr_info)
93 if bad_addrs_infos:
94 for bad_addr_info in bad_addrs_infos:
95 addr_infos.remove(bad_addr_info)
96 return
97 raise ValueError(f"Address {addr} not found in addr_infos")