1from __future__ import annotations
2
3from abc import abstractmethod
4from collections.abc import Callable
5from typing import Any, Generic, TypeVar, Union
6
7from .._core._exceptions import EndOfStream
8from .._core._typedattr import TypedAttributeProvider
9from ._resources import AsyncResource
10from ._tasks import TaskGroup
11
# Invariant: used by the bidirectional streams, which both send and receive items.
T_Item = TypeVar("T_Item")
# Covariant: used by the receive streams and by Listener.
T_co = TypeVar("T_co", covariant=True)
# Contravariant: used by the send streams.
T_contra = TypeVar("T_contra", contravariant=True)
15
16
class UnreliableObjectReceiveStream(
    Generic[T_co], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in
    which they were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the
    given type parameter.
    """

    def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
        """Return the stream itself, which acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> T_co:
        """
        Return the next item obtained from :meth:`receive`.

        :raises StopAsyncIteration: if :meth:`receive` raises
            :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is how iteration finishes, so the
            # EndOfStream is deliberately not chained onto StopAsyncIteration.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self) -> T_co:
        """
        Receive the next item.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """
50
51
class UnreliableObjectSendStream(
    Generic[T_contra],
    AsyncResource,
    TypedAttributeProvider,
):
    """
    Abstract interface for the sending side of an unreliable object stream.

    Delivery is best effort: sent messages may reach the recipient(s) in a different
    order than the one in which they were sent, or may not arrive at all.
    """

    @abstractmethod
    async def send(self, item: T_contra) -> None:
        """
        Deliver a single item to the peer(s).

        :param item: the object to be sent
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if external causes have rendered this
            stream unusable
        """
73
74
class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item],
    UnreliableObjectSendStream[T_Item],
):
    """
    A two-way message stream built from the unreliable receive and send interfaces.

    Neither the ordering nor the reliability of message delivery is guaranteed.
    """
82
83
class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
    """
    A receive message stream with ordered, lossless delivery.

    Messages are received in the same order in which they were sent, and no messages
    are missed.
    """
89
90
class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
    """
    A send message stream with ordered delivery.

    Messages are delivered in the same order in which they were sent, and no messages
    in the middle go missing.
    """
96
97
class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A two-way message stream with guaranteed delivery.

    Both the order and the reliability of message delivery are guaranteed.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Signal end-of-file to the peer.

        Once this method has been called, no further data should be sent to this
        stream. The method is idempotent: successive calls do nothing.
        """
116
117
class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more
    than 65536 bytes.
    """

    def __aiter__(self) -> ByteReceiveStream:
        """Return the stream itself, which acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> bytes:
        """
        Return the next chunk obtained from :meth:`receive` with its default size limit.

        :raises StopAsyncIteration: if :meth:`receive` raises
            :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is how iteration finishes, so the
            # EndOfStream is deliberately not chained onto StopAsyncIteration.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementers of this interface should not return an empty
            :class:`bytes` object, and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """
147
148
class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """
    Abstract interface for sending bytes to a single peer.
    """

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Deliver the given bytes to the peer.

        :param item: the byte string to send
        """
159
160
class ByteStream(ByteReceiveStream, ByteSendStream):
    """
    A byte stream that can be both received from and sent to.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Signal end-of-file to the peer.

        Once this method has been called, no further data should be sent to this
        stream. The method is idempotent: successive calls do nothing.
        """
172
173
#: Any unreliable receive stream that yields bytes (object- or byte-oriented).
AnyUnreliableByteReceiveStream = Union[
    UnreliableObjectReceiveStream[bytes],
    ByteReceiveStream,
]
#: Any unreliable send stream that accepts bytes (object- or byte-oriented).
AnyUnreliableByteSendStream = Union[
    UnreliableObjectSendStream[bytes],
    ByteSendStream,
]
#: Any unreliable bidirectional stream that carries bytes.
AnyUnreliableByteStream = Union[
    UnreliableObjectStream[bytes],
    ByteStream,
]
#: Any receive stream that yields bytes (object- or byte-oriented).
AnyByteReceiveStream = Union[
    ObjectReceiveStream[bytes],
    ByteReceiveStream,
]
#: Any send stream that accepts bytes (object- or byte-oriented).
AnyByteSendStream = Union[
    ObjectSendStream[bytes],
    ByteSendStream,
]
#: Any bidirectional stream that carries bytes.
AnyByteStream = Union[
    ObjectStream[bytes],
    ByteStream,
]
188
189
class Listener(
    Generic[T_co],
    AsyncResource,
    TypedAttributeProvider,
):
    """
    Abstract interface for objects that let you accept incoming connections.
    """

    @abstractmethod
    async def serve(
        self,
        handler: Callable[[T_co], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept connections as they arrive, starting a task to handle each of them.

        :param handler: the callable used to handle each accepted connection
        :param task_group: the task group in which the handler tasks are started (an
            ad-hoc task group will be created if this is omitted)
        """