Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

153 statements  

from __future__ import annotations

# Names exported by ``from <module> import *``. The underscore-prefixed helper
# classes defined in this module are not listed here.
__all__ = (
    "MemoryObjectReceiveStream",
    "MemoryObjectSendStream",
    "MemoryObjectStreamStatistics",
)

import warnings
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from types import TracebackType
from typing import Generic, NamedTuple, TypeVar

from .. import (
    BrokenResourceError,
    ClosedResourceError,
    EndOfStream,
    WouldBlock,
)
from .._core._testing import TaskInfo, get_current_task
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
from ..lowlevel import checkpoint

# Invariant item type, used by the shared stream state and the receiver slot.
T_Item = TypeVar("T_Item")
# Covariant item type, used to parametrize the receive stream.
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type, used to parametrize the send stream.
T_contra = TypeVar("T_contra", contravariant=True)

class MemoryObjectStreamStatistics(NamedTuple):
    """
    Immutable snapshot of the counters of a memory object stream.

    The field order is part of the interface: instances are built positionally
    by ``_MemoryObjectStreamState.statistics()``.
    """

    current_buffer_used: int  #: number of items stored in the buffer
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    open_send_streams: int  #: number of unclosed clones of the send stream
    open_receive_streams: int  #: number of unclosed clones of the receive stream
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int

@dataclass(eq=False)
class _MemoryObjectItemReceiver(Generic[T_Item]):
    """
    Hand-off slot for one task blocked in ``MemoryObjectReceiveStream.receive``.

    A sender stores the delivered value in ``item``. The attribute has no
    default, so it stays undefined until a sender assigns it; the receive
    path relies on that absence to detect that nothing was delivered.
    """

    # Filled in by calling get_current_task() when the instance is created.
    task_info: TaskInfo = field(init=False, default_factory=get_current_task)
    # Deliberately left without a default (see class docstring).
    item: T_Item = field(init=False)

    def __repr__(self) -> str:
        # When item is not defined, we get following error with default __repr__:
        # AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'
        item = getattr(self, "item", None)
        return f"{self.__class__.__name__}(task_info={self.task_info}, item={item!r})"

@dataclass(eq=False)
class _MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by all send and receive stream objects of one stream.

    Only ``max_buffer_size`` is accepted by the constructor; every other field
    starts empty or at zero and is updated by the stream objects.
    """

    max_buffer_size: float = field()
    # Items that have been sent but not yet received, oldest first.
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    # Incremented in each stream's __post_init__, decremented in its close().
    open_send_channels: int = field(init=False, default=0)
    open_receive_channels: int = field(init=False, default=0)
    # Blocked receivers keyed by the event used to wake them, in arrival order.
    waiting_receivers: OrderedDict[Event, _MemoryObjectItemReceiver[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    # Blocked senders: wake-up event -> the item that sender wants to deliver.
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Build a statistics snapshot from the current state.

        :return: a :class:`MemoryObjectStreamStatistics` holding the buffer
            length, the buffer limit, the open channel counts and the number
            of waiting senders and receivers

        """
        return MemoryObjectStreamStatistics(
            len(self.buffer),
            self.max_buffer_size,
            self.open_send_channels,
            self.open_receive_channels,
            len(self.waiting_senders),
            len(self.waiting_receivers),
        )

@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    All clones share one ``_MemoryObjectStreamState``; each instance tracks
    only whether it has itself been closed.
    """

    _state: _MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Every instance, clones included, counts as one open receive channel.
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Get the item from the next sender. It is appended to the buffer
            # rather than returned directly so that older buffered items are
            # still handed out first.
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one if none is available yet.

        Calls ``checkpoint()`` first, then tries :meth:`receive_nowait`. If
        that would block, the task registers itself in the shared state and
        waits until its event is set.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the task was woken up without an item
            having been delivered, or if the buffer is empty and there are no
            open send streams

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue
            receive_event = Event()
            receiver = _MemoryObjectItemReceiver[T_co]()
            self._state.waiting_receivers[receive_event] = receiver

            try:
                await receive_event.wait()
            finally:
                # Always deregister, also when the wait raised; the entry may
                # already have been removed by whoever set the event.
                self._state.waiting_receivers.pop(receive_event, None)

            try:
                return receiver.item
            except AttributeError:
                # Woken up without an item having been assigned.
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if self._closed:
            return

        self._closed = True
        self._state.open_receive_channels -= 1
        if self._state.open_receive_channels == 0:
            # Last receiver gone: wake every blocked sender. Their entries are
            # left in ``waiting_senders`` on purpose; ``send()`` treats "still
            # registered after wake-up" as a broken stream.
            for event in list(self._state.waiting_senders):
                event.set()

    async def aclose(self) -> None:
        """Close the stream; delegates to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def __del__(self) -> None:
        # Emit a ResourceWarning for streams discarded without being closed.
        if not self._closed:
            warnings.warn(
                f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
                ResourceWarning,
                stacklevel=1,
                source=self,
            )

@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    All clones share one ``_MemoryObjectStreamState``; each instance tracks
    only whether it has itself been closed.
    """

    _state: _MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Every instance, clones included, counts as one open send channel.
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        # Hand the item directly to the longest-waiting receiver. Receivers
        # whose task reports a pending cancellation are dequeued but skipped,
        # and the next one is tried.
        while self._state.waiting_receivers:
            receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
            if not receiver.task_info.has_pending_cancellation():
                receiver.item = item
                receive_event.set()
                return

        if len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

    async def send(self, item: T_contra) -> None:
        """
        Send an item to the stream.

        If the buffer is full, this method blocks until there is again room in the
        buffer or the item can be sent directly to a receiver.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                # The wait was interrupted: withdraw the pending item before
                # propagating the exception.
                self._state.waiting_senders.pop(send_event, None)
                raise

            # A receiver that takes the item removes our entry before setting
            # the event. Still being registered therefore means the wake-up
            # came from the last receive stream being closed.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if self._closed:
            return

        self._closed = True
        self._state.open_send_channels -= 1
        if self._state.open_send_channels == 0:
            # Last sender gone: wake every blocked receiver. No item is
            # assigned to them, so ``receive()`` raises EndOfStream on wake-up.
            receive_events = list(self._state.waiting_receivers)
            self._state.waiting_receivers.clear()
            for event in receive_events:
                event.set()

    async def aclose(self) -> None:
        """Close the stream; delegates to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def __del__(self) -> None:
        # Emit a ResourceWarning for streams discarded without being closed.
        if not self._closed:
            warnings.warn(
                f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
                ResourceWarning,
                stacklevel=1,
                source=self,
            )

325 )