Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/streams/memory.py: 40%
133 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3from collections import OrderedDict, deque
4from dataclasses import dataclass, field
5from types import TracebackType
6from typing import Generic, NamedTuple, TypeVar
8from .. import (
9 BrokenResourceError,
10 ClosedResourceError,
11 EndOfStream,
12 WouldBlock,
13)
14from ..abc import Event, ObjectReceiveStream, ObjectSendStream
15from ..lowlevel import checkpoint
# Type variables used to parametrize the generic classes in this module.
# T_Item is invariant and parametrizes MemoryObjectStreamState.
17T_Item = TypeVar("T_Item")
# T_co is covariant and parametrizes MemoryObjectReceiveStream.
18T_co = TypeVar("T_co", covariant=True)
# T_contra is contravariant and parametrizes MemoryObjectSendStream.
19T_contra = TypeVar("T_contra", contravariant=True)
class MemoryObjectStreamStatistics(NamedTuple):
    """Snapshot of the counters describing the state of a memory object stream."""

    #: number of items stored in the buffer
    current_buffer_used: int
    #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
    max_buffer_size: float
    #: number of unclosed clones of the send stream
    open_send_streams: int
    #: number of unclosed clones of the receive stream
    open_receive_streams: int
    #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
    tasks_waiting_send: int
    #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
    tasks_waiting_receive: int
@dataclass(eq=False)
class MemoryObjectStreamState(Generic[T_Item]):
    """
    Mutable state shared by the send and receive ends of one memory object stream.

    Only ``max_buffer_size`` is accepted by the constructor; every other field starts
    out empty or zero.
    """

    # Upper bound on the number of buffered items
    max_buffer_size: float = field()
    # Items that have been sent but not yet received, oldest first
    buffer: deque[T_Item] = field(init=False, default_factory=deque)
    # Number of send stream instances that have not been closed
    open_send_channels: int = field(init=False, default=0)
    # Number of receive stream instances that have not been closed
    open_receive_channels: int = field(init=False, default=0)
    # Wakeup event of each blocked receiver, mapped to the list its item is put in
    waiting_receivers: OrderedDict[Event, list[T_Item]] = field(
        init=False, default_factory=OrderedDict
    )
    # Wakeup event of each blocked sender, mapped to the item it is trying to send
    waiting_senders: OrderedDict[Event, T_Item] = field(
        init=False, default_factory=OrderedDict
    )

    def statistics(self) -> MemoryObjectStreamStatistics:
        """Return a snapshot of the current counters of this stream state."""
        return MemoryObjectStreamStatistics(
            current_buffer_used=len(self.buffer),
            max_buffer_size=self.max_buffer_size,
            open_send_streams=self.open_send_channels,
            open_receive_streams=self.open_receive_channels,
            tasks_waiting_send=len(self.waiting_senders),
            tasks_waiting_receive=len(self.waiting_receivers),
        )
