Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_streams.py: 81%
52 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1from abc import abstractmethod
2from typing import Any, Callable, Generic, Optional, TypeVar, Union
4from .._core._exceptions import EndOfStream
5from .._core._typedattr import TypedAttributeProvider
6from ._resources import AsyncResource
7from ._tasks import TaskGroup
9T_Item = TypeVar("T_Item")
10T_Stream = TypeVar("T_Stream")
13class UnreliableObjectReceiveStream(
14 Generic[T_Item], AsyncResource, TypedAttributeProvider
15):
16 """
17 An interface for receiving objects.
19 This interface makes no guarantees that the received messages arrive in the order in which they
20 were sent, or that no messages are missed.
22 Asynchronously iterating over objects of this type will yield objects matching the given type
23 parameter.
24 """
26 def __aiter__(self) -> "UnreliableObjectReceiveStream[T_Item]":
27 return self
29 async def __anext__(self) -> T_Item:
30 try:
31 return await self.receive()
32 except EndOfStream:
33 raise StopAsyncIteration
35 @abstractmethod
36 async def receive(self) -> T_Item:
37 """
38 Receive the next item.
40 :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
41 closed
42 :raises ~anyio.EndOfStream: if this stream has been closed from the other end
43 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
44 due to external causes
45 """
48class UnreliableObjectSendStream(
49 Generic[T_Item], AsyncResource, TypedAttributeProvider
50):
51 """
52 An interface for sending objects.
54 This interface makes no guarantees that the messages sent will reach the recipient(s) in the
55 same order in which they were sent, or at all.
56 """
58 @abstractmethod
59 async def send(self, item: T_Item) -> None:
60 """
61 Send an item to the peer(s).
63 :param item: the item to send
64 :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
65 closed
66 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
67 due to external causes
68 """
71class UnreliableObjectStream(
72 UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
73):
74 """
75 A bidirectional message stream which does not guarantee the order or reliability of message
76 delivery.
77 """
80class ObjectReceiveStream(UnreliableObjectReceiveStream[T_Item]):
81 """
82 A receive message stream which guarantees that messages are received in the same order in
83 which they were sent, and that no messages are missed.
84 """
87class ObjectSendStream(UnreliableObjectSendStream[T_Item]):
88 """
89 A send message stream which guarantees that messages are delivered in the same order in which
90 they were sent, without missing any messages in the middle.
91 """
94class ObjectStream(
95 ObjectReceiveStream[T_Item],
96 ObjectSendStream[T_Item],
97 UnreliableObjectStream[T_Item],
98):
99 """
100 A bidirectional message stream which guarantees the order and reliability of message delivery.
101 """
103 @abstractmethod
104 async def send_eof(self) -> None:
105 """
106 Send an end-of-file indication to the peer.
108 You should not try to send any further data to this stream after calling this method.
109 This method is idempotent (does nothing on successive calls).
110 """
113class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
114 """
115 An interface for receiving bytes from a single peer.
117 Iterating this byte stream will yield a byte string of arbitrary length, but no more than
118 65536 bytes.
119 """
121 def __aiter__(self) -> "ByteReceiveStream":
122 return self
124 async def __anext__(self) -> bytes:
125 try:
126 return await self.receive()
127 except EndOfStream:
128 raise StopAsyncIteration
130 @abstractmethod
131 async def receive(self, max_bytes: int = 65536) -> bytes:
132 """
133 Receive at most ``max_bytes`` bytes from the peer.
135 .. note:: Implementors of this interface should not return an empty :class:`bytes` object,
136 and users should ignore them.
138 :param max_bytes: maximum number of bytes to receive
139 :return: the received bytes
140 :raises ~anyio.EndOfStream: if this stream has been closed from the other end
141 """
144class ByteSendStream(AsyncResource, TypedAttributeProvider):
145 """An interface for sending bytes to a single peer."""
147 @abstractmethod
148 async def send(self, item: bytes) -> None:
149 """
150 Send the given bytes to the peer.
152 :param item: the bytes to send
153 """
156class ByteStream(ByteReceiveStream, ByteSendStream):
157 """A bidirectional byte stream."""
159 @abstractmethod
160 async def send_eof(self) -> None:
161 """
162 Send an end-of-file indication to the peer.
164 You should not try to send any further data to this stream after calling this method.
165 This method is idempotent (does nothing on successive calls).
166 """
169#: Type alias for all unreliable bytes-oriented receive streams.
170AnyUnreliableByteReceiveStream = Union[
171 UnreliableObjectReceiveStream[bytes], ByteReceiveStream
172]
173#: Type alias for all unreliable bytes-oriented send streams.
174AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream]
175#: Type alias for all unreliable bytes-oriented streams.
176AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream]
177#: Type alias for all bytes-oriented receive streams.
178AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
179#: Type alias for all bytes-oriented send streams.
180AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream]
181#: Type alias for all bytes-oriented streams.
182AnyByteStream = Union[ObjectStream[bytes], ByteStream]
185class Listener(Generic[T_Stream], AsyncResource, TypedAttributeProvider):
186 """An interface for objects that let you accept incoming connections."""
188 @abstractmethod
189 async def serve(
190 self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None
191 ) -> None:
192 """
193 Accept incoming connections as they come in and start tasks to handle them.
195 :param handler: a callable that will be used to handle each accepted connection
196 :param task_group: the task group that will be used to start tasks for handling each
197 accepted connection (if omitted, an ad-hoc task group will be created)
198 """