Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/memory.py: 38%
135 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1from collections import OrderedDict, deque
2from dataclasses import dataclass, field
3from types import TracebackType
4from typing import Deque, Generic, List, NamedTuple, Optional, Type, TypeVar
6from .. import (
7 BrokenResourceError,
8 ClosedResourceError,
9 EndOfStream,
10 WouldBlock,
11 get_cancelled_exc_class,
12)
13from .._core._compat import DeprecatedAwaitable
14from ..abc import Event, ObjectReceiveStream, ObjectSendStream
15from ..lowlevel import checkpoint
# Type of the objects carried by a memory object stream; shared by the state and
# both stream ends so that their item types stay in agreement.
T_Item = TypeVar("T_Item")
class MemoryObjectStreamStatistics(NamedTuple):
    """Snapshot of the counters describing a memory object stream."""

    current_buffer_used: int  #: number of items stored in the buffer
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    open_send_streams: int  #: number of unclosed clones of the send stream
    open_receive_streams: int  #: number of unclosed clones of the receive stream
    tasks_waiting_send: int  #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int
@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by the send and receive ends of one memory object stream.

    Only ``max_buffer_size`` is passed to the constructor; every other field starts out
    empty or zero and is updated by the stream objects that hold a reference to this state.

    """

    max_buffer_size: float = field()
    # Items that have been sent but not yet received
    buffer: Deque[T_Item] = field(init=False, default_factory=deque)
    # Number of send/receive stream objects that have not been closed yet
    open_send_channels: int = field(init=False, default=0)
    open_receive_channels: int = field(init=False, default=0)
    # Blocked receivers, oldest first: wake-up event -> list the sender puts the item into
    waiting_receivers: "OrderedDict[Event, List[T_Item]]" = field(
        init=False, default_factory=OrderedDict
    )
    # Blocked senders, oldest first: wake-up event -> the item waiting to be delivered
    waiting_senders: "OrderedDict[Event, T_Item]" = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return a snapshot of the current counters of this stream.

        :return: a :class:`MemoryObjectStreamStatistics` built from the current state

        """
        # Keyword arguments keep each counter tied to its field name
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )
@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_Item], ObjectReceiveStream[T_Item]):
    """
    Receiving end of a memory object stream.

    Every instance (including clones) registers itself in the shared
    :class:`MemoryObjectStreamState` on creation and unregisters itself when closed.

    """

    _state: MemoryObjectStreamState[T_Item]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Count this instance as an open receiver
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_Item:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Move the longest-waiting sender's item into the buffer and wake that sender.
            # The entry is removed *before* the event is set, which is how the sender tells
            # a successful delivery apart from the receiving end being closed.
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_Item:
        """
        Receive the next item, waiting for one to be sent if necessary.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if no item is available and the stream has been
            closed from the sending end

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue; a sender delivers by appending to ``container``
            receive_event = Event()
            container: List[T_Item] = []
            self._state.waiting_receivers[receive_event] = container

            try:
                await receive_event.wait()
            except get_cancelled_exc_class():
                # Ignore the immediate cancellation if we already received an item, so as not
                # to lose it
                if not container:
                    raise
            finally:
                # No-op if a sender (or the last send stream closing) already removed us
                self._state.waiting_receivers.pop(receive_event, None)

            if container:
                return container[0]
            else:
                # Woken up without an item: the last send stream was closed
                raise EndOfStream

    def clone(self) -> "MemoryObjectReceiveStream[T_Item]":
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                # Last receiver gone: wake every blocked sender. Their entries are left in
                # ``waiting_senders`` on purpose so that each sender can see that its item
                # was never picked up.
                send_events = list(self._state.waiting_senders.keys())
                for event in send_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (asynchronous counterpart of :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> "MemoryObjectReceiveStream[T_Item]":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Leaving the ``with`` block closes this receive stream
        self.close()
@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_Item], ObjectSendStream[T_Item]):
    """
    Sending end of a memory object stream.

    Every instance (including clones) registers itself in the shared
    :class:`MemoryObjectStreamState` on creation and unregisters itself when closed.

    """

    _state: MemoryObjectStreamState[T_Item]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Count this instance as an open sender
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_Item) -> DeprecatedAwaitable:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        if self._state.waiting_receivers:
            # Hand the item directly to the longest-waiting receiver and wake it
            receive_event, container = self._state.waiting_receivers.popitem(last=False)
            container.append(item)
            receive_event.set()
        elif len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

        # NOTE(review): presumably returned so that legacy ``await send_nowait(...)`` callers
        # keep working -- confirm against ``_core._compat.DeprecatedAwaitable``
        return DeprecatedAwaitable(self.send_nowait)

    async def send(self, item: T_Item) -> None:
        """
        Send an item, waiting for buffer space or a receiver if necessary.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end, either before the call or while waiting to send

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                # Withdraw the pending item so that it cannot be delivered after we gave up
                self._state.waiting_senders.pop(send_event, None)  # type: ignore[arg-type]
                raise

            # A receiver that takes the item removes our entry before setting the event.
            # If the entry is still present, we were woken up by the last receive stream
            # closing instead. Test for the *key*: the pending item itself may be falsy
            # (0, "", None, ...), so its truth value says nothing about delivery.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError

    def clone(self) -> "MemoryObjectSendStream[T_Item]":
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_send_channels -= 1
            if self._state.open_send_channels == 0:
                # Last sender gone: remove and wake every blocked receiver. Their item
                # containers stay empty, so they see the end of the stream.
                receive_events = list(self._state.waiting_receivers.keys())
                self._state.waiting_receivers.clear()
                for event in receive_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (asynchronous counterpart of :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> "MemoryObjectSendStream[T_Item]":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Leaving the ``with`` block closes this send stream
        self.close()