Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_sockets.py: 64%

69 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import socket 

2from abc import abstractmethod 

3from io import IOBase 

4from ipaddress import IPv4Address, IPv6Address 

5from socket import AddressFamily 

6from types import TracebackType 

7from typing import ( 

8 Any, 

9 AsyncContextManager, 

10 Callable, 

11 Collection, 

12 Dict, 

13 List, 

14 Mapping, 

15 Optional, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20) 

21 

22from .._core._typedattr import ( 

23 TypedAttributeProvider, 

24 TypedAttributeSet, 

25 typed_attribute, 

26) 

27from ._streams import ByteStream, Listener, T_Stream, UnreliableObjectStream 

28from ._tasks import TaskGroup 

29 

# An IP address, given either as text or as an ``ipaddress`` object
IPAddressType = Union[str, IPv4Address, IPv6Address]
# A ``(host, port)`` pair
IPSockAddrType = Tuple[str, int]
# Either a ``(host, port)`` pair or a plain string address
SockAddrType = Union[IPSockAddrType, str]
# A bytes payload paired with a ``(host, port)`` address
UDPPacketType = Tuple[bytes, IPSockAddrType]
# Unconstrained type variable
T_Retval = TypeVar("T_Retval")

35 

36 

class _NullAsyncContextManager:
    """Async context manager whose enter and exit steps do nothing."""

    async def __aenter__(self) -> None:
        """Enter the context; nothing is set up and ``None`` is what ``as`` binds."""
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        """Leave the context; the ``None`` result lets any active exception propagate."""
        return None

48 

49 

class SocketAttribute(TypedAttributeSet):
    """Keys for the socket-related extra attributes offered by the socket classes here."""

    #: address family of the wrapped socket
    family: AddressFamily = typed_attribute()
    #: local address the wrapped socket reports for itself
    local_address: SockAddrType = typed_attribute()
    #: local port of the wrapped socket (IP sockets only)
    local_port: int = typed_attribute()
    #: the wrapped stdlib socket object itself
    raw_socket: socket.socket = typed_attribute()
    #: address of the peer the wrapped socket is connected to
    remote_address: SockAddrType = typed_attribute()
    #: port of the peer the wrapped socket is connected to (IP sockets only)
    remote_port: int = typed_attribute()

63 

64 

class _SocketProvider(TypedAttributeProvider):
    """Mixin supplying :class:`SocketAttribute` getters derived from ``_raw_socket``."""

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Map each applicable :class:`SocketAttribute` key to a zero-argument getter.

        The peer address is looked up once, at the time this property is read. The family,
        local address, local port and raw socket getters consult ``_raw_socket`` each time
        they are called.
        """
        from .._core._sockets import convert_ipv6_sockaddr as convert

        def get_family() -> Any:
            return self._raw_socket.family

        def get_local_address() -> Any:
            return convert(self._raw_socket.getsockname())

        def get_raw_socket() -> Any:
            return self._raw_socket

        def get_local_port() -> Any:
            return self._raw_socket.getsockname()[1]

        getters: Dict[Any, Callable[[], Any]] = {}
        getters[SocketAttribute.family] = get_family
        getters[SocketAttribute.local_address] = get_local_address
        getters[SocketAttribute.raw_socket] = get_raw_socket

        # An OSError from getpeername() is taken to mean there is no peer
        peer: Optional[Tuple[str, int]]
        try:
            peer = convert(self._raw_socket.getpeername())
        except OSError:
            peer = None

        # The remote address is only offered when a peer was found
        if peer is not None:
            getters[SocketAttribute.remote_address] = lambda: peer

        # Port attributes only apply to the IPv4 and IPv6 address families
        if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
            getters[SocketAttribute.local_port] = get_local_port
            if peer is not None:
                peer_port = peer[1]
                getters[SocketAttribute.remote_port] = lambda: peer_port

        return getters

    @property
    @abstractmethod
    def _raw_socket(self) -> socket.socket:
        """The stdlib socket object that the extra attributes are read from."""

103 

104 

class SocketStream(ByteStream, _SocketProvider):
    """
    A byte stream that runs on top of a socket.

    Every applicable extra attribute in :class:`~SocketAttribute` is available.
    """

111 

112 

class UNIXSocketStream(SocketStream):
    """A :class:`SocketStream` that can also exchange file descriptors with the peer."""

    @abstractmethod
    async def send_fds(self, message: bytes, fds: Collection[Union[int, IOBase]]) -> None:
        """
        Transmit a message to the peer together with a set of file descriptors.

        :param message: the bytestring to send; must not be empty
        :param fds: the files to pass along, each given either as a numeric file descriptor
            or as an open file or socket object
        """

    @abstractmethod
    async def receive_fds(
        self, msglen: int, maxfds: int
    ) -> Tuple[bytes, List[int]]:
        """
        Read a message from the peer together with the file descriptors sent with it.

        :param msglen: length of the message the peer is expected to send
        :param maxfds: upper bound on the number of file descriptors expected from the peer
        :return: a ``(message, file descriptors)`` tuple
        """

135 

136 

class SocketListener(Listener[SocketStream], _SocketProvider):
    """
    Listens to incoming socket connections.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @abstractmethod
    async def accept(self) -> SocketStream:
        """Accept an incoming connection."""

    async def serve(
        self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None
    ) -> None:
        """
        Accept connections in an endless loop, starting ``handler`` for each one.

        :param handler: callable started through ``task_group.start_soon`` with each
            accepted stream as its argument
        :param task_group: the task group to start handlers in; when ``None``, a task group
            is created here and stays entered for as long as the loop runs
        """
        from contextlib import AsyncExitStack

        from .. import create_task_group

        # The exit stack stays empty when the caller supplied a task group, so leaving the
        # block only closes a task group that was created here.
        async with AsyncExitStack() as stack:
            if task_group is None:
                task_group = await stack.enter_async_context(create_task_group())

            while True:
                stream = await self.accept()
                task_group.start_soon(handler, stream)

164 

165 

class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
    """
    A UDP socket that has not been connected to a peer.

    Every applicable extra attribute in :class:`~SocketAttribute` is available.
    """

    async def sendto(self, data: bytes, host: str, port: int) -> None:
        """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port)))."""
        destination = (host, port)
        return await self.send((data, destination))

176 

177 

class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    A UDP socket that is connected to a peer.

    Every applicable extra attribute in :class:`~SocketAttribute` is available.
    """