Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_sockets.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import errno
4import socket
5import sys
6from abc import abstractmethod
7from collections.abc import Callable, Collection, Mapping
8from contextlib import AsyncExitStack
9from io import IOBase
10from ipaddress import IPv4Address, IPv6Address
11from socket import AddressFamily
12from typing import Any, TypeVar, Union
14from .._core._eventloop import get_async_backend
15from .._core._typedattr import (
16 TypedAttributeProvider,
17 TypedAttributeSet,
18 typed_attribute,
19)
20from ._streams import ByteStream, Listener, UnreliableObjectStream
21from ._tasks import TaskGroup
23if sys.version_info >= (3, 10):
24 from typing import TypeAlias
25else:
26 from typing_extensions import TypeAlias
28IPAddressType: TypeAlias = Union[str, IPv4Address, IPv6Address]
29IPSockAddrType: TypeAlias = tuple[str, int]
30SockAddrType: TypeAlias = Union[IPSockAddrType, str]
31UDPPacketType: TypeAlias = tuple[bytes, IPSockAddrType]
32UNIXDatagramPacketType: TypeAlias = tuple[bytes, str]
33T_Retval = TypeVar("T_Retval")
36def _validate_socket(
37 sock_or_fd: socket.socket | int,
38 sock_type: socket.SocketKind,
39 addr_family: socket.AddressFamily = socket.AF_UNSPEC,
40 *,
41 require_connected: bool = False,
42 require_bound: bool = False,
43) -> socket.socket:
44 if isinstance(sock_or_fd, int):
45 try:
46 sock = socket.socket(fileno=sock_or_fd)
47 except OSError as exc:
48 if exc.errno == errno.ENOTSOCK:
49 raise ValueError(
50 "the file descriptor does not refer to a socket"
51 ) from exc
52 elif require_connected:
53 raise ValueError("the socket must be connected") from exc
54 elif require_bound:
55 raise ValueError("the socket must be bound to a local address") from exc
56 else:
57 raise
58 elif isinstance(sock_or_fd, socket.socket):
59 sock = sock_or_fd
60 else:
61 raise TypeError(
62 f"expected an int or socket, got {type(sock_or_fd).__qualname__} instead"
63 )
65 try:
66 if require_connected:
67 try:
68 sock.getpeername()
69 except OSError as exc:
70 raise ValueError("the socket must be connected") from exc
72 if require_bound:
73 try:
74 if sock.family in (socket.AF_INET, socket.AF_INET6):
75 bound_addr = sock.getsockname()[1]
76 else:
77 bound_addr = sock.getsockname()
78 except OSError:
79 bound_addr = None
81 if not bound_addr:
82 raise ValueError("the socket must be bound to a local address")
84 if addr_family != socket.AF_UNSPEC and sock.family != addr_family:
85 raise ValueError(
86 f"address family mismatch: expected {addr_family.name}, got "
87 f"{sock.family.name}"
88 )
90 if sock.type != sock_type:
91 raise ValueError(
92 f"socket type mismatch: expected {sock_type.name}, got {sock.type.name}"
93 )
94 except BaseException:
95 # Avoid ResourceWarning from the locally constructed socket object
96 if isinstance(sock_or_fd, int):
97 sock.detach()
99 raise
101 sock.setblocking(False)
102 return sock
105class SocketAttribute(TypedAttributeSet):
106 """
107 .. attribute:: family
108 :type: socket.AddressFamily
110 the address family of the underlying socket
112 .. attribute:: local_address
113 :type: tuple[str, int] | str
115 the local address the underlying socket is connected to
117 .. attribute:: local_port
118 :type: int
120 for IP based sockets, the local port the underlying socket is bound to
122 .. attribute:: raw_socket
123 :type: socket.socket
125 the underlying stdlib socket object
127 .. attribute:: remote_address
128 :type: tuple[str, int] | str
130 the remote address the underlying socket is connected to
132 .. attribute:: remote_port
133 :type: int
135 for IP based sockets, the remote port the underlying socket is connected to
136 """
138 family: AddressFamily = typed_attribute()
139 local_address: SockAddrType = typed_attribute()
140 local_port: int = typed_attribute()
141 raw_socket: socket.socket = typed_attribute()
142 remote_address: SockAddrType = typed_attribute()
143 remote_port: int = typed_attribute()
146class _SocketProvider(TypedAttributeProvider):
147 @property
148 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
149 from .._core._sockets import convert_ipv6_sockaddr as convert
151 attributes: dict[Any, Callable[[], Any]] = {
152 SocketAttribute.family: lambda: self._raw_socket.family,
153 SocketAttribute.local_address: lambda: convert(
154 self._raw_socket.getsockname()
155 ),
156 SocketAttribute.raw_socket: lambda: self._raw_socket,
157 }
158 try:
159 peername: tuple[str, int] | None = convert(self._raw_socket.getpeername())
160 except OSError:
161 peername = None
163 # Provide the remote address for connected sockets
164 if peername is not None:
165 attributes[SocketAttribute.remote_address] = lambda: peername
167 # Provide local and remote ports for IP based sockets
168 if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
169 attributes[SocketAttribute.local_port] = (
170 lambda: self._raw_socket.getsockname()[1]
171 )
172 if peername is not None:
173 remote_port = peername[1]
174 attributes[SocketAttribute.remote_port] = lambda: remote_port
176 return attributes
178 @property
179 @abstractmethod
180 def _raw_socket(self) -> socket.socket:
181 pass
184class SocketStream(ByteStream, _SocketProvider):
185 """
186 Transports bytes over a socket.
188 Supports all relevant extra attributes from :class:`~SocketAttribute`.
189 """
191 @classmethod
192 async def from_socket(cls, sock_or_fd: socket.socket | int) -> SocketStream:
193 """
194 Wrap an existing socket object or file descriptor as a socket stream.
196 The newly created socket wrapper takes ownership of the socket being passed in.
197 The existing socket must already be connected.
199 :param sock_or_fd: a socket object or file descriptor
200 :return: a socket stream
202 """
203 sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_connected=True)
204 return await get_async_backend().wrap_stream_socket(sock)
207class UNIXSocketStream(SocketStream):
208 @classmethod
209 async def from_socket(cls, sock_or_fd: socket.socket | int) -> UNIXSocketStream:
210 """
211 Wrap an existing socket object or file descriptor as a UNIX socket stream.
213 The newly created socket wrapper takes ownership of the socket being passed in.
214 The existing socket must already be connected.
216 :param sock_or_fd: a socket object or file descriptor
217 :return: a UNIX socket stream
219 """
220 sock = _validate_socket(
221 sock_or_fd, socket.SOCK_STREAM, socket.AF_UNIX, require_connected=True
222 )
223 return await get_async_backend().wrap_unix_stream_socket(sock)
225 @abstractmethod
226 async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
227 """
228 Send file descriptors along with a message to the peer.
230 :param message: a non-empty bytestring
231 :param fds: a collection of files (either numeric file descriptors or open file
232 or socket objects)
233 """
235 @abstractmethod
236 async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
237 """
238 Receive file descriptors along with a message from the peer.
240 :param msglen: length of the message to expect from the peer
241 :param maxfds: maximum number of file descriptors to expect from the peer
242 :return: a tuple of (message, file descriptors)
243 """
246class SocketListener(Listener[SocketStream], _SocketProvider):
247 """
248 Listens to incoming socket connections.
250 Supports all relevant extra attributes from :class:`~SocketAttribute`.
251 """
253 @classmethod
254 async def from_socket(
255 cls,
256 sock_or_fd: socket.socket | int,
257 ) -> SocketListener:
258 """
259 Wrap an existing socket object or file descriptor as a socket listener.
261 The newly created listener takes ownership of the socket being passed in.
263 :param sock_or_fd: a socket object or file descriptor
264 :return: a socket listener
266 """
267 sock = _validate_socket(sock_or_fd, socket.SOCK_STREAM, require_bound=True)
268 return await get_async_backend().wrap_listener_socket(sock)
270 @abstractmethod
271 async def accept(self) -> SocketStream:
272 """Accept an incoming connection."""
274 async def serve(
275 self,
276 handler: Callable[[SocketStream], Any],
277 task_group: TaskGroup | None = None,
278 ) -> None:
279 from .. import create_task_group
281 async with AsyncExitStack() as stack:
282 if task_group is None:
283 task_group = await stack.enter_async_context(create_task_group())
285 while True:
286 stream = await self.accept()
287 task_group.start_soon(handler, stream)
290class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
291 """
292 Represents an unconnected UDP socket.
294 Supports all relevant extra attributes from :class:`~SocketAttribute`.
295 """
297 @classmethod
298 async def from_socket(cls, sock_or_fd: socket.socket | int) -> UDPSocket:
299 """
300 Wrap an existing socket object or file descriptor as a UDP socket.
302 The newly created socket wrapper takes ownership of the socket being passed in.
303 The existing socket must be bound to a local address.
305 :param sock_or_fd: a socket object or file descriptor
306 :return: a UDP socket
308 """
309 sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, require_bound=True)
310 return await get_async_backend().wrap_udp_socket(sock)
312 async def sendto(self, data: bytes, host: str, port: int) -> None:
313 """
314 Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))).
316 """
317 return await self.send((data, (host, port)))
320class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
321 """
322 Represents an connected UDP socket.
324 Supports all relevant extra attributes from :class:`~SocketAttribute`.
325 """
327 @classmethod
328 async def from_socket(cls, sock_or_fd: socket.socket | int) -> ConnectedUDPSocket:
329 """
330 Wrap an existing socket object or file descriptor as a connected UDP socket.
332 The newly created socket wrapper takes ownership of the socket being passed in.
333 The existing socket must already be connected.
335 :param sock_or_fd: a socket object or file descriptor
336 :return: a connected UDP socket
338 """
339 sock = _validate_socket(
340 sock_or_fd,
341 socket.SOCK_DGRAM,
342 require_connected=True,
343 )
344 return await get_async_backend().wrap_connected_udp_socket(sock)
347class UNIXDatagramSocket(
348 UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider
349):
350 """
351 Represents an unconnected Unix datagram socket.
353 Supports all relevant extra attributes from :class:`~SocketAttribute`.
354 """
356 @classmethod
357 async def from_socket(
358 cls,
359 sock_or_fd: socket.socket | int,
360 ) -> UNIXDatagramSocket:
361 """
362 Wrap an existing socket object or file descriptor as a UNIX datagram
363 socket.
365 The newly created socket wrapper takes ownership of the socket being passed in.
367 :param sock_or_fd: a socket object or file descriptor
368 :return: a UNIX datagram socket
370 """
371 sock = _validate_socket(sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX)
372 return await get_async_backend().wrap_unix_datagram_socket(sock)
374 async def sendto(self, data: bytes, path: str) -> None:
375 """Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, path))."""
376 return await self.send((data, path))
379class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider):
380 """
381 Represents a connected Unix datagram socket.
383 Supports all relevant extra attributes from :class:`~SocketAttribute`.
384 """
386 @classmethod
387 async def from_socket(
388 cls,
389 sock_or_fd: socket.socket | int,
390 ) -> ConnectedUNIXDatagramSocket:
391 """
392 Wrap an existing socket object or file descriptor as a connected UNIX datagram
393 socket.
395 The newly created socket wrapper takes ownership of the socket being passed in.
396 The existing socket must already be connected.
398 :param sock_or_fd: a socket object or file descriptor
399 :return: a connected UNIX datagram socket
401 """
402 sock = _validate_socket(
403 sock_or_fd, socket.SOCK_DGRAM, socket.AF_UNIX, require_connected=True
404 )
405 return await get_async_backend().wrap_connected_unix_datagram_socket(sock)