Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohappyeyeballs/utils.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

58 statements  

1"""Utility functions for aiohappyeyeballs.""" 

2 

3import ipaddress 

4import socket 

5from typing import Dict, List, Optional, Tuple, Union 

6 

7from .types import AddrInfoType 

8 

9 

def addr_to_addr_infos(
    addr: Optional[
        Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]]
    ],
) -> Optional[List[AddrInfoType]]:
    """
    Build a one-element addr_info list from a socket address tuple.

    ``None`` is passed straight through. Otherwise the host is treated as
    IPv6 when it contains a ``":"`` and as IPv4 when it does not.

    For IPv4 the resulting address is always ``(host, port)``; any extra
    items in ``addr`` are dropped. For IPv6 the resulting address is always
    the 4-tuple ``(host, port, flowinfo, scopeid)``, where ``flowinfo`` and
    ``scopeid`` default to ``0`` when ``addr`` is too short to carry them.

    The single entry returned uses ``socket.SOCK_STREAM``,
    ``socket.IPPROTO_TCP`` and an empty string in the fourth slot.
    """
    if addr is None:
        return None

    host, port = addr[0], addr[1]
    stream, tcp = socket.SOCK_STREAM, socket.IPPROTO_TCP

    if ":" not in host:
        # No colon in the host, so it is handled as IPv4.
        return [(socket.AF_INET, stream, tcp, "", (host, port))]

    # IPv6: pad the optional trailing fields with zero.
    n_fields = len(addr)
    flowinfo = addr[2] if n_fields >= 3 else 0  # type: ignore[misc]
    scopeid = addr[3] if n_fields >= 4 else 0  # type: ignore[misc]
    return [(socket.AF_INET6, stream, tcp, "", (host, port, flowinfo, scopeid))]

35 

36 

def pop_addr_infos_interleave(
    addr_infos: List[AddrInfoType], interleave: Optional[int] = None
) -> None:
    """
    Drop the leading entries of each address family from addr_infos.

    For every family (the first item of each addr_info), the first
    ``interleave`` entries encountered are popped off the list; all later
    entries of that family are kept in their original order. The list is
    modified in place and nothing is returned.

    When ``interleave`` is ``None`` it is treated as ``1``.
    """
    per_family_limit = 1 if interleave is None else interleave
    encountered: Dict[int, int] = {}
    survivors: List[AddrInfoType] = []
    for entry in addr_infos:
        fam = entry[0]
        already_seen = encountered.get(fam, 0)
        encountered[fam] = already_seen + 1
        # Only entries beyond the first `per_family_limit` of a family stay.
        if already_seen >= per_family_limit:
            survivors.append(entry)
    # Slice-assign so the caller's list object is the one that changes.
    addr_infos[:] = survivors

59 

60 

def _addr_tuple_to_ip_address(
    addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
) -> Union[
    Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int]
]:
    """
    Return addr with its host string replaced by a parsed IP address object.

    The first item is passed through ``ipaddress.ip_address`` (yielding an
    IPv4Address or IPv6Address); every remaining item is carried over
    unchanged, so the result has the same length as the input.
    """
    parsed_host = ipaddress.ip_address(addr[0])
    trailing = addr[1:]
    return (parsed_host, *trailing)

68 

69 

def remove_addr_infos(
    addr_infos: List[AddrInfoType],
    addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
) -> None:
    """
    Delete every addr_info whose address matches addr, in place.

    The addr value is typically the return value of
    sock.getpeername().

    Matching is tried twice. First the last item of each addr_info is
    compared to ``addr`` directly. Only if that finds nothing, both sides
    are normalised with ``_addr_tuple_to_ip_address`` and compared again,
    which catches the same address written in a different textual form.

    Raises ValueError if neither attempt finds a matching entry.
    """
    # Fast path: the address tuple is identical to the one stored.
    doomed = [entry for entry in addr_infos if entry[-1] == addr]

    if not doomed:
        # Slow path in case addr is formatted differently
        wanted = _addr_tuple_to_ip_address(addr)
        doomed = [
            entry
            for entry in addr_infos
            if wanted == _addr_tuple_to_ip_address(entry[-1])
        ]

    if not doomed:
        raise ValueError(f"Address {addr} not found in addr_infos")

    for entry in doomed:
        addr_infos.remove(entry)