1from __future__ import annotations
2
3import warnings
4from collections import OrderedDict, deque
5from dataclasses import dataclass, field
6from types import TracebackType
7from typing import Generic, NamedTuple, TypeVar
8
9from .. import (
10 BrokenResourceError,
11 ClosedResourceError,
12 EndOfStream,
13 WouldBlock,
14)
15from .._core._testing import TaskInfo, get_current_task
16from ..abc import Event, ObjectReceiveStream, ObjectSendStream
17from ..lowlevel import checkpoint
18
# Invariant item type; parameterizes MemoryObjectItemReceiver and
# MemoryObjectStreamState.
T_Item = TypeVar("T_Item")
# Covariant item type; parameterizes MemoryObjectReceiveStream.
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type; parameterizes MemoryObjectSendStream.
T_contra = TypeVar("T_contra", contravariant=True)
22
23
class MemoryObjectStreamStatistics(NamedTuple):
    """Snapshot of the counters kept for a memory object stream."""

    #: number of items stored in the buffer
    current_buffer_used: int
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    #: number of unclosed clones of the send stream
    open_send_streams: int
    #: number of unclosed clones of the receive stream
    open_receive_streams: int
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int
34
35
@dataclass(eq=False)
class MemoryObjectItemReceiver(Generic[T_Item]):
    """
    Slot through which an item is handed directly to a waiting receiver.

    ``task_info`` is populated by calling ``get_current_task()`` when the instance
    is created. ``item`` has no default, so the attribute does not exist until
    something assigns it.
    """

    task_info: TaskInfo = field(init=False, default_factory=get_current_task)
    item: T_Item = field(init=False)

    def __repr__(self) -> str:
        # The dataclass-generated __repr__ would raise AttributeError while ``item``
        # is still unset, so an unset item is rendered as None instead.
        try:
            value = self.item
        except AttributeError:
            value = None

        cls_name = self.__class__.__name__
        return f"{cls_name}(task_info={self.task_info}, item={value!r})"
46
47
@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Bookkeeping shared by the send and receive stream objects of one memory stream.

    Only ``max_buffer_size`` is accepted by the constructor; every other field
    starts out empty or zero.
    """

    #: upper bound for ``len(buffer)``
    max_buffer_size: float
    #: items that have been sent but not yet received
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    #: number of send stream objects that have not been closed
    open_send_channels: int = field(init=False, default=0)
    #: number of receive stream objects that have not been closed
    open_receive_channels: int = field(init=False, default=0)
    #: receivers blocked waiting for an item, keyed by their wake-up event
    waiting_receivers: OrderedDict[Event, MemoryObjectItemReceiver[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    #: items of senders blocked on a full buffer, keyed by their wake-up event
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Summarize the current counters of this state.

        :return: a :class:`MemoryObjectStreamStatistics` built from the current
            buffer length, the buffer limit, the open channel counts and the
            number of waiting senders and receivers

        """
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )
70
71
@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Each instance is counted in ``open_receive_channels`` of the shared state from
    creation until it is closed.
    """

    _state: MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open receiving end.
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        state = self._state
        if state.waiting_senders:
            # Move the oldest blocked sender's item into the buffer and wake that
            # sender up; the item taken below is then the oldest buffered one.
            sender_event, pending_item = state.waiting_senders.popitem(last=False)
            state.buffer.append(pending_item)
            sender_event.set()

        if state.buffer:
            return state.buffer.popleft()

        if state.open_send_channels:
            # Nothing buffered, but an open send stream could still deliver.
            raise WouldBlock

        raise EndOfStream

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one if none is available right away.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Nothing available yet: register in the queue of waiting receivers
            wakeup = Event()
            slot = MemoryObjectItemReceiver[T_co]()
            self._state.waiting_receivers[wakeup] = slot

            try:
                await wakeup.wait()
            finally:
                # Whoever woke us may already have removed the entry, hence the
                # default; on any other exit this drops the stale registration.
                self._state.waiting_receivers.pop(wakeup, None)

            try:
                return slot.item
            except AttributeError:
                # Woken up without an item having been assigned to the slot
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if self._closed:
            return

        self._closed = True
        state = self._state
        state.open_receive_channels -= 1
        if state.open_receive_channels == 0:
            # Last receiving end is gone: wake every blocked sender. Their entries
            # are deliberately left in ``waiting_senders``.
            for sender_event in list(state.waiting_senders):
                sender_event.set()

    async def aclose(self) -> None:
        """Close the stream by calling :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        """Return this stream so it can be used in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the stream when the ``with`` block is left."""
        self.close()

    def __del__(self) -> None:
        # Only streams that were never closed are reported.
        if self._closed:
            return

        warnings.warn(
            f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
            ResourceWarning,
            source=self,
        )
188
189
@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Each instance is counted in ``open_send_channels`` of the shared state from
    creation until it is closed.
    """

    _state: MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open sending end.
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        state = self._state
        receivers = state.waiting_receivers
        while receivers:
            # Try the longest-waiting receiver first. Receivers whose task has a
            # pending cancellation are dropped from the queue and skipped.
            wakeup, slot = receivers.popitem(last=False)
            if slot.task_info.has_pending_cancellation():
                continue

            slot.item = item
            wakeup.set()
            return

        # Nobody could take the item directly; store it if there is room left.
        if len(state.buffer) < state.max_buffer_size:
            state.buffer.append(item)
            return

        raise WouldBlock

    async def send(self, item: T_contra) -> None:
        """
        Send an item to the stream.

        If the buffer is full, this method blocks until there is again room in the
        buffer or the item can be sent directly to a receiver.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Park the item until a receiver picks it up
            wakeup = Event()
            pending = self._state.waiting_senders
            pending[wakeup] = item
            try:
                await wakeup.wait()
            except BaseException:
                # Interrupted while waiting: withdraw the item before propagating
                pending.pop(wakeup, None)
                raise

            # A receiver removes the entry when it takes the item. If the entry is
            # still present, the wake-up came from the last receive stream closing.
            if wakeup in pending:
                del pending[wakeup]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if self._closed:
            return

        self._closed = True
        state = self._state
        state.open_send_channels -= 1
        if state.open_send_channels == 0:
            # Last sending end is gone: empty the receiver queue first, then wake
            # every receiver that was in it. None of them was handed an item.
            receiver_events = list(state.waiting_receivers)
            state.waiting_receivers.clear()
            for wakeup in receiver_events:
                wakeup.set()

    async def aclose(self) -> None:
        """Close the stream by calling :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        """Return this stream so it can be used in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the stream when the ``with`` block is left."""
        self.close()

    def __del__(self) -> None:
        # Only streams that were never closed are reported.
        if self._closed:
            return

        warnings.warn(
            f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
            ResourceWarning,
            source=self,
        )