Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/memory.py: 38%

135 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1from collections import OrderedDict, deque 

2from dataclasses import dataclass, field 

3from types import TracebackType 

4from typing import Deque, Generic, List, NamedTuple, Optional, Type, TypeVar 

5 

6from .. import ( 

7 BrokenResourceError, 

8 ClosedResourceError, 

9 EndOfStream, 

10 WouldBlock, 

11 get_cancelled_exc_class, 

12) 

13from .._core._compat import DeprecatedAwaitable 

14from ..abc import Event, ObjectReceiveStream, ObjectSendStream 

15from ..lowlevel import checkpoint 

16 

17T_Item = TypeVar("T_Item") 

18 

19 

20class MemoryObjectStreamStatistics(NamedTuple): 

21 current_buffer_used: int #: number of items stored in the buffer 

22 #: maximum number of items that can be stored on this stream (or :data:`math.inf`) 

23 max_buffer_size: float 

24 open_send_streams: int #: number of unclosed clones of the send stream 

25 open_receive_streams: int #: number of unclosed clones of the receive stream 

26 tasks_waiting_send: int #: number of tasks blocked on :meth:`MemoryObjectSendStream.send` 

27 #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive` 

28 tasks_waiting_receive: int 

29 

30 

31@dataclass(eq=False) 

32class MemoryObjectStreamState(Generic[T_Item]): 

33 max_buffer_size: float = field() 

34 buffer: Deque[T_Item] = field(init=False, default_factory=deque) 

35 open_send_channels: int = field(init=False, default=0) 

36 open_receive_channels: int = field(init=False, default=0) 

37 waiting_receivers: "OrderedDict[Event, List[T_Item]]" = field( 

38 init=False, default_factory=OrderedDict 

39 ) 

40 waiting_senders: "OrderedDict[Event, T_Item]" = field( 

41 init=False, default_factory=OrderedDict 

42 ) 

43 

44 def statistics(self) -> MemoryObjectStreamStatistics: 

45 return MemoryObjectStreamStatistics( 

46 len(self.buffer), 

47 self.max_buffer_size, 

48 self.open_send_channels, 

49 self.open_receive_channels, 

50 len(self.waiting_senders), 

51 len(self.waiting_receivers), 

52 ) 

53 

54 

55@dataclass(eq=False) 

56class MemoryObjectReceiveStream(Generic[T_Item], ObjectReceiveStream[T_Item]): 

57 _state: MemoryObjectStreamState[T_Item] 

58 _closed: bool = field(init=False, default=False) 

59 

60 def __post_init__(self) -> None: 

61 self._state.open_receive_channels += 1 

62 

63 def receive_nowait(self) -> T_Item: 

64 """ 

65 Receive the next item if it can be done without waiting. 

66 

67 :return: the received item 

68 :raises ~anyio.ClosedResourceError: if this send stream has been closed 

69 :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been 

70 closed from the sending end 

71 :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks 

72 waiting to send 

73 

74 """ 

75 if self._closed: 

76 raise ClosedResourceError 

77 

78 if self._state.waiting_senders: 

79 # Get the item from the next sender 

80 send_event, item = self._state.waiting_senders.popitem(last=False) 

81 self._state.buffer.append(item) 

82 send_event.set() 

83 

84 if self._state.buffer: 

85 return self._state.buffer.popleft() 

86 elif not self._state.open_send_channels: 

87 raise EndOfStream 

88 

89 raise WouldBlock 

90 

91 async def receive(self) -> T_Item: 

92 await checkpoint() 

93 try: 

94 return self.receive_nowait() 

95 except WouldBlock: 

96 # Add ourselves in the queue 

97 receive_event = Event() 

98 container: List[T_Item] = [] 

99 self._state.waiting_receivers[receive_event] = container 

100 

101 try: 

102 await receive_event.wait() 

103 except get_cancelled_exc_class(): 

104 # Ignore the immediate cancellation if we already received an item, so as not to 

105 # lose it 

106 if not container: 

107 raise 

108 finally: 

109 self._state.waiting_receivers.pop(receive_event, None) 

110 

111 if container: 

112 return container[0] 

113 else: 

114 raise EndOfStream 

115 

116 def clone(self) -> "MemoryObjectReceiveStream[T_Item]": 

117 """ 

118 Create a clone of this receive stream. 

119 

120 Each clone can be closed separately. Only when all clones have been closed will the 

121 receiving end of the memory stream be considered closed by the sending ends. 

122 

123 :return: the cloned stream 

124 

125 """ 

126 if self._closed: 

127 raise ClosedResourceError 

128 

129 return MemoryObjectReceiveStream(_state=self._state) 

130 

131 def close(self) -> None: 

