Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/abc/_streams.py: 81%

54 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3from abc import abstractmethod 

4from typing import Any, Callable, Generic, TypeVar, Union 

5 

6from .._core._exceptions import EndOfStream 

7from .._core._typedattr import TypedAttributeProvider 

8from ._resources import AsyncResource 

9from ._tasks import TaskGroup 

10 

# Invariant item type, used by the bidirectional streams where the same type
# is both sent and received.
T_Item = TypeVar("T_Item")
# Covariant type for items that are produced (receive streams, listeners).
T_co = TypeVar("T_co", covariant=True)
# Contravariant type for items that are consumed (send streams).
T_contra = TypeVar("T_contra", contravariant=True)

14 

15 

class UnreliableObjectReceiveStream(
    Generic[T_co], AsyncResource, TypedAttributeProvider
):
    """
    An interface for receiving objects.

    This interface makes no guarantees that the received messages arrive in the order in which they
    were sent, or that no messages are missed.

    Asynchronously iterating over objects of this type will yield objects matching the given type
    parameter.
    """

    def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
        """Return the stream itself; it serves as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> T_co:
        """
        Return the next item obtained from :meth:`receive`.

        :raises StopAsyncIteration: if :meth:`receive` raises :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # The end of the stream is the normal way for iteration to finish,
            # so the EndOfStream is deliberately not chained as context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self) -> T_co:
        """
        Receive the next item.

        :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
            closed
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """

49 

50 

class UnreliableObjectSendStream(
    Generic[T_contra], AsyncResource, TypedAttributeProvider
):
    """
    An interface for sending objects.

    This interface makes no guarantees that the messages sent will reach the recipient(s) in the
    same order in which they were sent, or at all.
    """

    @abstractmethod
    async def send(self, item: T_contra) -> None:
        """
        Send an item to the peer(s).

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
            closed
        :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
            due to external causes
        """

72 

73 

class UnreliableObjectStream(
    UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
):
    """
    A bidirectional message stream which does not guarantee the order or reliability of message
    delivery.
    """

81 

82 

class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
    """
    A receive message stream which guarantees that messages are received in the same order in
    which they were sent, and that no messages are missed.
    """

88 

89 

class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
    """
    A send message stream which guarantees that messages are delivered in the same order in which
    they were sent, without missing any messages in the middle.
    """

95 

96 

class ObjectStream(
    ObjectReceiveStream[T_Item],
    ObjectSendStream[T_Item],
    UnreliableObjectStream[T_Item],
):
    """
    A bidirectional message stream which guarantees the order and reliability of message delivery.
    """

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """

114 

115 

class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
    """
    An interface for receiving bytes from a single peer.

    Iterating this byte stream will yield a byte string of arbitrary length, but no more than
    65536 bytes.
    """

    def __aiter__(self) -> ByteReceiveStream:
        """Return the stream itself; it serves as its own asynchronous iterator."""
        return self

    async def __anext__(self) -> bytes:
        """
        Return the next chunk obtained from :meth:`receive` with its default ``max_bytes``.

        :raises StopAsyncIteration: if :meth:`receive` raises :exc:`~anyio.EndOfStream`
        """
        try:
            return await self.receive()
        except EndOfStream:
            # The end of the stream is the normal way for iteration to finish,
            # so the EndOfStream is deliberately not chained as context.
            raise StopAsyncIteration from None

    @abstractmethod
    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive at most ``max_bytes`` bytes from the peer.

        .. note:: Implementors of this interface should not return an empty :class:`bytes` object,
            and users should ignore them.

        :param max_bytes: maximum number of bytes to receive
        :return: the received bytes
        :raises ~anyio.EndOfStream: if this stream has been closed from the other end
        """

145 

146 

class ByteSendStream(AsyncResource, TypedAttributeProvider):
    """An interface for sending bytes to a single peer."""

    @abstractmethod
    async def send(self, item: bytes) -> None:
        """
        Send the given bytes to the peer.

        :param item: the bytes to send
        """

157 

158 

class ByteStream(ByteReceiveStream, ByteSendStream):
    """A bidirectional byte stream."""

    @abstractmethod
    async def send_eof(self) -> None:
        """
        Send an end-of-file indication to the peer.

        You should not try to send any further data to this stream after calling this method.
        This method is idempotent (does nothing on successive calls).
        """

170 

171 

#: Type alias for all unreliable bytes-oriented receive streams.
AnyUnreliableByteReceiveStream = Union[
    UnreliableObjectReceiveStream[bytes], ByteReceiveStream
]
#: Type alias for all unreliable bytes-oriented send streams.
AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream]
#: Type alias for all unreliable bytes-oriented streams.
AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream]
#: Type alias for all bytes-oriented receive streams.
AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
#: Type alias for all bytes-oriented send streams.
AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream]
#: Type alias for all bytes-oriented streams.
AnyByteStream = Union[ObjectStream[bytes], ByteStream]

186 

187 

class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
    """An interface for objects that let you accept incoming connections."""

    @abstractmethod
    async def serve(
        self,
        handler: Callable[[T_co], Any],
        task_group: TaskGroup | None = None,
    ) -> None:
        """
        Accept incoming connections as they come in and start tasks to handle them.

        :param handler: a callable that will be used to handle each accepted connection
        :param task_group: the task group that will be used to start tasks for handling each
            accepted connection (if omitted, an ad-hoc task group will be created)
        """