Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_streams.py: 81%

52 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1from abc import abstractmethod 

2from typing import Any, Callable, Generic, Optional, TypeVar, Union 

3 

4from .._core._exceptions import EndOfStream 

5from .._core._typedattr import TypedAttributeProvider 

6from ._resources import AsyncResource 

7from ._tasks import TaskGroup 

8 

# Type of the items carried by the object-oriented stream interfaces.
T_Item = TypeVar("T_Item")
# Type of the stream object that a Listener passes to its connection handler.
T_Stream = TypeVar("T_Stream")

11 

12 

class UnreliableObjectReceiveStream(
    Generic[T_Item], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in which they
    were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the given type
    parameter.
    """

    def __aiter__(self) -> "UnreliableObjectReceiveStream[T_Item]":
        """Return the stream itself; it acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> T_Item:
        """
        Return the next item obtained from :meth:`receive`.

        :raises StopAsyncIteration: if :meth:`receive` raises :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is the normal way for iteration to
            # finish, so EndOfStream is deliberately not chained as context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self) -> T_Item:
        """
        Receive the next item.

        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """

46 

47 

class UnreliableObjectSendStream(
    Generic[T_Item], AsyncResource, TypedAttributeProvider
):
    """
    An interface for sending objects.

    This interface makes no guarantees that the messages sent will reach the recipient(s) in the
    same order in which they were sent, or at all.
    """

    @abstractmethod
    async def send(self, item: T_Item) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """

69 

70 

class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
):
    """
    A bidirectional message stream which does not guarantee the order or reliability of message
    delivery.
    """

78 

79 

class ObjectReceiveStream(UnreliableObjectReceiveStream[T_Item]):
    """
    A receive message stream which guarantees that messages are received in the same order in
    which they were sent, and that no messages are missed.
    """

85 

86 

class ObjectSendStream(UnreliableObjectSendStream[T_Item]):
    """
    A send message stream which guarantees that messages are delivered in the same order in which
    they were sent, without missing any messages in the middle.
    """

92 

93 

class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A bidirectional message stream which guarantees the order and reliability of message delivery.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """

111 

112 

class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more than
    65536 bytes.
    """

    def __aiter__(self) -> "ByteReceiveStream":
        """Return the stream itself; it acts as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> bytes:
        """
        Return the next chunk obtained from :meth:`receive` with its default ``max_bytes``.

        :raises StopAsyncIteration: if :meth:`receive` raises :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # Reaching the end of the stream is the normal way for iteration to
            # finish, so EndOfStream is deliberately not chained as context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementors of this interface should not return an empty :class:`bytes` object,
            and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """

142 

143 

class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """An interface for sending bytes to a single peer."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the peer.

        :param item: the bytes to send
        """

154 

155 

class ByteStream(ByteReceiveStream, ByteSendStream):
    """A bidirectional byte stream."""

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """

167 

168 

#: Type alias for all unreliable bytes-oriented receive streams.
AnyUnreliableByteReceiveStream = Union[
    UnreliableObjectReceiveStream[bytes], ByteReceiveStream
]
#: Type alias for all unreliable bytes-oriented send streams.
AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream]
#: Type alias for all unreliable bytes-oriented streams.
AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream]
#: Type alias for all bytes-oriented receive streams.
AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
#: Type alias for all bytes-oriented send streams.
AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream]
#: Type alias for all bytes-oriented streams.
AnyByteStream = Union[ObjectStream[bytes], ByteStream]

183 

184 

class Listener(Generic[T_Stream], AsyncResource, TypedAttributeProvider):
    """An interface for objects that let you accept incoming connections."""

    @abstractmethod
    async def serve(
        self, handler: Callable[[T_Stream], Any], task_group: Optional[TaskGroup] = None
    ) -> None:
        """
        Accept incoming connections as they come in and start tasks to handle them.

        :param handler: a callable that will be used to handle each accepted connection
        :param task_group: the task group that will be used to start tasks for handling each
            accepted connection (if omitted, an ad-hoc task group will be created)
        """