Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

152 statements  

1from __future__ import annotations 

2 

3import warnings 

4from collections import OrderedDict, deque 

5from dataclasses import dataclass, field 

6from types import TracebackType 

7from typing import Generic, NamedTuple, TypeVar 

8 

9from .. import ( 

10 BrokenResourceError, 

11 ClosedResourceError, 

12 EndOfStream, 

13 WouldBlock, 

14) 

15from .._core._testing import TaskInfo, get_current_task 

16from ..abc import Event, ObjectReceiveStream, ObjectSendStream 

17from ..lowlevel import checkpoint 

18 

# Type variables shared by the memory object stream classes in this module.
# Invariant item type, used by the shared state and the item receiver.
T_Item = TypeVar("T_Item")
# Covariant item type, used by the receiving end of the stream.
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type, used by the sending end of the stream.
T_contra = TypeVar("T_contra", contravariant=True)

22 

23 

class MemoryObjectStreamStatistics(NamedTuple):
    """Immutable snapshot of the counters describing a memory object stream."""

    #: number of items stored in the buffer
    current_buffer_used: int
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    #: number of unclosed clones of the send stream
    open_send_streams: int
    #: number of unclosed clones of the receive stream
    open_receive_streams: int
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int

34 

35 

@dataclass(eq=False)
class MemoryObjectItemReceiver(Generic[T_Item]):
    """
    Slot through which a sender hands an item directly to a waiting receiver.

    Neither field is accepted by ``__init__``: ``task_info`` is filled in by
    calling ``get_current_task()`` when the instance is created, and ``item``
    stays unset until something assigns it.

    """

    task_info: TaskInfo = field(init=False, default_factory=get_current_task)
    item: T_Item = field(init=False)

    def __repr__(self) -> str:
        # The dataclass-generated __repr__ would read ``self.item`` directly and
        # fail with AttributeError while no item has been assigned yet, so fall
        # back to None for display purposes.
        shown_item = getattr(self, "item", None)
        cls_name = self.__class__.__name__
        return f"{cls_name}(task_info={self.task_info}, item={shown_item!r})"

46 

47 

@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by every send/receive stream of one memory object stream.

    Only ``max_buffer_size`` is supplied by the caller; every other field starts
    out empty or zero and is updated by the stream objects that hold this state.

    """

    # Upper bound on the number of buffered items
    max_buffer_size: float = field()
    # Items that were sent but have not been received yet (FIFO)
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    # Number of send streams that have not been closed
    open_send_channels: int = field(init=False, default=0)
    # Number of receive streams that have not been closed
    open_receive_channels: int = field(init=False, default=0)
    # Receivers blocked waiting for an item, keyed by their wake-up event
    waiting_receivers: OrderedDict[Event, MemoryObjectItemReceiver[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    # Items of senders blocked waiting for buffer room, keyed by their wake-up event
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Build a statistics snapshot from the current state.

        :return: the current counters of this stream state

        """
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )

70 

71 

@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Every instance (including clones) registers itself in the shared state on
    creation and unregisters itself when closed.

    """

    _state: MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Count this instance among the open receiving ends
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        state = self._state
        if state.waiting_senders:
            # Move the oldest blocked sender's item into the buffer and wake the
            # sender up
            sender_event, pending_item = state.waiting_senders.popitem(last=False)
            state.buffer.append(pending_item)
            sender_event.set()

        if state.buffer:
            return state.buffer.popleft()

        if not state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one if none is available right away.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end, or if the waiting task was woken up
            without having been handed an item

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Nothing available yet: register as a waiting receiver
            wakeup = Event()
            slot = MemoryObjectItemReceiver[T_co]()
            self._state.waiting_receivers[wakeup] = slot

            try:
                await wakeup.wait()
            finally:
                # Always unregister, whether woken up normally or by an exception
                self._state.waiting_receivers.pop(wakeup, None)

            try:
                return slot.item
            except AttributeError:
                # Woken up without an item having been assigned to the slot
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a new receive stream that shares this stream's state.

        Clones are closed independently of each other. The sending ends only see
        the receiving end as closed once every clone has been closed.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        Synchronous counterpart of :meth:`aclose`, usable from synchronous
        callbacks. Closing an already closed stream does nothing.

        """
        if self._closed:
            return

        self._closed = True
        state = self._state
        state.open_receive_channels -= 1
        if state.open_receive_channels == 0:
            # Last receiving end is gone: wake up every blocked sender. Their
            # entries are left in place on purpose.
            for sender_event in list(state.waiting_senders):
                sender_event.set()

    async def aclose(self) -> None:
        """Close the stream (delegates to :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block closes the stream
        self.close()

    def __del__(self) -> None:
        # Warn when the stream is finalized without having been closed
        if self._closed:
            return

        warnings.warn(
            f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
            ResourceWarning,
            stacklevel=1,
            source=self,
        )

189 

190 

@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Every instance (including clones) registers itself in the shared state on
    creation and unregisters itself when closed.

    """

    _state: MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Count this instance among the open sending ends
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        state = self._state
        blocked_receivers = state.waiting_receivers
        while blocked_receivers:
            wakeup, slot = blocked_receivers.popitem(last=False)
            # Receivers whose task has a pending cancellation are dropped from
            # the queue without being handed the item
            if slot.task_info.has_pending_cancellation():
                continue

            # Hand the item directly to the oldest eligible receiver
            slot.item = item
            wakeup.set()
            return

        # No receiver took the item: store it if the buffer has room
        if len(state.buffer) < state.max_buffer_size:
            state.buffer.append(item)
            return

        raise WouldBlock

    async def send(self, item: T_contra) -> None:
        """
        Send an item to the stream.

        When the item cannot be delivered or buffered right away, this method
        waits until a receiver has taken the item over.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Buffer is full: queue the item and wait for a receiver to pick it up
            wakeup = Event()
            self._state.waiting_senders[wakeup] = item
            try:
                await wakeup.wait()
            except BaseException:
                # Take the item back out of the queue before propagating
                self._state.waiting_senders.pop(wakeup, None)
                raise

            # A receiver removes the entry when it takes the item. If the entry
            # is still present, the wake-up came from the receiving end closing.
            if wakeup in self._state.waiting_senders:
                del self._state.waiting_senders[wakeup]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a new send stream that shares this stream's state.

        Clones are closed independently of each other. The receiving ends only see
        the sending end as closed once every clone has been closed.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        Synchronous counterpart of :meth:`aclose`, usable from synchronous
        callbacks. Closing an already closed stream does nothing.

        """
        if self._closed:
            return

        self._closed = True
        state = self._state
        state.open_send_channels -= 1
        if state.open_send_channels == 0:
            # Last sending end is gone: empty the receiver queue, then wake up
            # every receiver that was waiting in it
            pending_events = list(state.waiting_receivers)
            state.waiting_receivers.clear()
            for receiver_event in pending_events:
                receiver_event.set()

    async def aclose(self) -> None:
        """Close the stream (delegates to :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block closes the stream
        self.close()

    def __del__(self) -> None:
        # Warn when the stream is finalized without having been closed
        if self._closed:
            return

        warnings.warn(
            f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
            ResourceWarning,
            stacklevel=1,
            source=self,
        )