Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/queues.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15"""Asynchronous queues for coroutines. These classes are very similar
16to those provided in the standard library's `asyncio package
17<https://docs.python.org/3/library/asyncio-queue.html>`_.
19.. warning::
21 Unlike the standard library's `queue` module, the classes defined here
22 are *not* thread-safe. To use these queues from another thread,
23 use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
24 before calling any queue methods.
26"""
28import collections
29import datetime
30import heapq
32from tornado import gen, ioloop
33from tornado.concurrent import Future, future_set_result_unless_cancelled
34from tornado.locks import Event
36from typing import Union, TypeVar, Generic, Awaitable, Optional
37import typing
39if typing.TYPE_CHECKING:
40 from typing import Deque, Tuple, Any # noqa: F401
42_T = TypeVar("_T")
44__all__ = ["Queue", "PriorityQueue", "LifoQueue", "QueueFull", "QueueEmpty"]
47class QueueEmpty(Exception):
48 """Raised by `.Queue.get_nowait` when the queue has no items."""
50 pass
53class QueueFull(Exception):
54 """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
56 pass
59def _set_timeout(
60 future: Future, timeout: Union[None, float, datetime.timedelta]
61) -> None:
62 if timeout:
64 def on_timeout() -> None:
65 if not future.done():
66 future.set_exception(gen.TimeoutError())
68 io_loop = ioloop.IOLoop.current()
69 timeout_handle = io_loop.add_timeout(timeout, on_timeout)
70 future.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
73class _QueueIterator(Generic[_T]):
74 def __init__(self, q: "Queue[_T]") -> None:
75 self.q = q
77 def __anext__(self) -> Awaitable[_T]:
78 return self.q.get()
81class Queue(Generic[_T]):
82 """Coordinate producer and consumer coroutines.
84 If maxsize is 0 (the default) the queue size is unbounded.
86 .. testcode::
88 import asyncio
89 from tornado.ioloop import IOLoop
90 from tornado.queues import Queue
92 q = Queue(maxsize=2)
94 async def consumer():
95 async for item in q:
96 try:
97 print('Doing work on %s' % item)
98 await asyncio.sleep(0.01)
99 finally:
100 q.task_done()
102 async def producer():
103 for item in range(5):
104 await q.put(item)
105 print('Put %s' % item)
107 async def main():
108 # Start consumer without waiting (since it never finishes).
109 IOLoop.current().spawn_callback(consumer)
110 await producer() # Wait for producer to put all tasks.
111 await q.join() # Wait for consumer to finish all tasks.
112 print('Done')
114 asyncio.run(main())
116 .. testoutput::
118 Put 0
119 Put 1
120 Doing work on 0
121 Put 2
122 Doing work on 1
123 Put 3
124 Doing work on 2
125 Put 4
126 Doing work on 3
127 Doing work on 4
128 Done
131 In versions of Python without native coroutines (before 3.5),
132 ``consumer()`` could be written as::
134 @gen.coroutine
135 def consumer():
136 while True:
137 item = yield q.get()
138 try:
139 print('Doing work on %s' % item)
140 yield gen.sleep(0.01)
141 finally:
142 q.task_done()
144 .. versionchanged:: 4.3
145 Added ``async for`` support in Python 3.5.
147 """
149 # Exact type depends on subclass. Could be another generic
150 # parameter and use protocols to be more precise here.
151 _queue = None # type: Any
153 def __init__(self, maxsize: int = 0) -> None:
154 if maxsize is None:
155 raise TypeError("maxsize can't be None")
157 if maxsize < 0:
158 raise ValueError("maxsize can't be negative")
160 self._maxsize = maxsize
161 self._init()
162 self._getters = collections.deque([]) # type: Deque[Future[_T]]
163 self._putters = collections.deque([]) # type: Deque[Tuple[_T, Future[None]]]
164 self._unfinished_tasks = 0
165 self._finished = Event()
166 self._finished.set()
168 @property
169 def maxsize(self) -> int:
170 """Number of items allowed in the queue."""
171 return self._maxsize
173 def qsize(self) -> int:
174 """Number of items in the queue."""
175 return len(self._queue)
177 def empty(self) -> bool:
178 return not self._queue
180 def full(self) -> bool:
181 if self.maxsize == 0:
182 return False
183 else:
184 return self.qsize() >= self.maxsize
186 def put(
187 self, item: _T, timeout: Optional[Union[float, datetime.timedelta]] = None
188 ) -> "Future[None]":
189 """Put an item into the queue, perhaps waiting until there is room.
191 Returns a Future, which raises `tornado.util.TimeoutError` after a
192 timeout.
194 ``timeout`` may be a number denoting a time (on the same
195 scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
196 `datetime.timedelta` object for a deadline relative to the
197 current time.
198 """
199 future = Future() # type: Future[None]
200 try:
201 self.put_nowait(item)
202 except QueueFull:
203 self._putters.append((item, future))
204 _set_timeout(future, timeout)
205 else:
206 future.set_result(None)
207 return future
209 def put_nowait(self, item: _T) -> None:
210 """Put an item into the queue without blocking.
212 If no free slot is immediately available, raise `QueueFull`.
213 """
214 self._consume_expired()
215 if self._getters:
216 assert self.empty(), "queue non-empty, why are getters waiting?"
217 getter = self._getters.popleft()
218 self.__put_internal(item)
219 future_set_result_unless_cancelled(getter, self._get())
220 elif self.full():
221 raise QueueFull
222 else:
223 self.__put_internal(item)
225 def get(
226 self, timeout: Optional[Union[float, datetime.timedelta]] = None
227 ) -> Awaitable[_T]:
228 """Remove and return an item from the queue.
230 Returns an awaitable which resolves once an item is available, or raises
231 `tornado.util.TimeoutError` after a timeout.
233 ``timeout`` may be a number denoting a time (on the same
234 scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
235 `datetime.timedelta` object for a deadline relative to the
236 current time.
238 .. note::
240 The ``timeout`` argument of this method differs from that
241 of the standard library's `queue.Queue.get`. That method
242 interprets numeric values as relative timeouts; this one
243 interprets them as absolute deadlines and requires
244 ``timedelta`` objects for relative timeouts (consistent
245 with other timeouts in Tornado).
247 """
248 future = Future() # type: Future[_T]
249 try:
250 future.set_result(self.get_nowait())
251 except QueueEmpty:
252 self._getters.append(future)
253 _set_timeout(future, timeout)
254 return future
256 def get_nowait(self) -> _T:
257 """Remove and return an item from the queue without blocking.
259 Return an item if one is immediately available, else raise
260 `QueueEmpty`.
261 """
262 self._consume_expired()
263 if self._putters:
264 assert self.full(), "queue not full, why are putters waiting?"
265 item, putter = self._putters.popleft()
266 self.__put_internal(item)
267 future_set_result_unless_cancelled(putter, None)
268 return self._get()
269 elif self.qsize():
270 return self._get()
271 else:
272 raise QueueEmpty
274 def task_done(self) -> None:
275 """Indicate that a formerly enqueued task is complete.
277 Used by queue consumers. For each `.get` used to fetch a task, a
278 subsequent call to `.task_done` tells the queue that the processing
279 on the task is complete.
281 If a `.join` is blocking, it resumes when all items have been
282 processed; that is, when every `.put` is matched by a `.task_done`.
284 Raises `ValueError` if called more times than `.put`.
285 """
286 if self._unfinished_tasks <= 0:
287 raise ValueError("task_done() called too many times")
288 self._unfinished_tasks -= 1
289 if self._unfinished_tasks == 0:
290 self._finished.set()
292 def join(
293 self, timeout: Optional[Union[float, datetime.timedelta]] = None
294 ) -> Awaitable[None]:
295 """Block until all items in the queue are processed.
297 Returns an awaitable, which raises `tornado.util.TimeoutError` after a
298 timeout.
299 """
300 return self._finished.wait(timeout)
302 def __aiter__(self) -> _QueueIterator[_T]:
303 return _QueueIterator(self)
305 # These three are overridable in subclasses.
306 def _init(self) -> None:
307 self._queue = collections.deque()
309 def _get(self) -> _T:
310 return self._queue.popleft()
312 def _put(self, item: _T) -> None:
313 self._queue.append(item)
315 # End of the overridable methods.
317 def __put_internal(self, item: _T) -> None:
318 self._unfinished_tasks += 1
319 self._finished.clear()
320 self._put(item)
322 def _consume_expired(self) -> None:
323 # Remove timed-out waiters.
324 while self._putters and self._putters[0][1].done():
325 self._putters.popleft()
327 while self._getters and self._getters[0].done():
328 self._getters.popleft()
330 def __repr__(self) -> str:
331 return f"<{type(self).__name__} at {hex(id(self))} {self._format()}>"
333 def __str__(self) -> str:
334 return f"<{type(self).__name__} {self._format()}>"
336 def _format(self) -> str:
337 result = f"maxsize={self.maxsize!r}"
338 if getattr(self, "_queue", None):
339 result += " queue=%r" % self._queue
340 if self._getters:
341 result += " getters[%s]" % len(self._getters)
342 if self._putters:
343 result += " putters[%s]" % len(self._putters)
344 if self._unfinished_tasks:
345 result += " tasks=%s" % self._unfinished_tasks
346 return result
349class PriorityQueue(Queue):
350 """A `.Queue` that retrieves entries in priority order, lowest first.
352 Entries are typically tuples like ``(priority number, data)``.
354 .. testcode::
356 import asyncio
357 from tornado.queues import PriorityQueue
359 async def main():
360 q = PriorityQueue()
361 q.put((1, 'medium-priority item'))
362 q.put((0, 'high-priority item'))
363 q.put((10, 'low-priority item'))
365 print(await q.get())
366 print(await q.get())
367 print(await q.get())
369 asyncio.run(main())
371 .. testoutput::
373 (0, 'high-priority item')
374 (1, 'medium-priority item')
375 (10, 'low-priority item')
376 """
378 def _init(self) -> None:
379 self._queue = []
381 def _put(self, item: _T) -> None:
382 heapq.heappush(self._queue, item)
384 def _get(self) -> _T: # type: ignore[type-var]
385 return heapq.heappop(self._queue)
388class LifoQueue(Queue):
389 """A `.Queue` that retrieves the most recently put items first.
391 .. testcode::
393 import asyncio
394 from tornado.queues import LifoQueue
396 async def main():
397 q = LifoQueue()
398 q.put(3)
399 q.put(2)
400 q.put(1)
402 print(await q.get())
403 print(await q.get())
404 print(await q.get())
406 asyncio.run(main())
408 .. testoutput::
410 1
411 2
412 3
413 """
415 def _init(self) -> None:
416 self._queue = []
418 def _put(self, item: _T) -> None:
419 self._queue.append(item)
421 def _get(self) -> _T: # type: ignore[type-var]
422 return self._queue.pop()