Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_sockets.py: 64%
69 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import socket
2from abc import abstractmethod
3from io import IOBase
4from ipaddress import IPv4Address, IPv6Address
5from socket import AddressFamily
6from types import TracebackType
7from typing import (
8 Any,
9 AsyncContextManager,
10 Callable,
11 Collection,
12 Dict,
13 List,
14 Mapping,
15 Optional,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20)
22from .._core._typedattr import (
23 TypedAttributeProvider,
24 TypedAttributeSet,
25 typed_attribute,
26)
27from ._streams import ByteStream, Listener, T_Stream, UnreliableObjectStream
28from ._tasks import TaskGroup
30IPAddressType = Union[str, IPv4Address, IPv6Address]
31IPSockAddrType = Tuple[str, int]
32SockAddrType = Union[IPSockAddrType, str]
33UDPPacketType = Tuple[bytes, IPSockAddrType]
34T_Retval = TypeVar("T_Retval")
37class _NullAsyncContextManager:
38 async def __aenter__(self) -> None:
39 pass
41 async def __aexit__(
42 self,
43 exc_type: Optional[Type[BaseException]],
44 exc_val: Optional[BaseException],
45 exc_tb: Optional[TracebackType],
46 ) -> Optional[bool]:
47 return None
50class SocketAttribute(TypedAttributeSet):
51 #: the address family of the underlying socket
52 family: AddressFamily = typed_attribute()
53 #: the local socket address of the underlying socket
54 local_address: SockAddrType = typed_attribute()
55 #: for IP addresses, the local port the underlying socket is bound to
56 local_port: int = typed_attribute()
57 #: the underlying stdlib socket object
58 raw_socket: socket.socket = typed_attribute()
59 #: the remote address the underlying socket is connected to
60 remote_address: SockAddrType = typed_attribute()
61 #: for IP addresses, the remote port the underlying socket is connected to
62 remote_port: int = typed_attribute()
65class _SocketProvider(TypedAttributeProvider):
66 @property
67 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
68 from .._core._sockets import convert_ipv6_sockaddr as convert
70 attributes: Dict[Any, Callable[[], Any]] = {
71 SocketAttribute.family: lambda: self._raw_socket.family,
72 SocketAttribute.local_address: lambda: convert(
73 self._raw_socket.getsockname()
74 ),
75 SocketAttribute.raw_socket: lambda: self._raw_socket,
76 }
77 try:
78 peername: Optional[Tuple[str, int]] = convert(
79 self._raw_socket.getpeername()
80 )
81 except OSError:
82 peername = None
84 # Provide the remote address for connected sockets
85 if peername is not None:
86 attributes[SocketAttribute.remote_address] = lambda: peername
88 # Provide local and remote ports for IP based sockets
89 if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
90 attributes[
91 SocketAttribute.local_port
92 ] = lambda: self._raw_socket.getsockname()[1]
93 if peername is not None:
94 remote_port = peername[1]
95 attributes[SocketAttribute.remote_port] = lambda: remote_port
97 return attributes
99 @property
100 @abstractmethod
101 def _raw_socket(self) -> socket.socket:
102 pass
105class SocketStream(ByteStream, _SocketProvider):
106 """
107 Transports bytes over a socket.
109 Supports all relevant extra attributes from :class:`~SocketAttribute`.
110 """
113class UNIXSocketStream(SocketStream):
114 @abstractmethod
115 async def send_fds(
116 self, message: bytes, fds: Collection[Union[int, IOBase]]
117 ) -> None:
118 """
119 Send file descriptors along with a message to the peer.
121 :param message: a non-empty bytestring
122 :param fds: a collection of files (either numeric file descriptors or open file or socket
123 objects)
124 """
126 @abstractmethod
127 async def receive_fds(self, msglen: int, maxfds: int) -> Tuple[bytes, List[int]]:
128 """
129 Receive file descriptors along with a message from the peer.
131 :param msglen: length of the message to expect from the peer
132 :param maxfds: maximum number of file descriptors to expect from the peer
133 :return: a tuple of (message, file descriptors)
134 """
137class SocketListener(Listener[SocketStream], _SocketProvider):
138 """
139 Listens to incoming socket connections.
141 Supports all relevant extra attributes from :class:`~SocketAttribute`.
142 """
144 @abstractmethod
145 async def accept(self) -> SocketStream:
146 """Accept an incoming connection."""
148 async def serve(
149 self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None
150 ) -> None:
151 from .. import create_task_group
153 context_manager: AsyncContextManager
154 if task_group is None:
155 task_group = context_manager = create_task_group()
156 else:
157 # Can be replaced with AsyncExitStack once on py3.7+
158 context_manager = _NullAsyncContextManager()
160 async with context_manager:
161 while True:
162 stream = await self.accept()
163 task_group.start_soon(handler, stream)
166class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
167 """
168 Represents an unconnected UDP socket.
170 Supports all relevant extra attributes from :class:`~SocketAttribute`.
171 """
173 async def sendto(self, data: bytes, host: str, port: int) -> None:
174 """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port)))."""
175 return await self.send((data, (host, port)))
178class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
179 """
180 Represents an connected UDP socket.
182 Supports all relevant extra attributes from :class:`~SocketAttribute`.
183 """