Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_sockets.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import errno
4import socket
5from abc import abstractmethod
6from collections.abc import Callable, Collection, Mapping
7from contextlib import AsyncExitStack
8from io import IOBase
9from ipaddress import IPv4Address, IPv6Address
10from socket import AddressFamily
11from typing import Any, TypeAlias, TypeVar
13from .._core._eventloop import get_async_backend
14from .._core._typedattr import (
15 TypedAttributeProvider,
16 TypedAttributeSet,
17 typed_attribute,
18)
19from ._streams import ByteStream, Listener, UnreliableObjectStream
20from ._tasks import TaskGroup
# An IP address, given either as a string or as an ``ipaddress`` object
IPAddressType: TypeAlias = str | IPv4Address | IPv6Address
# Address of an IP socket; UDPSocket.sendto() builds it as (host, port)
IPSockAddrType: TypeAlias = tuple[str, int]
# Any socket address handled here: an IP socket address tuple or a plain string
# (UNIXDatagramSocket.sendto() passes a filesystem path as that string)
SockAddrType: TypeAlias = IPSockAddrType | str
# Item type of UDPSocket: (data, (host, port))
UDPPacketType: TypeAlias = tuple[bytes, IPSockAddrType]
# Item type of UNIXDatagramSocket: (data, path)
UNIXDatagramPacketType: TypeAlias = tuple[bytes, str]
# Generic type variable; not referenced in the portion of the module shown here
T_Retval = TypeVar("T_Retval")
30def _validate_socket(
31 sock_or_fd: socket.socket | int,
32 sock_type: socket.SocketKind,
33 addr_family: socket.AddressFamily = socket.AF_UNSPEC,
34 *,
35 require_connected: bool = False,
36 require_bound: bool = False,
37) -> socket.socket:
38 if isinstance(sock_or_fd, int):
39 try:
40 sock = socket.socket(fileno=sock_or_fd)
41 except OSError as exc:
42 if exc.errno == errno.ENOTSOCK:
43 raise ValueError(
44 "the file descriptor does not refer to a socket"
45 ) from exc
46 elif require_connected:
47 raise ValueError("the socket must be connected") from exc
48 elif require_bound:
49 raise ValueError("the socket must be bound to a local address") from exc
50 else:
51 raise
52 elif isinstance(sock_or_fd, socket.socket):
53 sock = sock_or_fd
54 else:
55 raise TypeError(
56 f"expected an int or socket, got {type(sock_or_fd).__qualname__} instead"
57 )
59 try:
60 if require_connected:
61 try:
62 sock.getpeername()
63 except OSError as exc:
64 raise ValueError("the socket must be connected") from exc
66 if require_bound:
67 try:
68 if sock.family in (socket.AF_INET, socket.AF_INET6):
69 bound_addr = sock.getsockname()[1]
70 else:
71 bound_addr = sock.getsockname()
72 except OSError:
73 bound_addr = None
75 if not bound_addr:
76 raise ValueError("the socket must be bound to a local address")
78 if addr_family != socket.AF_UNSPEC and sock.family != addr_family:
79 raise ValueError(
80 f"address family mismatch: expected {addr_family.name}, got "
81 f"{sock.family.name}"
82 )
84 if sock.type != sock_type:
85 raise ValueError(
86 f"socket type mismatch: expected {sock_type.name}, got {sock.type.name}"
87 )
88 except BaseException:
89 # Avoid ResourceWarning from the locally constructed socket object
90 if isinstance(sock_or_fd, int):
91 sock.detach()
93 raise
95 sock.setblocking(False)
96 return sock
class SocketAttribute(TypedAttributeSet):
    """
    Typed attributes describing the socket underneath a stream or listener.

    .. attribute:: family
        :type: socket.AddressFamily

        the address family of the underlying socket

    .. attribute:: local_address
        :type: tuple[str, int] | str

        the local address the underlying socket is bound to

    .. attribute:: local_port
        :type: int

        for IP based sockets, the local port the underlying socket is bound to

    .. attribute:: raw_socket
        :type: socket.socket

        the underlying stdlib socket object

    .. attribute:: remote_address
        :type: tuple[str, int] | str

        the remote address the underlying socket is connected to

    .. attribute:: remote_port
        :type: int

        for IP based sockets, the remote port the underlying socket is connected to
    """

    family: AddressFamily = typed_attribute()
    local_address: SockAddrType = typed_attribute()
    local_port: int = typed_attribute()
    raw_socket: socket.socket = typed_attribute()
    remote_address: SockAddrType = typed_attribute()
    remote_port: int = typed_attribute()
