1"""Utility functions for aiohappyeyeballs.""" 
    2 
    3import ipaddress 
    4import socket 
    5from typing import Dict, List, Optional, Tuple, Union 
    6 
    7from .types import AddrInfoType 
    8 
    9 
    10def addr_to_addr_infos( 
    11    addr: Optional[ 
    12        Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]] 
    13    ], 
    14) -> Optional[List[AddrInfoType]]: 
    15    """Convert an address tuple to a list of addr_info tuples.""" 
    16    if addr is None: 
    17        return None 
    18    host = addr[0] 
    19    port = addr[1] 
    20    is_ipv6 = ":" in host 
    21    if is_ipv6: 
    22        flowinfo = 0 
    23        scopeid = 0 
    24        addr_len = len(addr) 
    25        if addr_len >= 4: 
    26            scopeid = addr[3]  # type: ignore[misc] 
    27        if addr_len >= 3: 
    28            flowinfo = addr[2]  # type: ignore[misc] 
    29        addr = (host, port, flowinfo, scopeid) 
    30        family = socket.AF_INET6 
    31    else: 
    32        addr = (host, port) 
    33        family = socket.AF_INET 
    34    return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)] 
    35 
    36 
    37def pop_addr_infos_interleave( 
    38    addr_infos: List[AddrInfoType], interleave: Optional[int] = None 
    39) -> None: 
    40    """ 
    41    Pop addr_info from the list of addr_infos by family up to interleave times. 
    42 
    43    The interleave parameter is used to know how many addr_infos for 
    44    each family should be popped of the top of the list. 
    45    """ 
    46    seen: Dict[int, int] = {} 
    47    if interleave is None: 
    48        interleave = 1 
    49    to_remove: List[AddrInfoType] = [] 
    50    for addr_info in addr_infos: 
    51        family = addr_info[0] 
    52        if family not in seen: 
    53            seen[family] = 0 
    54        if seen[family] < interleave: 
    55            to_remove.append(addr_info) 
    56        seen[family] += 1 
    57    for addr_info in to_remove: 
    58        addr_infos.remove(addr_info) 
    59 
    60 
    61def _addr_tuple_to_ip_address( 
    62    addr: Union[Tuple[str, int], Tuple[str, int, int, int]], 
    63) -> Union[ 
    64    Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int] 
    65]: 
    66    """Convert an address tuple to an IPv4Address.""" 
    67    return (ipaddress.ip_address(addr[0]), *addr[1:]) 
    68 
    69 
    70def remove_addr_infos( 
    71    addr_infos: List[AddrInfoType], 
    72    addr: Union[Tuple[str, int], Tuple[str, int, int, int]], 
    73) -> None: 
    74    """ 
    75    Remove an address from the list of addr_infos. 
    76 
    77    The addr value is typically the return value of 
    78    sock.getpeername(). 
    79    """ 
    80    bad_addrs_infos: List[AddrInfoType] = [] 
    81    for addr_info in addr_infos: 
    82        if addr_info[-1] == addr: 
    83            bad_addrs_infos.append(addr_info) 
    84    if bad_addrs_infos: 
    85        for bad_addr_info in bad_addrs_infos: 
    86            addr_infos.remove(bad_addr_info) 
    87        return 
    88    # Slow path in case addr is formatted differently 
    89    match_addr = _addr_tuple_to_ip_address(addr) 
    90    for addr_info in addr_infos: 
    91        if match_addr == _addr_tuple_to_ip_address(addr_info[-1]): 
    92            bad_addrs_infos.append(addr_info) 
    93    if bad_addrs_infos: 
    94        for bad_addr_info in bad_addrs_infos: 
    95            addr_infos.remove(bad_addr_info) 
    96        return 
    97    raise ValueError(f"Address {addr} not found in addr_infos")