Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_streams.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1from __future__ import annotations 

2 

3import sys 

4from abc import ABCMeta, abstractmethod 

5from collections.abc import Callable 

6from typing import Any, Generic, TypeVar, Union 

7 

8from .._core._exceptions import EndOfStream 

9from .._core._typedattr import TypedAttributeProvider 

10from ._resources import AsyncResource 

11from ._tasks import TaskGroup 

12 

13if sys.version_info >= (3, 10): 

14 from typing import TypeAlias 

15else: 

16 from typing_extensions import TypeAlias 

17 

18T_Item = TypeVar("T_Item") 

19T_co = TypeVar("T_co", covariant=True) 

20T_contra = TypeVar("T_contra", contravariant=True) 

21 

22 

23class UnreliableObjectReceiveStream( 

24 Generic[T_co], AsyncResource, TypedAttributeProvider 

25): 

26 """ 

27 An interface for receiving objects. 

28 

29 This interface makes no guarantees that the received messages arrive in the order in 

30 which they were sent, or that no messages are missed. 

31 

32 Asynchronously iterating over objects of this type will yield objects matching the 

33 given type parameter. 

34 """ 

35 

36 def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]: 

37 return self 

38 

39 async def __anext__(self) -> T_co: 

40 try: 

41 return await self.receive() 

42 except EndOfStream: 

43 raise StopAsyncIteration from None 

44 

45 @abstractmethod 

46 async def receive(self) -> T_co: 

47 """ 

48 Receive the next item. 

49 

50 :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly 

51 closed 

52 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

53 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

54 due to external causes 

55 """ 

56 

57 

58class UnreliableObjectSendStream( 

59 Generic[T_contra], AsyncResource, TypedAttributeProvider 

60): 

61 """ 

62 An interface for sending objects. 

63 

64 This interface makes no guarantees that the messages sent will reach the 

65 recipient(s) in the same order in which they were sent, or at all. 

66 """ 

67 

68 @abstractmethod 

69 async def send(self, item: T_contra) -> None: 

70 """ 

71 Send an item to the peer(s). 

72 

73 :param item: the item to send 

74 :raises ~anyio.ClosedResourceError: if the send stream has been explicitly 

75 closed 

76 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

77 due to external causes 

78 """ 

79 

80 

81class UnreliableObjectStream( 

82 UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] 

83): 

84 """ 

85 A bidirectional message stream which does not guarantee the order or reliability of 

86 message delivery. 

87 """ 

88 

89 

90class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]): 

91 """ 

92 A receive message stream which guarantees that messages are received in the same 

93 order in which they were sent, and that no messages are missed. 

94 """ 

95 

96 

97class ObjectSendStream(UnreliableObjectSendStream[T_contra]): 

98 """ 

99 A send message stream which guarantees that messages are delivered in the same order 

100 in which they were sent, without missing any messages in the middle. 

101 """ 

102 

103 

104class ObjectStream( 

105 ObjectReceiveStream[T_Item], 

106 ObjectSendStream[T_Item], 

107 UnreliableObjectStream[T_Item], 

108): 

109 """ 

110 A bidirectional message stream which guarantees the order and reliability of message 

111 delivery. 

112 """ 

113 

114 @abstractmethod 

115 async def send_eof(self) -> None: 

116 """ 

117 Send an end-of-file indication to the peer. 

118 

119 You should not try to send any further data to this stream after calling this 

120 method. This method is idempotent (does nothing on successive calls). 

121 """ 

122 

123 

124class ByteReceiveStream(AsyncResource, TypedAttributeProvider): 

125 """ 

126 An interface for receiving bytes from a single peer. 

127 

128 Iterating this byte stream will yield a byte string of arbitrary length, but no more 

129 than 65536 bytes. 

130 """ 

131 

132 def __aiter__(self) -> ByteReceiveStream: 

133 return self 

134 

135 async def __anext__(self) -> bytes: 

