Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/abc/_streams.py: 82%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

55 statements  

1from __future__ import annotations 

2 

3from abc import abstractmethod 

4from collections.abc import Callable 

5from typing import Any, Generic, TypeVar, Union 

6 

7from .._core._exceptions import EndOfStream 

8from .._core._typedattr import TypedAttributeProvider 

9from ._resources import AsyncResource 

10from ._tasks import TaskGroup 

11 

12T_Item = TypeVar("T_Item") 

13T_co = TypeVar("T_co", covariant=True) 

14T_contra = TypeVar("T_contra", contravariant=True) 

15 

16 

17class UnreliableObjectReceiveStream( 

18 Generic[T_co], AsyncResource, TypedAttributeProvider 

19): 

20 """ 

21 An interface for receiving objects. 

22 

23 This interface makes no guarantees that the received messages arrive in the order in 

24 which they were sent, or that no messages are missed. 

25 

26 Asynchronously iterating over objects of this type will yield objects matching the 

27 given type parameter. 

28 """ 

29 

30 def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]: 

31 return self 

32 

33 async def __anext__(self) -> T_co: 

34 try: 

35 return await self.receive() 

36 except EndOfStream: 

37 raise StopAsyncIteration 

38 

39 @abstractmethod 

40 async def receive(self) -> T_co: 

41 """ 

42 Receive the next item. 

43 

44 :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly 

45 closed 

46 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

47 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

48 due to external causes 

49 """ 

50 

51 

52class UnreliableObjectSendStream( 

53 Generic[T_contra], AsyncResource, TypedAttributeProvider 

54): 

55 """ 

56 An interface for sending objects. 

57 

58 This interface makes no guarantees that the messages sent will reach the 

59 recipient(s) in the same order in which they were sent, or at all. 

60 """ 

61 

62 @abstractmethod 

63 async def send(self, item: T_contra) -> None: 

64 """ 

65 Send an item to the peer(s). 

66 

67 :param item: the item to send 

68 :raises ~anyio.ClosedResourceError: if the send stream has been explicitly 

69 closed 

70 :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable 

71 due to external causes 

72 """ 

73 

74 

75class UnreliableObjectStream( 

76 UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item] 

77): 

78 """ 

79 A bidirectional message stream which does not guarantee the order or reliability of 

80 message delivery. 

81 """ 

82 

83 

84class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]): 

85 """ 

86 A receive message stream which guarantees that messages are received in the same 

87 order in which they were sent, and that no messages are missed. 

88 """ 

89 

90 

91class ObjectSendStream(UnreliableObjectSendStream[T_contra]): 

92 """ 

93 A send message stream which guarantees that messages are delivered in the same order 

94 in which they were sent, without missing any messages in the middle. 

95 """ 

96 

97 

98class ObjectStream( 

99 ObjectReceiveStream[T_Item], 

100 ObjectSendStream[T_Item], 

101 UnreliableObjectStream[T_Item], 

102): 

103 """ 

104 A bidirectional message stream which guarantees the order and reliability of message 

105 delivery. 

106 """ 

107 

108 @abstractmethod 

109 async def send_eof(self) -> None: 

110 """ 

111 Send an end-of-file indication to the peer. 

112 

113 You should not try to send any further data to this stream after calling this 

114 method. This method is idempotent (does nothing on successive calls). 

115 """ 

116 

117 

118class ByteReceiveStream(AsyncResource, TypedAttributeProvider): 

119 """ 

120 An interface for receiving bytes from a single peer. 

121 

122 Iterating this byte stream will yield a byte string of arbitrary length, but no more 

123 than 65536 bytes. 

124 """ 

125 

126 def __aiter__(self) -> ByteReceiveStream: 

127 return self 

128 

129 async def __anext__(self) -> bytes: 

130 try: 

131 return await self.receive() 

132 except EndOfStream: 

133 raise StopAsyncIteration 

134 

135 @abstractmethod 

136 async def receive(self, max_bytes: int = 65536) -> bytes: 

137 """ 

138 Receive at most ``max_bytes`` bytes from the peer. 

139 

140 .. note:: Implementers of this interface should not return an empty 

141 :class:`bytes` object, and users should ignore them. 

142 

143 :param max_bytes: maximum number of bytes to receive 

144 :return: the received bytes 

145 :raises ~anyio.EndOfStream: if this stream has been closed from the other end 

146 """ 

147 

148 

149class ByteSendStream(AsyncResource, TypedAttributeProvider): 

150 """An interface for sending bytes to a single peer.""" 

151 

152 @abstractmethod 

153 async def send(self, item: bytes) -> None: 

154 """ 

155 Send the given bytes to the peer. 

156 

157 :param item: the bytes to send 

158 """ 

159 

160 

161class ByteStream(ByteReceiveStream, ByteSendStream): 

162 """A bidirectional byte stream.""" 

163 

164 @abstractmethod 

165 async def send_eof(self) -> None: 

166 """ 

167 Send an end-of-file indication to the peer. 

168 

169 You should not try to send any further data to this stream after calling this 

170 method. This method is idempotent (does nothing on successive calls). 

171 """ 

172 

173 

174#: Type alias for all unreliable bytes-oriented receive streams. 

175AnyUnreliableByteReceiveStream = Union[ 

176 UnreliableObjectReceiveStream[bytes], ByteReceiveStream 

177] 

178#: Type alias for all unreliable bytes-oriented send streams. 

179AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream] 

180#: Type alias for all unreliable bytes-oriented streams. 

181AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream] 

182#: Type alias for all bytes-oriented receive streams. 

183AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream] 

184#: Type alias for all bytes-oriented send streams. 

185AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream] 

186#: Type alias for all bytes-oriented streams. 

187AnyByteStream = Union[ObjectStream[bytes], ByteStream] 

188 

189 

190class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider): 

191 """An interface for objects that let you accept incoming connections.""" 

192 

193 @abstractmethod 

194 async def serve( 

195 self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None 

196 ) -> None: 

197 """ 

198 Accept incoming connections as they come in and start tasks to handle them. 

199 

200 :param handler: a callable that will be used to handle each accepted connection 

201 :param task_group: the task group that will be used to start tasks for handling 

202 each accepted connection (if omitted, an ad-hoc task group will be created) 

203 """