Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_sockets.py: 67%
64 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3import socket
4from abc import abstractmethod
5from contextlib import AsyncExitStack
6from io import IOBase
7from ipaddress import IPv4Address, IPv6Address
8from socket import AddressFamily
9from typing import (
10 Any,
11 Callable,
12 Collection,
13 Mapping,
14 Tuple,
15 TypeVar,
16 Union,
17)
19from .._core._tasks import create_task_group
20from .._core._typedattr import (
21 TypedAttributeProvider,
22 TypedAttributeSet,
23 typed_attribute,
24)
25from ._streams import ByteStream, Listener, UnreliableObjectStream
26from ._tasks import TaskGroup
#: an IP address, given either as text or as an ``ipaddress`` object
IPAddressType = Union[str, IPv4Address, IPv6Address]
#: a ``(host, port)`` pair identifying an IP socket endpoint
IPSockAddrType = Tuple[str, int]
#: either an IP ``(host, port)`` pair or a plain string
#: (presumably a UNIX socket path -- confirm against callers)
SockAddrType = Union[IPSockAddrType, str]
#: a datagram together with the IP endpoint it is exchanged with
UDPPacketType = Tuple[bytes, IPSockAddrType]
#: generic return-value type variable (not referenced in the visible part of this module)
T_Retval = TypeVar("T_Retval")
class SocketAttribute(TypedAttributeSet):
    """Typed attribute keys describing the socket behind a stream or listener."""

    #: the address family of the underlying socket
    family: AddressFamily = typed_attribute()
    #: the local socket address of the underlying socket
    local_address: SockAddrType = typed_attribute()
    #: for IP addresses, the local port the underlying socket is bound to
    local_port: int = typed_attribute()
    #: the underlying stdlib socket object
    raw_socket: socket.socket = typed_attribute()
    #: the remote address the underlying socket is connected to
    remote_address: SockAddrType = typed_attribute()
    #: for IP addresses, the remote port the underlying socket is connected to
    remote_port: int = typed_attribute()
class _SocketProvider(TypedAttributeProvider):
    """
    Mixin that publishes :class:`SocketAttribute` values for a raw socket.

    Subclasses only have to implement :attr:`_raw_socket`.
    """

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Build the mapping of socket attributes to zero-argument getters.

        ``family``, ``local_address`` and ``raw_socket`` are always present.
        ``remote_address`` is added only when ``getpeername()`` succeeds, and the
        port attributes only for ``AF_INET`` / ``AF_INET6`` sockets.

        :return: a mapping of attribute keys to callables producing their values
        """
        # Imported at call time rather than module level
        # (presumably to avoid a circular import -- confirm).
        from .._core._sockets import convert_ipv6_sockaddr as convert

        attributes: dict[Any, Callable[[], Any]] = {
            SocketAttribute.family: lambda: self._raw_socket.family,
            SocketAttribute.local_address: lambda: convert(
                self._raw_socket.getsockname()
            ),
            SocketAttribute.raw_socket: lambda: self._raw_socket,
        }

        # The peer name is resolved eagerly; an OSError from getpeername() is
        # treated as "no remote address available".
        try:
            peername: tuple[str, int] | None = convert(self._raw_socket.getpeername())
        except OSError:
            peername = None

        # Provide the remote address for connected sockets
        if peername is not None:
            attributes[SocketAttribute.remote_address] = lambda: peername

        # Provide local and remote ports for IP based sockets
        if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
            attributes[
                SocketAttribute.local_port
            ] = lambda: self._raw_socket.getsockname()[1]
            if peername is not None:
                remote_port = peername[1]
                attributes[SocketAttribute.remote_port] = lambda: remote_port

        return attributes

    @property
    @abstractmethod
    def _raw_socket(self) -> socket.socket:
        """Return the stdlib socket object the attributes are read from."""
        pass
class SocketStream(ByteStream, _SocketProvider):
    """
    Transports bytes over a socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """
class UNIXSocketStream(SocketStream):
    """A :class:`SocketStream` that can also exchange file descriptors with its peer."""

    @abstractmethod
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send file descriptors along with a message to the peer.

        :param message: a non-empty bytestring
        :param fds: a collection of files (either numeric file descriptors or open file or socket
            objects)
        """

    @abstractmethod
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive file descriptors along with a message from the peer.

        :param msglen: length of the message to expect from the peer
        :param maxfds: maximum number of file descriptors to expect from the peer
        :return: a tuple of (message, file descriptors)
        """
class SocketListener(Listener[SocketStream], _SocketProvider):
    """
    Listens to incoming socket connections.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @abstractmethod
    async def accept(self) -> SocketStream:
        """Accept an incoming connection."""

    async def serve(
        self,
        handler: Callable[[SocketStream], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept connections in an endless loop, starting a task for each one.

        :param handler: a callable invoked with each accepted stream
        :param task_group: the task group used to start the handler tasks (if omitted, a
            task group is created for the duration of this call)
        """
        async with AsyncExitStack() as exit_stack:
            # Only a task group created here is entered into the exit stack;
            # a caller-supplied task group is used as-is.
            if task_group is None:
                task_group = await exit_stack.enter_async_context(create_task_group())

            while True:
                stream = await self.accept()
                task_group.start_soon(handler, stream)
class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
    """
    Represents an unconnected UDP socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    async def sendto(self, data: bytes, host: str, port: int) -> None:
        """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port)))."""
        return await self.send((data, (host, port)))
class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    Represents a connected UDP socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """