Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_streams.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1from __future__ import annotations 

2 

3from abc import ABCMeta, abstractmethod 

4from collections.abc import Callable 

5from typing import Any, Generic, TypeAlias, TypeVar 

6 

7from .._core._exceptions import EndOfStream 

8from .._core._typedattr import TypedAttributeProvider 

9from ._resources import AsyncResource 

10from ._tasks import TaskGroup 

11 

12T_Item = TypeVar("T_Item") 

13T_co = TypeVar("T_co", covariant=True) 

14T_contra = TypeVar("T_contra", contravariant=True) 

15 

16 

17class UnreliableObjectReceiveStream( 

18 Generic[T_co], AsyncResource, TypedAttributeProvider 

19): 

20 """ 

21 An interface for receiving objects. 

22 

23 This interface makes no guarantees that the received messages arrive in the order in 

24 which they were sent, or that no messages are missed. 

25 

26 Asynchronously iterating over objects of this type will yield objects matching the 

27 given type parameter. 

28 """ 

29 

30 def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]: 

31 return self 

32 

33 async def __anext__(self) -> T_co: 

34 try: 

35 return await self.receive() 

36 except EndOfStream: 

37 raise StopAsyncIteration from None 

38 

39 @abstractmethod 

40 async def receive(self) -> T_co: 

41 """ 

42 Receive the next item. 

43 

44 :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly 

45 closed 

46 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

47 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

48 due to external causes 

49 """ 

50 

51 

52class UnreliableObjectSendStream( 

53 Generic[T_contra], AsyncResource, TypedAttributeProvider 

54): 

55 """ 

56 An interface for sending objects. 

57 

58 This interface makes no guarantees that the messages sent will reach the 

59 recipient(s) in the same order in which they were sent, or at all. 

60 """ 

61 

62 @abstractmethod 

63 async def send(self, item: T_contra) -> None: 

64 """ 

65 Send an item to the peer(s). 

66 

67 :param item: the item to send 

68 :raises ~anyio.ClosedResourceError: if the send stream has been explicitly 

69 closed 

70 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

71 due to external causes 

72 """ 

73 

74 

75class UnreliableObjectStream( 

76 UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] 

77): 

78 """ 

79 A bidirectional message stream which does not guarantee the order or reliability of 

80 message delivery. 

81 """ 

82 

83 

84class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]): 

85 """ 

86 A receive message stream which guarantees that messages are received in the same 

87 order in which they were sent, and that no messages are missed. 

88 """ 

89 

90 

91class ObjectSendStream(UnreliableObjectSendStream[T_contra]): 

92 """ 

93 A send message stream which guarantees that messages are delivered in the same order 

94 in which they were sent, without missing any messages in the middle. 

95 """ 

96 

97 

98class ObjectStream( 

99 ObjectReceiveStream[T_Item], 

100 ObjectSendStream[T_Item], 

101 UnreliableObjectStream[T_Item], 

102): 

103 """ 

104 A bidirectional message stream which guarantees the order and reliability of message 

105 delivery. 

106 """ 

107 

108 @abstractmethod 

109 async def send_eof(self) -> None: 

110 """ 

111 Send an end-of-file indication to the peer. 

112 

113 You should not try to send any further data to this stream after calling this 

114 method. This method is idempotent (does nothing on successive calls). 

115 """ 

116 

117 

118class ByteReceiveStream(AsyncResource, TypedAttributeProvider): 

119 """ 

120 An interface for receiving bytes from a single peer. 

121 

122 Iterating this byte stream will yield a byte string of arbitrary length, but no more 

123 than 65536 bytes. 

124 """ 

125 

126 def __aiter__(self) -> ByteReceiveStream: 

127 return self 

128 

129 async def __anext__(self) -> bytes: 

130 try: 

131 return await self.receive() 

132 except EndOfStream: 

133 raise StopAsyncIteration from None 

134 

135 @abstractmethod 

136 async def receive(self, max_bytes: int = 65536) -> bytes: 

137 """ 

138 Receive at most ``max_bytes`` bytes from the peer. 

139 

140 .. note:: Implementers of this interface should not return an empty 

141 :class:`bytes` object, and users should ignore them. 

142 

143 :param max_bytes: maximum number of bytes to receive 

144 :return: the received bytes 

145 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

146 """ 

147 

148 

149class ByteSendStream(AsyncResource, TypedAttributeProvider): 

150 """An interface for sending bytes to a single peer.""" 

151 

152 @abstractmethod 

153 async def send(self, item: bytes) -> None: 

154 """ 

155 Send the given bytes to the peer. 

156 

157 :param item: the bytes to send 

158 """ 

159 

160 

161class ByteStream(ByteReceiveStream, ByteSendStream): 

162 """A bidirectional byte stream.""" 

163 

164 @abstractmethod 

165 async def send_eof(self) -> None: 

166 """ 

167 Send an end-of-file indication to the peer. 

168 

169 You should not try to send any further data to this stream after calling this 

170 method. This method is idempotent (does nothing on successive calls). 

171 """ 

172 

173 

174#: Type alias for all unreliable bytes-oriented receive streams. 

175AnyUnreliableByteReceiveStream: TypeAlias = ( 

176 UnreliableObjectReceiveStream[bytes] | ByteReceiveStream 

177) 

178#: Type alias for all unreliable bytes-oriented send streams. 

179AnyUnreliableByteSendStream: TypeAlias = ( 

180 UnreliableObjectSendStream[bytes] | ByteSendStream 

181) 

182#: Type alias for all unreliable bytes-oriented streams. 

183AnyUnreliableByteStream: TypeAlias = UnreliableObjectStream[bytes] | ByteStream 

184#: Type alias for all bytes-oriented receive streams. 

185AnyByteReceiveStream: TypeAlias = ObjectReceiveStream[bytes] | ByteReceiveStream 

186#: Type alias for all bytes-oriented send streams. 

187AnyByteSendStream: TypeAlias = ObjectSendStream[bytes] | ByteSendStream 

188#: Type alias for all bytes-oriented streams. 

189AnyByteStream: TypeAlias = ObjectStream[bytes] | ByteStream 

190 

191 

192class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider): 

193 """An interface for objects that let you accept incoming connections.""" 

194 

195 @abstractmethod 

196 async def serve( 

197 self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None 

198 ) -> None: 

199 """ 

200 Accept incoming connections as they come in and start tasks to handle them. 

201 

202 :param handler: a callable that will be used to handle each accepted connection 

203 :param task_group: the task group that will be used to start tasks for handling 

204 each accepted connection (if omitted, an ad-hoc task group will be created) 

205 """ 

206 

207 

208class ObjectStreamConnectable(Generic[T_co], metaclass=ABCMeta): 

209 @abstractmethod 

210 async def connect(self) -> ObjectStream[T_co]: 

211 """ 

212 Connect to the remote endpoint. 

213 

214 :return: an object stream connected to the remote end 

215 :raises ConnectionFailed: if the connection fails 

216 """ 

217 

218 

219class ByteStreamConnectable(metaclass=ABCMeta): 

220 @abstractmethod 

221 async def connect(self) -> ByteStream: 

222 """ 

223 Connect to the remote endpoint. 

224 

225 :return: a bytestream connected to the remote end 

226 :raises ConnectionFailed: if the connection fails 

227 """ 

228 

229 

230#: Type alias for all connectables returning bytestreams or bytes-oriented object streams 

231AnyByteStreamConnectable: TypeAlias = ( 

232 ObjectStreamConnectable[bytes] | ByteStreamConnectable 

233)