Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_streams.py: 84%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from abc import ABCMeta, abstractmethod
4from collections.abc import Callable
5from typing import Any, Generic, TypeAlias, TypeVar
7from .._core._exceptions import EndOfStream
8from .._core._typedattr import TypedAttributeProvider
9from ._resources import AsyncResource
10from ._tasks import TaskGroup
# Type parameters shared by the stream interfaces in this module.
# Invariant: used where the same item type is both sent and received
# (the bidirectional object streams).
T_Item = TypeVar("T_Item")
# Covariant: used by interfaces that only produce items (receive streams,
# listeners, connectables).
T_co = TypeVar("T_co", covariant=True)
# Contravariant: used by interfaces that only consume items (send streams).
T_contra = TypeVar("T_contra", contravariant=True)
class UnreliableObjectReceiveStream(
    Generic[T_co], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in
    which they were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the
    given type parameter.
    """

    def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
        # The stream is its own asynchronous iterator.
        return self

    async def __anext__(self) -> T_co:
        try:
            return await self.receive()
        except EndOfStream:
            # Translate end-of-stream into the async iteration termination signal;
            # ``from None`` keeps the EndOfStream out of the exception context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self) -> T_co:
        """
        Receive the next item.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """
class UnreliableObjectSendStream(
    Generic[T_contra], AsyncResource, TypedAttributeProvider
):
    """
    An interface for sending objects.

    This interface makes no guarantees that the messages sent will reach the
    recipient(s) in the same order in which they were sent, or at all.
    """

    @abstractmethod
    async def send(self, item: T_contra) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """
class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
):
    """
    A bidirectional message stream which does not guarantee the order or reliability of
    message delivery.
    """
class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
    """
    A receive message stream which guarantees that messages are received in the same
    order in which they were sent, and that no messages are missed.
    """
class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
    """
    A send message stream which guarantees that messages are delivered in the same order
    in which they were sent, without missing any messages in the middle.
    """
class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A bidirectional message stream which guarantees the order and reliability of message
    delivery.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this
        method. This method is idempotent (does nothing on successive calls).
        """
class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more
    than 65536 bytes.
    """

    def __aiter__(self) -> ByteReceiveStream:
        # The stream is its own asynchronous iterator.
        return self

    async def __anext__(self) -> bytes:
        try:
            # Called without arguments, so the default ``max_bytes`` of ``receive()``
            # bounds the size of each chunk produced by iteration.
            return await self.receive()
        except EndOfStream:
            # Translate end-of-stream into the async iteration termination signal;
            # ``from None`` keeps the EndOfStream out of the exception context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementers of this interface should not return an empty
            :class:`bytes` object, and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """
class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """An interface for sending bytes to a single peer."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the peer.

        :param item: the bytes to send
        """
class ByteStream(ByteReceiveStream, ByteSendStream):
    """A bidirectional byte stream."""

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this
        method. This method is idempotent (does nothing on successive calls).
        """
#: Type alias for all unreliable bytes-oriented receive streams.
AnyUnreliableByteReceiveStream: TypeAlias = (
    UnreliableObjectReceiveStream[bytes] | ByteReceiveStream
)
#: Type alias for all unreliable bytes-oriented send streams.
AnyUnreliableByteSendStream: TypeAlias = (
    UnreliableObjectSendStream[bytes] | ByteSendStream
)
#: Type alias for all unreliable bytes-oriented streams.
AnyUnreliableByteStream: TypeAlias = UnreliableObjectStream[bytes] | ByteStream
#: Type alias for all bytes-oriented receive streams.
AnyByteReceiveStream: TypeAlias = ObjectReceiveStream[bytes] | ByteReceiveStream
#: Type alias for all bytes-oriented send streams.
AnyByteSendStream: TypeAlias = ObjectSendStream[bytes] | ByteSendStream
#: Type alias for all bytes-oriented streams.
AnyByteStream: TypeAlias = ObjectStream[bytes] | ByteStream
class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
    """An interface for objects that let you accept incoming connections."""

    @abstractmethod
    async def serve(
        self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None
    ) -> None:
        """
        Accept incoming connections as they come in and start tasks to handle them.

        :param handler: a callable that will be used to handle each accepted connection
        :param task_group: the task group that will be used to start tasks for handling
            each accepted connection (if omitted, an ad-hoc task group will be created)
        """
class ObjectStreamConnectable(Generic[T_co], metaclass=ABCMeta):
    """An interface for objects that can connect to a remote endpoint and return an
    object stream."""

    @abstractmethod
    async def connect(self) -> ObjectStream[T_co]:
        """
        Connect to the remote endpoint.

        :return: an object stream connected to the remote end
        :raises ConnectionFailed: if the connection fails
        """
class ByteStreamConnectable(metaclass=ABCMeta):
    """An interface for objects that can connect to a remote endpoint and return a
    byte stream."""

    @abstractmethod
    async def connect(self) -> ByteStream:
        """
        Connect to the remote endpoint.

        :return: a bytestream connected to the remote end
        :raises ConnectionFailed: if the connection fails
        """
#: Type alias for all connectables returning bytestreams or bytes-oriented object streams
AnyByteStreamConnectable: TypeAlias = (
    ObjectStreamConnectable[bytes] | ByteStreamConnectable
)