Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/streams/memory.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3__all__ = (
4 "MemoryObjectReceiveStream",
5 "MemoryObjectSendStream",
6 "MemoryObjectStreamStatistics",
7)
9import warnings
10from collections import OrderedDict, deque
11from dataclasses import dataclass, field
12from types import TracebackType
13from typing import Generic, NamedTuple, TypeVar
15from .. import (
16 BrokenResourceError,
17 ClosedResourceError,
18 EndOfStream,
19 WouldBlock,
20)
21from .._core._testing import TaskInfo, get_current_task
22from ..abc import Event, ObjectReceiveStream, ObjectSendStream
23from ..lowlevel import checkpoint
# Invariant item type, used by the internal state/receiver helpers that both
# store and hand out items.
T_Item = TypeVar("T_Item")
# Covariant item type for the receiving end (items are only produced).
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type for the sending end (items are only consumed).
T_contra = TypeVar("T_contra", contravariant=True)
class MemoryObjectStreamStatistics(NamedTuple):
    """
    Snapshot of the bookkeeping counters of a memory object stream.

    Instances are returned by the ``statistics()`` methods of the send and receive
    streams.
    """

    current_buffer_used: int  #: number of items stored in the buffer
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    open_send_streams: int  #: number of unclosed clones of the send stream
    open_receive_streams: int  #: number of unclosed clones of the receive stream
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int
@dataclass(eq=False)
class _MemoryObjectItemReceiver(Generic[T_Item]):
    """
    Slot through which a sender hands an item directly to a waiting receiver.

    ``task_info`` is captured via ``get_current_task()`` when the instance is
    created. ``item`` has no default, so the attribute does not exist until a
    sender assigns it.
    """

    task_info: TaskInfo = field(init=False, default_factory=get_current_task)
    item: T_Item = field(init=False)

    def __repr__(self) -> str:
        # When item is not defined, we get following error with default __repr__:
        # AttributeError: '_MemoryObjectItemReceiver' object has no attribute 'item'
        item = getattr(self, "item", None)
        return f"{self.__class__.__name__}(task_info={self.task_info}, item={item!r})"
@dataclass(eq=False)
class _MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by every send and receive stream of one memory stream.

    Only ``max_buffer_size`` is passed to the constructor; all other fields start
    out empty or zero.
    """

    # Upper bound for len(buffer); senders compare against it before appending
    max_buffer_size: float = field()
    # Items that have been sent but not yet received, oldest first
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    # Number of send streams / receive streams that have not been closed yet
    open_send_channels: int = field(init=False, default=0)
    open_receive_channels: int = field(init=False, default=0)
    # Blocked receivers in arrival order, keyed by the event that wakes them up
    waiting_receivers: OrderedDict[Event, _MemoryObjectItemReceiver[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    # Blocked senders in arrival order: wake-up event -> the item they want to send
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return a snapshot of the current counters.

        :return: the statistics tuple built from the current state

        """
        return MemoryObjectStreamStatistics(
            len(self.buffer),
            self.max_buffer_size,
            self.open_send_channels,
            self.open_receive_channels,
            len(self.waiting_senders),
            len(self.waiting_receivers),
        )
@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Every clone shares the same state object; the state keeps count of how many
    receive streams are still open.
    """

    _state: _MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open receiving end
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Get the item from the next sender. It goes to the back of the buffer
            # (instead of being returned directly) so that items already buffered
            # are still delivered first.
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one to be sent if necessary.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if there is nothing left to receive and the
            stream has been closed from the sending end

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue
            receive_event = Event()
            receiver = _MemoryObjectItemReceiver[T_co]()
            self._state.waiting_receivers[receive_event] = receiver

            try:
                await receive_event.wait()
            finally:
                # Whether we were woken up or the wait raised, make sure we are no
                # longer listed as a waiting receiver
                self._state.waiting_receivers.pop(receive_event, None)

            try:
                return receiver.item
            except AttributeError:
                # The event was set without an item being assigned, which is what
                # closing the last send stream does
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                # Wake up every blocked sender. Their entries are deliberately left
                # in waiting_senders: a sender that finds itself still registered
                # after waking up raises BrokenResourceError.
                send_events = list(self._state.waiting_senders)
                for event in send_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream; delegates to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def __del__(self) -> None:
        """Emit a :class:`ResourceWarning` if the stream was never closed."""
        if not self._closed:
            warnings.warn(
                f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
                ResourceWarning,
                stacklevel=1,
                source=self,
            )
@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Every clone shares the same state object; the state keeps count of how many
    send streams are still open.
    """

    _state: _MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open sending end
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        # Hand the item to the longest-waiting receiver. Receivers whose task
        # reports a pending cancellation are dropped from the queue and skipped.
        while self._state.waiting_receivers:
            receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
            if not receiver.task_info.has_pending_cancellation():
                receiver.item = item
                receive_event.set()
                return

        # Nobody to hand the item to directly: buffer it if there is room
        if len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

    async def send(self, item: T_contra) -> None:
        """
        Send an item to the stream.

        If the buffer is full, this method blocks until there is again room in the
        buffer or the item can be sent directly to a receiver.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                # The wait did not complete normally: withdraw the item so that no
                # receiver can pick it up later, then propagate
                self._state.waiting_senders.pop(send_event, None)
                raise

            # A receiver that takes the item also removes our entry. Still being
            # registered means the event was set by the last receive stream closing.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_send_channels -= 1
            if self._state.open_send_channels == 0:
                # Wake up every blocked receiver without assigning it an item; the
                # receiver treats a missing item as the end of the stream
                receive_events = list(self._state.waiting_receivers)
                self._state.waiting_receivers.clear()
                for event in receive_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream; delegates to :meth:`close`."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def __del__(self) -> None:
        """Emit a :class:`ResourceWarning` if the stream was never closed."""
        if not self._closed:
            warnings.warn(
                f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
                ResourceWarning,
                stacklevel=1,
                source=self,
            )