132 """ 

133 Close the stream. 

134 

135 This works the exact same way as :meth:`aclose`, but is provided as a special case for the 

136 benefit of synchronous callbacks. 

137 

138 """ 

139 if not self._closed: 

140 self._closed = True 

141 self._state.open_receive_channels -= 1 

142 if self._state.open_receive_channels == 0: 

143 send_events = list(self._state.waiting_senders.keys()) 

144 for event in send_events: 

145 event.set() 

146 

147 async def aclose(self) -> None: 

148 self.close() 

149 

150 def statistics(self) -> MemoryObjectStreamStatistics: 

151 """ 

152 Return statistics about the current state of this stream. 

153 

154 .. versionadded:: 3.0 

155 """ 

156 return self._state.statistics() 

157 

158 def __enter__(self) -> "MemoryObjectReceiveStream[T_Item]": 

159 return self 

160 

161 def __exit__( 

162 self, 

163 exc_type: Optional[Type[BaseException]], 

164 exc_val: Optional[BaseException], 

165 exc_tb: Optional[TracebackType], 

166 ) -> None: 

167 self.close() 

168 

169 

170@dataclass(eq=False) 

171class MemoryObjectSendStream(Generic[T_Item], ObjectSendStream[T_Item]): 

172 _state: MemoryObjectStreamState[T_Item] 

173 _closed: bool = field(init=False, default=False) 

174 

175 def __post_init__(self) -> None: 

176 self._state.open_send_channels += 1 

177 

178 def send_nowait(self, item: T_Item) -> DeprecatedAwaitable: 

179 """ 

180 Send an item immediately if it can be done without waiting. 

181 

182 :param item: the item to send 

183 :raises ~anyio.ClosedResourceError: if this send stream has been closed 

184 :raises ~anyio.BrokenResourceError: if the stream has been closed from the 

185 receiving end 

186 :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting 

187 to receive 

188 

189 """ 

190 if self._closed: 

191 raise ClosedResourceError 

192 if not self._state.open_receive_channels: 

193 raise BrokenResourceError 

194 

195 if self._state.waiting_receivers: 

196 receive_event, container = self._state.waiting_receivers.popitem(last=False) 

197 container.append(item) 

198 receive_event.set() 

199 elif len(self._state.buffer) < self._state.max_buffer_size: 

200 self._state.buffer.append(item) 

201 else: 

202 raise WouldBlock 

203 

204 return DeprecatedAwaitable(self.send_nowait) 

205 

206 async def send(self, item: T_Item) -> None: 

207 await checkpoint() 

208 try: 

209 self.send_nowait(item) 

210 except WouldBlock: 

211 # Wait until there's someone on the receiving end 

212 send_event = Event() 

213 self._state.waiting_senders[send_event] = item 

214 try: 

215 await send_event.wait() 

216 except BaseException: 

217 self._state.waiting_senders.pop(send_event, None) # type: ignore[arg-type] 

218 raise 

219 

220 if self._state.waiting_senders.pop(send_event, None): # type: ignore[arg-type] 

221 raise BrokenResourceError 

222 

223 def clone(self) -> "MemoryObjectSendStream[T_Item]": 

224 """ 

225 Create a clone of this send stream. 

226 

227 Each clone can be closed separately. Only when all clones have been closed will the 

228 sending end of the memory stream be considered closed by the receiving ends. 

229 

230 :return: the cloned stream 

231 

232 """ 

233 if self._closed: 

234 raise ClosedResourceError 

235 

236 return MemoryObjectSendStream(_state=self._state) 

237 

238 def close(self) -> None: 

239 """ 

240 Close the stream. 

241 

242 This works the exact same way as :meth:`aclose`, but is provided as a special case for the 

243 benefit of synchronous callbacks. 

244 

245 """ 

246 if not self._closed: 

247 self._closed = True 

248 self._state.open_send_channels -= 1 

249 if self._state.open_send_channels == 0: 

250 receive_events = list(self._state.waiting_receivers.keys()) 

251 self._state.waiting_receivers.clear() 

252 for event in receive_events: 

253 event.set() 

254 

255 async def aclose(self) -> None: 

256 self.close() 

257 

258 def statistics(self) -> MemoryObjectStreamStatistics: 

259 """ 

260 Return statistics about the current state of this stream. 

261 

262 .. versionadded:: 3.0 

263 """ 

264 return self._state.statistics() 

265 

266 def __enter__(self) -> "MemoryObjectSendStream[T_Item]": 

267 return self 

268 

269 def __exit__( 

270 self, 

271 exc_type: Optional[Type[BaseException]], 

272 exc_val: Optional[BaseException], 

273 exc_tb: Optional[TracebackType], 

274 ) -> None: 

275 self.close()