@dataclass(eq=False)
class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
    """
    Receiving end of a memory object stream.

    Every instance (the original and each clone) shares the same
    :class:`MemoryObjectStreamState` and is counted in its ``open_receive_channels``
    until it is closed.
    """

    _state: MemoryObjectStreamState[T_co]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open receiving end
        self._state.open_receive_channels += 1

    def receive_nowait(self) -> T_co:
        """
        Receive the next item if it can be done without waiting.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end
        :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
            waiting to send

        """
        if self._closed:
            raise ClosedResourceError

        if self._state.waiting_senders:
            # Get the item from the next sender
            send_event, item = self._state.waiting_senders.popitem(last=False)
            self._state.buffer.append(item)
            send_event.set()

        if self._state.buffer:
            return self._state.buffer.popleft()
        elif not self._state.open_send_channels:
            raise EndOfStream

        raise WouldBlock

    async def receive(self) -> T_co:
        """
        Receive the next item, waiting for one to be sent if necessary.

        :return: the received item
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed
        :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
            closed from the sending end

        """
        await checkpoint()
        try:
            return self.receive_nowait()
        except WouldBlock:
            # Add ourselves in the queue
            receive_event = Event()
            container: list[T_co] = []
            self._state.waiting_receivers[receive_event] = container

            try:
                await receive_event.wait()
            finally:
                # Leave the queue whether we were woken up or interrupted
                self._state.waiting_receivers.pop(receive_event, None)

            # A sender puts its item in the container before waking us up; an empty
            # container means the wakeup came from the last send stream being closed
            if container:
                return container[0]
            else:
                raise EndOfStream

    def clone(self) -> MemoryObjectReceiveStream[T_co]:
        """
        Create a clone of this receive stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the receiving end of the memory stream be considered closed by the sending ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this receive stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectReceiveStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_receive_channels -= 1
            if self._state.open_receive_channels == 0:
                # Wake up all blocked senders. Their entries are left in the queue,
                # which is how send() tells this wakeup from a delivery.
                send_events = list(self._state.waiting_senders)
                for event in send_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (asynchronous counterpart of :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block always closes this end of the stream
        self.close()
@dataclass(eq=False)
class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
    """
    Sending end of a memory object stream.

    Every instance (the original and each clone) shares the same
    :class:`MemoryObjectStreamState` and is counted in its ``open_send_channels``
    until it is closed.
    """

    _state: MemoryObjectStreamState[T_contra]
    _closed: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Register this instance as one more open sending end
        self._state.open_send_channels += 1

    def send_nowait(self, item: T_contra) -> None:
        """
        Send an item immediately if it can be done without waiting.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end
        :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
            to receive

        """
        if self._closed:
            raise ClosedResourceError
        if not self._state.open_receive_channels:
            raise BrokenResourceError

        if self._state.waiting_receivers:
            # Hand the item directly to the receiver that has waited the longest
            receive_event, container = self._state.waiting_receivers.popitem(last=False)
            container.append(item)
            receive_event.set()
        elif len(self._state.buffer) < self._state.max_buffer_size:
            self._state.buffer.append(item)
        else:
            raise WouldBlock

    async def send(self, item: T_contra) -> None:
        """
        Send an item to the stream.

        If the buffer is full, this method blocks until there is again room in the
        buffer or the item can be sent directly to a receiver.

        :param item: the item to send
        :raises ~anyio.ClosedResourceError: if this send stream has been closed
        :raises ~anyio.BrokenResourceError: if the stream has been closed from the
            receiving end

        """
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                self._state.waiting_senders.pop(
                    send_event, None  # type: ignore[arg-type]
                )
                raise

            # A receiver removes our entry before waking us up, so if the entry is
            # still queued, the wakeup came from the last receive stream being closed
            # and the item was never delivered. Test for membership instead of the
            # truthiness of the popped item so that falsy items (0, "", None, ...)
            # are not mistaken for a successful delivery.
            if send_event in self._state.waiting_senders:
                del self._state.waiting_senders[send_event]
                raise BrokenResourceError from None

    def clone(self) -> MemoryObjectSendStream[T_contra]:
        """
        Create a clone of this send stream.

        Each clone can be closed separately. Only when all clones have been closed will
        the sending end of the memory stream be considered closed by the receiving ends.

        :return: the cloned stream
        :raises ~anyio.ClosedResourceError: if this send stream has been closed

        """
        if self._closed:
            raise ClosedResourceError

        return MemoryObjectSendStream(_state=self._state)

    def close(self) -> None:
        """
        Close the stream.

        This works the exact same way as :meth:`aclose`, but is provided as a special
        case for the benefit of synchronous callbacks.

        """
        if not self._closed:
            self._closed = True
            self._state.open_send_channels -= 1
            if self._state.open_send_channels == 0:
                # Empty the queue before waking the receivers up: each of them finds
                # its container empty and raises EndOfStream
                receive_events = list(self._state.waiting_receivers)
                self._state.waiting_receivers.clear()
                for event in receive_events:
                    event.set()

    async def aclose(self) -> None:
        """Close the stream (asynchronous counterpart of :meth:`close`)."""
        self.close()

    def statistics(self) -> MemoryObjectStreamStatistics:
        """
        Return statistics about the current state of this stream.

        .. versionadded:: 3.0
        """
        return self._state.statistics()

    def __enter__(self) -> MemoryObjectSendStream[T_contra]:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``with`` block always closes this end of the stream
        self.close()