Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_streams.py: 81%
54 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3from abc import abstractmethod
4from typing import Any, Callable, Generic, TypeVar, Union
6from .._core._exceptions import EndOfStream
7from .._core._typedattr import TypedAttributeProvider
8from ._resources import AsyncResource
9from ._tasks import TaskGroup
# Invariant item type: used by the bidirectional stream classes, which both
# send and receive items of this type.
T_Item = TypeVar("T_Item")
# Covariant type for values that are only produced (items yielded by receive
# streams, objects passed to a Listener's handler).
T_co = TypeVar("T_co", covariant=True)
# Contravariant type for values that are only consumed (items passed to
# ``send()`` on send streams).
T_contra = TypeVar("T_contra", contravariant=True)
class UnreliableObjectReceiveStream(
    Generic[T_co], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in
    which they were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the
    given type parameter.
    """

    def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
        """Return the stream itself; it acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> T_co:
        """
        Return the next item obtained from :meth:`receive`.

        :raises StopAsyncIteration: if :meth:`receive` raised
            :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is the normal way for iteration to
            # finish, so the EndOfStream context is deliberately suppressed
            # instead of being implicitly chained onto StopAsyncIteration.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self) -> T_co:
        """
        Receive the next item.

        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """
class UnreliableObjectSendStream(
    Generic[T_contra], AsyncResource, TypedAttributeProvider
):
    """
    An interface for sending objects.

    Nothing is guaranteed about delivery: sent messages may reach the recipient(s) in a
    different order than they were sent in, or may not arrive at all.
    """

    @abstractmethod
    async def send(self, item: T_contra) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """
class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
):
    """
    A bidirectional message stream that guarantees neither the ordering nor the
    reliability of message delivery.
    """
class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
    """
    A receive message stream with ordering and reliability guarantees: messages are
    received in the same order in which they were sent, and no messages are missed.
    """
class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
    """
    A send message stream with ordering and reliability guarantees: messages are
    delivered in the same order in which they were sent, without missing any messages
    in the middle.
    """
class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A bidirectional message stream that guarantees both the order and the reliability
    of message delivery.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        No further data should be sent to this stream once this method has been called.
        The method is idempotent: successive calls do nothing.
        """
class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more
    than 65536 bytes.
    """

    def __aiter__(self) -> ByteReceiveStream:
        """Return the stream itself; it acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> bytes:
        """
        Return the next chunk obtained from :meth:`receive`, called with its default
        ``max_bytes``.

        :raises StopAsyncIteration: if :meth:`receive` raised
            :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is the normal way for iteration to
            # finish, so the EndOfStream context is deliberately suppressed
            # instead of being implicitly chained onto StopAsyncIteration.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementors of this interface should not return an empty
            :class:`bytes` object, and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """
class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """An interface for sending bytes to a single peer."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the peer.

        :param item: the bytes to send
        """
class ByteStream(ByteReceiveStream, ByteSendStream):
    """A bidirectional byte stream."""

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        No further data should be sent to this stream once this method has been called.
        The method is idempotent: successive calls do nothing.
        """
#: Type alias for all unreliable bytes-oriented receive streams.
AnyUnreliableByteReceiveStream = Union[
    UnreliableObjectReceiveStream[bytes],
    ByteReceiveStream,
]
#: Type alias for all unreliable bytes-oriented send streams.
AnyUnreliableByteSendStream = Union[
    UnreliableObjectSendStream[bytes],
    ByteSendStream,
]
#: Type alias for all unreliable bytes-oriented streams.
AnyUnreliableByteStream = Union[
    UnreliableObjectStream[bytes],
    ByteStream,
]
#: Type alias for all bytes-oriented receive streams.
AnyByteReceiveStream = Union[
    ObjectReceiveStream[bytes],
    ByteReceiveStream,
]
#: Type alias for all bytes-oriented send streams.
AnyByteSendStream = Union[
    ObjectSendStream[bytes],
    ByteSendStream,
]
#: Type alias for all bytes-oriented streams.
AnyByteStream = Union[
    ObjectStream[bytes],
    ByteStream,
]
class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
    """An interface for objects that let you accept incoming connections."""

    @abstractmethod
    async def serve(
        self,
        handler: Callable[[T_co], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept incoming connections as they come in and start tasks to handle them.

        :param handler: a callable that will be used to handle each accepted connection
        :param task_group: the task group that will be used to start tasks for handling
            each accepted connection (if omitted, an ad-hoc task group will be created)
        """