Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/memory.py: 39%

138 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3from collections import OrderedDict, deque 

4from dataclasses import dataclass, field 

5from types import TracebackType 

6from typing import Generic, NamedTuple, TypeVar 

7 

8from .. import ( 

9 BrokenResourceError, 

10 ClosedResourceError, 

11 EndOfStream, 

12 WouldBlock, 

13 get_cancelled_exc_class, 

14) 

15from .._core._compat import DeprecatedAwaitable 

16from ..abc import Event, ObjectReceiveStream, ObjectSendStream 

17from ..lowlevel import checkpoint 

18 

# Invariant item type, used to parametrize the shared MemoryObjectStreamState.
T_Item = TypeVar("T_Item")
# Covariant item type, used to parametrize MemoryObjectReceiveStream.
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type, used to parametrize MemoryObjectSendStream.
T_contra = TypeVar("T_contra", contravariant=True)

22 

23 

class MemoryObjectStreamStatistics(NamedTuple):
    """Read-only snapshot of the counters describing a memory object stream."""

    #: number of items stored in the buffer
    current_buffer_used: int
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    #: number of unclosed clones of the send stream
    open_send_streams: int
    #: number of unclosed clones of the receive stream
    open_receive_streams: int
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int

33 

34 

@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by the send and receive ends of one memory object stream.

    Only ``max_buffer_size`` is accepted by the constructor; every other field
    starts out empty or zero.
    """

    # Upper bound on len(buffer); compared with ``<`` so a float such as
    # math.inf is acceptable
    max_buffer_size: float = field()
    # Items that have been sent but not yet received, oldest first
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    # Number of send stream instances (clones) that have not been closed
    open_send_channels: int = field(init=False, default=0)
    # Number of receive stream instances (clones) that have not been closed
    open_receive_channels: int = field(init=False, default=0)
    # Blocked receivers in arrival order: wake-up event -> list that the sender
    # appends the delivered item to
    waiting_receivers: OrderedDict[Event, list[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    # Blocked senders in arrival order: wake-up event -> item waiting to be taken
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """Return a snapshot of the current buffer, channel and waiter counts."""
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )

57 

58 

@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Every instance, including each clone made with :meth:`clone`, operates on the
    same shared state object and counts as one open receive channel until it is
    closed.
    """

    _state: MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as an open receive channel on the shared state
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Get the item from the next sender. It is appended to the buffer
            # rather than returned directly so that older buffered items are
            # still delivered first.
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for a sender if none is available yet.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue
            receive_event = Event()
            container: list[T_co] = []
            self._state.waiting_receivers[receive_event] = container

            try:
                await receive_event.wait()
            except get_cancelled_exc_class():
                # Ignore the immediate cancellation if we already received an item, so as not to
                # lose it
                if not container:
                    raise
            finally:
                # No-op if a sender (or the closing send stream) already removed us
                self._state.waiting_receivers.pop(receive_event, None)

            if container:
                return container[0]
            else:
                # Woken up without an item: the last send stream was closed.
                # Suppress the WouldBlock context, which is an internal detail.
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                # Wake up all blocked senders. Their entries are deliberately left
                # in waiting_senders: a woken sender that still finds its own
                # entry there knows that its item was never taken.
                send_events = list(self._state.waiting_senders.keys())
                for event in send_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream; identical to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Close on leaving the ``with`` block; returning None never suppresses
        # the exception
        self.close()

172 

173 

@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Every instance, including each clone made with :meth:`clone`, operates on the
    same shared state object and counts as one open send channel until it is
    closed.
    """

    _state: MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as an open send channel on the shared state
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> DeprecatedAwaitable:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        if self._state.waiting_receivers:
            # Hand the item directly to the receiver that has waited the longest
            receive_event, container = self._state.waiting_receivers.popitem(last=False)
            container.append(item)
            receive_event.set()
        elif len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

        # NOTE(review): presumably a shim that keeps legacy ``await send_nowait(...)``
        # call sites working -- confirm against anyio._core._compat
        return DeprecatedAwaitable(self.send_nowait)

    async def send(self, item: T_contra) -> None:
        """
        Send an item, waiting until a receiver takes it if it cannot be sent at once.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                # Withdraw the pending item so that it cannot be delivered after
                # this call has failed
                self._state.waiting_senders.pop(send_event, None)  # type: ignore[arg-type]
                raise

            # A receiver that takes the item removes our entry before setting the
            # event. If the entry is still present, we were woken up because the
            # last receive stream was closed. Membership is tested instead of the
            # truthiness of the popped item so that falsy items (0, "", None,
            # False, ...) are handled correctly.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_send_channels -= 1
            if self._state.open_send_channels == 0:
                # Wake up all blocked receivers. Their containers stay empty, which
                # is how a woken receiver can tell that no item was delivered.
                receive_events = list(self._state.waiting_receivers.keys())
                self._state.waiting_receivers.clear()
                for event in receive_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream; identical to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Close on leaving the ``with`` block; returning None never suppresses
        # the exception
        self.close()