1from __future__ import annotations
2
3import sys
4from abc import ABCMeta, abstractmethod
5from collections.abc import Callable
6from typing import Any, Generic, TypeVar, Union
7
8from .._core._exceptions import EndOfStream
9from .._core._typedattr import TypedAttributeProvider
10from ._resources import AsyncResource
11from ._tasks import TaskGroup
12
13if sys.version_info >= (3, 10):
14 from typing import TypeAlias
15else:
16 from typing_extensions import TypeAlias
17
18T_Item = TypeVar("T_Item")
19T_co = TypeVar("T_co", covariant=True)
20T_contra = TypeVar("T_contra", contravariant=True)
21
22
23class UnreliableObjectReceiveStream(
24 Generic[T_co], AsyncResource, TypedAttributeProvider
25):
26 """
27 An interface for receiving objects.
28
29 This interface makes no guarantees that the received messages arrive in the order in
30 which they were sent, or that no messages are missed.
31
32 Asynchronously iterating over objects of this type will yield objects matching the
33 given type parameter.
34 """
35
36 def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
37 return self
38
39 async def __anext__(self) -> T_co:
40 try:
41 return await self.receive()
42 except EndOfStream:
43 raise StopAsyncIteration from None
44
45 @abstractmethod
46 async def receive(self) -> T_co:
47 """
48 Receive the next item.
49
50 :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
51 closed
52 :raises ~anyio.EndOfStream: if this stream has been closed from the other end
53 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
54 due to external causes
55 """
56
57
58class UnreliableObjectSendStream(
59 Generic[T_contra], AsyncResource, TypedAttributeProvider
60):
61 """
62 An interface for sending objects.
63
64 This interface makes no guarantees that the messages sent will reach the
65 recipient(s) in the same order in which they were sent, or at all.
66 """
67
68 @abstractmethod
69 async def send(self, item: T_contra) -> None:
70 """
71 Send an item to the peer(s).
72
73 :param item: the item to send
74 :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
75 closed
76 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
77 due to external causes
78 """
79
80
81class UnreliableObjectStream(
82 UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
83):
84 """
85 A bidirectional message stream which does not guarantee the order or reliability of
86 message delivery.
87 """
88
89
90class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
91 """
92 A receive message stream which guarantees that messages are received in the same
93 order in which they were sent, and that no messages are missed.
94 """
95
96
97class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
98 """
99 A send message stream which guarantees that messages are delivered in the same order
100 in which they were sent, without missing any messages in the middle.
101 """
102
103
104class ObjectStream(
105 ObjectReceiveStream[T_Item],
106 ObjectSendStream[T_Item],
107 UnreliableObjectStream[T_Item],
108):
109 """
110 A bidirectional message stream which guarantees the order and reliability of message
111 delivery.
112 """
113
114 @abstractmethod
115 async def send_eof(self) -> None:
116 """
117 Send an end-of-file indication to the peer.
118
119 You should not try to send any further data to this stream after calling this
120 method. This method is idempotent (does nothing on successive calls).
121 """
122
123
124class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
125 """
126 An interface for receiving bytes from a single peer.
127
128 Iterating this byte stream will yield a byte string of arbitrary length, but no more
129 than 65536 bytes.
130 """
131
132 def __aiter__(self) -> ByteReceiveStream:
133 return self
134
135 async def __anext__(self) -> bytes:
136 try:
137 return await self.receive()
138 except EndOfStream:
139 raise StopAsyncIteration from None
140
141 @abstractmethod
142 async def receive(self, max_bytes: int = 65536) -> bytes:
143 """
144 Receive at most ``max_bytes`` bytes from the peer.
145
146 .. note:: Implementers of this interface should not return an empty
147 :class:`bytes` object, and users should ignore them.
148
149 :param max_bytes: maximum number of bytes to receive
150 :return: the received bytes
151 :raises ~anyio.EndOfStream: if this stream has been closed from the other end
152 """
153
154
155class ByteSendStream(AsyncResource, TypedAttributeProvider):
156 """An interface for sending bytes to a single peer."""
157
158 @abstractmethod
159 async def send(self, item: bytes) -> None:
160 """
161 Send the given bytes to the peer.
162
163 :param item: the bytes to send
164 """
165
166
167class ByteStream(ByteReceiveStream, ByteSendStream):
168 """A bidirectional byte stream."""
169
170 @abstractmethod
171 async def send_eof(self) -> None:
172 """
173 Send an end-of-file indication to the peer.
174
175 You should not try to send any further data to this stream after calling this
176 method. This method is idempotent (does nothing on successive calls).
177 """
178
179
180#: Type alias for all unreliable bytes-oriented receive streams.
181AnyUnreliableByteReceiveStream: TypeAlias = Union[
182 UnreliableObjectReceiveStream[bytes], ByteReceiveStream
183]
184#: Type alias for all unreliable bytes-oriented send streams.
185AnyUnreliableByteSendStream: TypeAlias = Union[
186 UnreliableObjectSendStream[bytes], ByteSendStream
187]
188#: Type alias for all unreliable bytes-oriented streams.
189AnyUnreliableByteStream: TypeAlias = Union[UnreliableObjectStream[bytes], ByteStream]
190#: Type alias for all bytes-oriented receive streams.
191AnyByteReceiveStream: TypeAlias = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
192#: Type alias for all bytes-oriented send streams.
193AnyByteSendStream: TypeAlias = Union[ObjectSendStream[bytes], ByteSendStream]
194#: Type alias for all bytes-oriented streams.
195AnyByteStream: TypeAlias = Union[ObjectStream[bytes], ByteStream]
196
197
198class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
199 """An interface for objects that let you accept incoming connections."""
200
201 @abstractmethod
202 async def serve(
203 self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None
204 ) -> None:
205 """
206 Accept incoming connections as they come in and start tasks to handle them.
207
208 :param handler: a callable that will be used to handle each accepted connection
209 :param task_group: the task group that will be used to start tasks for handling
210 each accepted connection (if omitted, an ad-hoc task group will be created)
211 """
212
213
214class ObjectStreamConnectable(Generic[T_co], metaclass=ABCMeta):
215 @abstractmethod
216 async def connect(self) -> ObjectStream[T_co]:
217 """
218 Connect to the remote endpoint.
219
220 :return: an object stream connected to the remote end
221 :raises ConnectionFailed: if the connection fails
222 """
223
224
225class ByteStreamConnectable(metaclass=ABCMeta):
226 @abstractmethod
227 async def connect(self) -> ByteStream:
228 """
229 Connect to the remote endpoint.
230
231 :return: a bytestream connected to the remote end
232 :raises ConnectionFailed: if the connection fails
233 """
234
235
236#: Type alias for all connectables returning bytestreams or bytes-oriented object streams
237AnyByteStreamConnectable: TypeAlias = Union[
238 ObjectStreamConnectable[bytes], ByteStreamConnectable
239]