136 try: 

137 return await self.receive() 

138 except EndOfStream: 

139 raise StopAsyncIteration from None 

140 

141 @abstractmethod 

142 async def receive(self, max_bytes: int = 65536) -> bytes: 

143 """ 

144 Receive at most ``max_bytes`` bytes from the peer. 

145 

146 .. note:: Implementers of this interface should not return an empty 

147 :class:`bytes` object, and users should ignore them. 

148 

149 :param max_bytes: maximum number of bytes to receive 

150 :return: the received bytes 

151 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

152 """ 

153 

154 

155class ByteSendStream(AsyncResource, TypedAttributeProvider): 

156 """An interface for sending bytes to a single peer.""" 

157 

158 @abstractmethod 

159 async def send(self, item: bytes) -> None: 

160 """ 

161 Send the given bytes to the peer. 

162 

163 :param item: the bytes to send 

164 """ 

165 

166 

167class ByteStream(ByteReceiveStream, ByteSendStream): 

168 """A bidirectional byte stream.""" 

169 

170 @abstractmethod 

171 async def send_eof(self) -> None: 

172 """ 

173 Send an end-of-file indication to the peer. 

174 

175 You should not try to send any further data to this stream after calling this 

176 method. This method is idempotent (does nothing on successive calls). 

177 """ 

178 

179 

180#: Type alias for all unreliable bytes-oriented receive streams. 

181AnyUnreliableByteReceiveStream: TypeAlias = Union[ 

182 UnreliableObjectReceiveStream[bytes], ByteReceiveStream 

183] 

184#: Type alias for all unreliable bytes-oriented send streams. 

185AnyUnreliableByteSendStream: TypeAlias = Union[ 

186 UnreliableObjectSendStream[bytes], ByteSendStream 

187] 

188#: Type alias for all unreliable bytes-oriented streams. 

189AnyUnreliableByteStream: TypeAlias = Union[UnreliableObjectStream[bytes], ByteStream] 

190#: Type alias for all bytes-oriented receive streams. 

191AnyByteReceiveStream: TypeAlias = Union[ObjectReceiveStream[bytes], ByteReceiveStream] 

192#: Type alias for all bytes-oriented send streams. 

193AnyByteSendStream: TypeAlias = Union[ObjectSendStream[bytes], ByteSendStream] 

194#: Type alias for all bytes-oriented streams. 

195AnyByteStream: TypeAlias = Union[ObjectStream[bytes], ByteStream] 

196 

197 

198class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider): 

199 """An interface for objects that let you accept incoming connections.""" 

200 

201 @abstractmethod 

202 async def serve( 

203 self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None 

204 ) -> None: 

205 """ 

206 Accept incoming connections as they come in and start tasks to handle them. 

207 

208 :param handler: a callable that will be used to handle each accepted connection 

209 :param task_group: the task group that will be used to start tasks for handling 

210 each accepted connection (if omitted, an ad-hoc task group will be created) 

211 """ 

212 

213 

214class ObjectStreamConnectable(Generic[T_co], metaclass=ABCMeta): 

215 @abstractmethod 

216 async def connect(self) -> ObjectStream[T_co]: 

217 """ 

218 Connect to the remote endpoint. 

219 

220 :return: an object stream connected to the remote end 

221 :raises ConnectionFailed: if the connection fails 

222 """ 

223 

224 

225class ByteStreamConnectable(metaclass=ABCMeta): 

226 @abstractmethod 

227 async def connect(self) -> ByteStream: 

228 """ 

229 Connect to the remote endpoint. 

230 

231 :return: a bytestream connected to the remote end 

232 :raises ConnectionFailed: if the connection fails 

233 """ 

234 

235 

236#: Type alias for all connectables returning bytestreams or bytes-oriented object streams 

237AnyByteStreamConnectable: TypeAlias = Union[ 

238 ObjectStreamConnectable[bytes], ByteStreamConnectable 

239]