Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_sockets.py: 67%

64 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3import socket 

4from abc import abstractmethod 

5from contextlib import AsyncExitStack 

6from io import IOBase 

7from ipaddress import IPv4Address, IPv6Address 

8from socket import AddressFamily 

9from typing import ( 

10 Any, 

11 Callable, 

12 Collection, 

13 Mapping, 

14 Tuple, 

15 TypeVar, 

16 Union, 

17) 

18 

19from .._core._tasks import create_task_group 

20from .._core._typedattr import ( 

21 TypedAttributeProvider, 

22 TypedAttributeSet, 

23 typed_attribute, 

24) 

25from ._streams import ByteStream, Listener, UnreliableObjectStream 

26from ._tasks import TaskGroup 

27 

# An IP address given either as text or as an ``ipaddress`` address object.
IPAddressType = Union[str, IPv4Address, IPv6Address]
# Address of an IP socket as a (str, int) pair; UDPSocket.sendto() fills it
# in as (host, port).
IPSockAddrType = Tuple[str, int]
# Either an IP socket address pair or a plain string
# (presumably a filesystem path for UNIX sockets -- TODO confirm).
SockAddrType = Union[IPSockAddrType, str]
# A datagram payload paired with an IP socket address.
UDPPacketType = Tuple[bytes, IPSockAddrType]
# Generic type variable; not referenced in the visible part of this module.
T_Retval = TypeVar("T_Retval")

33 

34 

class SocketAttribute(TypedAttributeSet):
    """
    Typed attribute keys describing a socket.

    These keys are used by the ``extra_attributes`` mapping of the socket
    based classes in this module to expose details of the underlying socket.
    """

    #: the address family of the underlying socket
    family: AddressFamily = typed_attribute()
    #: the local socket address of the underlying socket
    local_address: SockAddrType = typed_attribute()
    #: for IP addresses, the local port the underlying socket is bound to
    local_port: int = typed_attribute()
    #: the underlying stdlib socket object
    raw_socket: socket.socket = typed_attribute()
    #: the remote address the underlying socket is connected to
    remote_address: SockAddrType = typed_attribute()
    #: for IP addresses, the remote port the underlying socket is connected to
    remote_port: int = typed_attribute()

48 

49 

class _SocketProvider(TypedAttributeProvider):
    """
    Mixin that publishes :class:`SocketAttribute` values for a raw socket.

    Subclasses only need to implement :attr:`_raw_socket`; the attribute
    mapping is derived from that socket each time ``extra_attributes`` is read.
    """

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Build the mapping of socket attribute keys to zero-argument getters.

        The family, local address and raw socket are always present. The
        remote address is added only when ``getpeername()`` succeeds, and the
        port entries are added only for ``AF_INET``/``AF_INET6`` sockets.
        """
        # Imported lazily, as in the rest of this method's history, rather
        # than at module import time.
        from .._core._sockets import convert_ipv6_sockaddr as convert

        def local_address() -> Any:
            return convert(self._raw_socket.getsockname())

        providers: dict[Any, Callable[[], Any]] = {
            SocketAttribute.family: lambda: self._raw_socket.family,
            SocketAttribute.local_address: local_address,
            SocketAttribute.raw_socket: lambda: self._raw_socket,
        }

        # The peer address is resolved once, right now; an OSError from
        # getpeername() is taken to mean there is no peer to report.
        remote: tuple[str, int] | None
        try:
            remote = convert(self._raw_socket.getpeername())
        except OSError:
            remote = None

        connected = remote is not None
        if connected:
            # Connected sockets expose the captured peer address
            providers[SocketAttribute.remote_address] = lambda: remote

        ip_families = (AddressFamily.AF_INET, AddressFamily.AF_INET6)
        if self._raw_socket.family not in ip_families:
            # Port numbers only make sense for IP based sockets
            return providers

        providers[SocketAttribute.local_port] = (
            lambda: self._raw_socket.getsockname()[1]
        )
        if connected:
            port = remote[1]
            providers[SocketAttribute.remote_port] = lambda: port

        return providers

    @property
    @abstractmethod
    def _raw_socket(self) -> socket.socket:
        """Return the stdlib socket object the attributes are read from."""

86 

87 

class SocketStream(ByteStream, _SocketProvider):
    """
    Transports bytes over a socket.

    Combines the :class:`ByteStream` interface with the socket attributes
    supplied by ``_SocketProvider``.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

94 

95 

class UNIXSocketStream(SocketStream):
    """
    A socket stream that can also pass file descriptors to and from its peer.
    """

    @abstractmethod
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send file descriptors along with a message to the peer.

        :param message: a non-empty bytestring
        :param fds: a collection of files (either numeric file descriptors or
            open file or socket objects)
        """

    @abstractmethod
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive file descriptors along with a message from the peer.

        :param msglen: length of the message to expect from the peer
        :param maxfds: maximum number of file descriptors to expect from the peer
        :return: a tuple of (message, file descriptors)
        """

116 

117 

class SocketListener(Listener[SocketStream], _SocketProvider):
    """
    Listens to incoming socket connections.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @abstractmethod
    async def accept(self) -> SocketStream:
        """Accept an incoming connection."""

    async def serve(
        self,
        handler: Callable[[SocketStream], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept connections in an endless loop, starting a task for each one.

        :param handler: callable started with each accepted stream
        :param task_group: the task group the handler tasks are started in;
            if omitted, a new one is created and kept open for the duration
            of this call
        """
        async with AsyncExitStack() as stack:
            # A task group created here is owned by the stack, so it is
            # exited together with it; a caller-supplied one is left alone.
            if task_group is None:
                owned_group = create_task_group()
                task_group = await stack.enter_async_context(owned_group)

            while True:
                connection = await self.accept()
                task_group.start_soon(handler, connection)

141 

142 

class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
    """
    Represents an unconnected UDP socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    async def sendto(self, data: bytes, host: str, port: int) -> None:
        """
        Send ``data`` to the given host and port.

        Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))).
        """
        destination = (host, port)
        packet = (data, destination)
        return await self.send(packet)

153 

154 

class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    Represents a connected UDP socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """