Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/memory.py: 39%
138 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1from __future__ import annotations
3from collections import OrderedDict, deque
4from dataclasses import dataclass, field
5from types import TracebackType
6from typing import Generic, NamedTuple, TypeVar
8from .. import (
9 BrokenResourceError,
10 ClosedResourceError,
11 EndOfStream,
12 WouldBlock,
13 get_cancelled_exc_class,
14)
15from .._core._compat import DeprecatedAwaitable
16from ..abc import Event, ObjectReceiveStream, ObjectSendStream
17from ..lowlevel import checkpoint
# Item type used to parametrize the shared MemoryObjectStreamState.
T_Item = TypeVar("T_Item")
# Covariant item type used to parametrize MemoryObjectReceiveStream.
T_co = TypeVar("T_co", covariant=True)
# Contravariant item type used to parametrize MemoryObjectSendStream.
T_contra = TypeVar("T_contra", contravariant=True)
class MemoryObjectStreamStatistics(NamedTuple):
    """Snapshot of the counters describing the state of a memory object stream."""

    #: number of items stored in the buffer
    current_buffer_used: int
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    #: number of unclosed clones of the send stream
    open_send_streams: int
    #: number of unclosed clones of the receive stream
    open_receive_streams: int
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int
@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Bookkeeping shared by the send and receive ends of one memory object stream.

    Only ``max_buffer_size`` is accepted by the constructor; every other field starts
    out empty (or zero) and is mutated by the stream objects holding this state.
    """

    #: upper bound on ``len(buffer)`` honoured by ``send_nowait()``
    max_buffer_size: float
    #: items that have been sent but not yet received
    buffer: deque[T_Item] = field(default_factory=deque, init=False)
    #: number of send streams that have not been closed yet
    open_send_channels: int = field(default=0, init=False)
    #: number of receive streams that have not been closed yet
    open_receive_channels: int = field(default=0, init=False)
    #: wake-up event of each blocked receiver, mapped to the list its item is put into
    waiting_receivers: OrderedDict[Event, list[T_Item]] = field(
        default_factory=OrderedDict, init=False
    )
    #: wake-up event of each blocked sender, mapped to the item it wants to send
    waiting_senders: OrderedDict[Event, T_Item] = field(
        default_factory=OrderedDict, init=False
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """Return a snapshot of the current buffer usage and open/waiting counters."""
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )
@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Every instance (clones included) increments ``open_receive_channels`` on the shared
    state when created and decrements it again when closed.
    """

    _state: MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as an open receiving end
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Get the item from the next sender; it goes to the back of the buffer so
            # that already buffered items are still delivered first
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one to be sent if necessary.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if no item is available and the stream has been
            closed from the sending end (possibly while waiting)

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue
            receive_event = Event()
            container: list[T_co] = []
            self._state.waiting_receivers[receive_event] = container

            try:
                await receive_event.wait()
            except get_cancelled_exc_class():
                # Ignore the immediate cancellation if we already received an item, so
                # as not to lose it
                if not container:
                    raise
            finally:
                self._state.waiting_receivers.pop(receive_event, None)

            if container:
                return container[0]
            else:
                # Woken up without an item. Suppress the WouldBlock context: it is an
                # implementation detail that would only clutter the traceback.
                raise EndOfStream from None

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                # Wake up the blocked senders. Their entries are deliberately left in
                # ``waiting_senders``: that is how ``send()`` tells this wake-up apart
                # from a successful delivery.
                send_events = list(self._state.waiting_senders)
                for event in send_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (delegates to :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block closes this stream
        self.close()
@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Every instance (clones included) increments ``open_send_channels`` on the shared
    state when created and decrements it again when closed.
    """

    _state: MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as an open sending end
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> DeprecatedAwaitable:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        if self._state.waiting_receivers:
            # Hand the item directly to the receiver that has waited the longest
            receive_event, container = self._state.waiting_receivers.popitem(last=False)
            container.append(item)
            receive_event.set()
        elif len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

        return DeprecatedAwaitable(self.send_nowait)

    async def send(self, item: T_contra) -> None:
        """
        Send an item, waiting for room on the receiving side if necessary.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end (possibly while waiting)

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                self._state.waiting_senders.pop(send_event, None)  # type: ignore[arg-type]
                raise

            # A receiver that takes the item removes our entry before setting the event.
            # If the entry is still present, the event was set because the last receive
            # stream was closed. Test for membership, not for the truthiness of the
            # stored item: falsy items (0, None, "", ...) are valid payloads too.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will the
        sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special case for the
        benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_send_channels -= 1
            if self._state.open_send_channels == 0:
                # Wake up the blocked receivers. Their containers stay empty, which
                # ``receive()`` reports as EndOfStream.
                receive_events = list(self._state.waiting_receivers)
                self._state.waiting_receivers.clear()
                for event in receive_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (delegates to :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block closes this stream
        self.close()