class _SocketProvider(TypedAttributeProvider):
    """Mixin that serves :class:`SocketAttribute` values from ``_raw_socket``."""

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Map every applicable :class:`SocketAttribute` member to a getter callable.

        The family, local address and raw socket are always offered. The remote
        address is offered only when ``getpeername()`` succeeds, and the port
        attributes only for ``AF_INET``/``AF_INET6`` sockets.
        """
        from .._core._sockets import convert_ipv6_sockaddr as convert

        def get_local_address() -> Any:
            return convert(self._raw_socket.getsockname())

        def get_local_port() -> Any:
            return self._raw_socket.getsockname()[1]

        getters: dict[Any, Callable[[], Any]] = {
            SocketAttribute.family: lambda: self._raw_socket.family,
            SocketAttribute.local_address: get_local_address,
            SocketAttribute.raw_socket: lambda: self._raw_socket,
        }

        # The peer address is looked up once, right here; a socket for which the
        # lookup fails with OSError simply gets no remote_* attributes
        try:
            remote_addr: tuple[str, int] | None = convert(
                self._raw_socket.getpeername()
            )
        except OSError:
            remote_addr = None

        # Provide the remote address for connected sockets
        if remote_addr is not None:
            getters[SocketAttribute.remote_address] = lambda: remote_addr

        # Provide local and remote ports for IP based sockets
        ip_families = (AddressFamily.AF_INET, AddressFamily.AF_INET6)
        if self._raw_socket.family in ip_families:
            getters[SocketAttribute.local_port] = get_local_port
            if remote_addr is not None:
                peer_port = remote_addr[1]
                getters[SocketAttribute.remote_port] = lambda: peer_port

        return getters

    @property
    @abstractmethod
    def _raw_socket(self) -> socket.socket:
        """Return the stdlib socket object the attributes are read from."""
class SocketStream(ByteStream, _SocketProvider):
    """
    A byte stream that sends and receives its data over a socket.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> SocketStream:
        """
        Create a socket stream from an existing socket object or file descriptor.

        Ownership of the given socket passes to the newly created wrapper.
        The socket has to be connected already.

        :param sock_or_fd: a socket object or file descriptor
        :return: a socket stream

        """
        validated = _validate_socket(
            sock_or_fd, sock_type=socket.SOCK_STREAM, require_connected=True
        )
        backend = get_async_backend()
        return await backend.wrap_stream_socket(validated)
class UNIXSocketStream(SocketStream):
    """A socket stream over a UNIX socket, able to pass file descriptors."""

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> UNIXSocketStream:
        """
        Create a UNIX socket stream from an existing socket object or file descriptor.

        Ownership of the given socket passes to the newly created wrapper.
        The socket has to be connected already.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UNIX socket stream

        """
        validated = _validate_socket(
            sock_or_fd,
            sock_type=socket.SOCK_STREAM,
            addr_family=socket.AF_UNIX,
            require_connected=True,
        )
        backend = get_async_backend()
        return await backend.wrap_unix_stream_socket(validated)

    @abstractmethod
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send a message to the peer together with a set of file descriptors.

        :param message: a non-empty bytestring
        :param fds: a collection of files (either numeric file descriptors or open file
            or socket objects)
        """

    @abstractmethod
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive a message from the peer together with any file descriptors sent.

        :param msglen: length of the message to expect from the peer
        :param maxfds: maximum number of file descriptors to expect from the peer
        :return: a tuple of (message, file descriptors)
        """
class SocketListener(Listener[SocketStream], _SocketProvider):
    """
    A listener that accepts incoming socket connections.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> SocketListener:
        """
        Create a socket listener from an existing socket object or file descriptor.

        Ownership of the given socket passes to the newly created listener.
        The socket has to be bound to a local address.

        :param sock_or_fd: a socket object or file descriptor
        :return: a socket listener

        """
        validated = _validate_socket(
            sock_or_fd, sock_type=socket.SOCK_STREAM, require_bound=True
        )
        backend = get_async_backend()
        return await backend.wrap_listener_socket(validated)

    @abstractmethod
    async def accept(self) -> SocketStream:
        """Accept an incoming connection."""

    async def serve(
        self,
        handler: Callable[[SocketStream], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept connections in an endless loop, starting ``handler`` for each one.

        :param handler: a callable started in the task group with each accepted stream
        :param task_group: the task group to start handlers in (if omitted, one is
            created for the duration of this call)
        """
        from .. import create_task_group

        async with AsyncExitStack() as exit_stack:
            group = task_group
            if group is None:
                group = await exit_stack.enter_async_context(create_task_group())

            while True:
                group.start_soon(handler, await self.accept())
class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
    """
    A UDP socket that is not connected to any particular peer.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> UDPSocket:
        """
        Create a UDP socket wrapper from an existing socket object or file descriptor.

        Ownership of the given socket passes to the newly created wrapper.
        The socket has to be bound to a local address.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UDP socket

        """
        validated = _validate_socket(
            sock_or_fd, sock_type=socket.SOCK_DGRAM, require_bound=True
        )
        backend = get_async_backend()
        return await backend.wrap_udp_socket(validated)

    async def sendto(self, data: bytes, host: str, port: int) -> None:
        """
        Shorthand for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))).

        """
        packet: UDPPacketType = (data, (host, port))
        return await self.send(packet)
class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    A UDP socket that is connected to a peer.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(cls, sock_or_fd: socket.socket | int) -> ConnectedUDPSocket:
        """
        Create a connected UDP socket wrapper from an existing socket object or file
        descriptor.

        Ownership of the given socket passes to the newly created wrapper.
        The socket has to be connected already.

        :param sock_or_fd: a socket object or file descriptor
        :return: a connected UDP socket

        """
        validated = _validate_socket(
            sock_or_fd, sock_type=socket.SOCK_DGRAM, require_connected=True
        )
        backend = get_async_backend()
        return await backend.wrap_connected_udp_socket(validated)
class UNIXDatagramSocket(
    UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider
):
    """
    A Unix datagram socket that is not connected to any particular peer.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> UNIXDatagramSocket:
        """
        Create a UNIX datagram socket wrapper from an existing socket object or file
        descriptor.

        Ownership of the given socket passes to the newly created wrapper.

        :param sock_or_fd: a socket object or file descriptor
        :return: a UNIX datagram socket

        """
        validated = _validate_socket(
            sock_or_fd, sock_type=socket.SOCK_DGRAM, addr_family=socket.AF_UNIX
        )
        backend = get_async_backend()
        return await backend.wrap_unix_datagram_socket(validated)

    async def sendto(self, data: bytes, path: str) -> None:
        """Shorthand for :meth:`~.UnreliableObjectSendStream.send` ((data, path))."""
        packet: UNIXDatagramPacketType = (data, path)
        return await self.send(packet)
class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider):
    """
    A Unix datagram socket that is connected to a peer.

    Supports all relevant extra attributes from :class:`~SocketAttribute`.
    """

    @classmethod
    async def from_socket(
        cls,
        sock_or_fd: socket.socket | int,
    ) -> ConnectedUNIXDatagramSocket:
        """
        Create a connected UNIX datagram socket wrapper from an existing socket object
        or file descriptor.

        Ownership of the given socket passes to the newly created wrapper.
        The socket has to be connected already.

        :param sock_or_fd: a socket object or file descriptor
        :return: a connected UNIX datagram socket

        """
        validated = _validate_socket(
            sock_or_fd,
            sock_type=socket.SOCK_DGRAM,
            addr_family=socket.AF_UNIX,
            require_connected=True,
        )
        backend = get_async_backend()
        return await backend.wrap_connected_unix_datagram_socket(validated)