Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/queues.py: 30%
131 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
# Public names of this module, i.e. what a star-import exposes.
__all__ = (
    'Queue',
    'PriorityQueue',
    'LifoQueue',
    'QueueFull',
    'QueueEmpty',
)
3import collections
4import heapq
5import warnings
7from . import events
8from . import locks
class QueueEmpty(Exception):
    """Signals that Queue.get_nowait() found no item in the Queue."""
class QueueFull(Exception):
    """Signals that Queue.put_nowait() was called while the Queue was full."""
class Queue:
    """FIFO queue for passing items between producer and consumer coroutines.

    A maxsize of zero or less means the queue never reports itself as full.
    With a positive maxsize, "await put()" waits while the queue already
    holds maxsize items and continues once get() has removed one.

    In contrast to the standard library Queue, qsize() can be relied upon:
    a single-threaded asyncio application is not interrupted between
    calling qsize() and acting on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create an empty queue bounded by maxsize (see full()).

        Passing loop is deprecated and emits a DeprecationWarning; when it
        is omitted the loop comes from events.get_event_loop().
        """
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()
        self._maxsize = maxsize

        # Futures of get() calls waiting for an item.
        self._getters = collections.deque()
        # Futures of put() calls waiting for a free slot.
        self._putters = collections.deque()
        self._unfinished_tasks = 0
        # Set whenever the unfinished-task count is zero; join() waits on it.
        self._finished = locks.Event(loop=loop)
        self._finished.set()
        self._init(maxsize)

    # Storage hooks: subclasses override these three to change ordering.

    def _init(self, maxsize):
        """Create the underlying container."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the oldest stored item."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item behind the ones already queued."""
        self._queue.append(item)

    # End of the storage hooks.

    def _wakeup_next(self, waiters):
        """Resolve the first future in waiters that is not done yet.

        Futures that are already done (for example cancelled ones) are
        discarded from the front of waiters along the way.
        """
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    def __class_getitem__(cls, type):
        # Subscripting (e.g. Queue[int]) is accepted and simply yields the
        # class itself; the subscript is ignored.
        return cls

    def _format(self):
        """Build the state summary shared by __repr__ and __str__."""
        parts = [f'maxsize={self._maxsize!r}']
        # getattr: _queue may not exist yet if _init() has not run.
        if getattr(self, '_queue', None):
            parts.append(f'_queue={list(self._queue)!r}')
        if self._getters:
            parts.append(f'_getters[{len(self._getters)}]')
        if self._putters:
            parts.append(f'_putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            parts.append(f'tasks={self._unfinished_tasks}')
        return ' '.join(parts)

    def qsize(self):
        """Return how many items are currently stored."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The capacity given at construction time."""
        return self._maxsize

    def empty(self):
        """Return True when no item is stored, False otherwise."""
        if self._queue:
            return False
        return True

    def full(self):
        """Return True when the queue holds at least maxsize items.

        Note: a queue created with maxsize <= 0 (0 is the default) is
        never full.
        """
        if self._maxsize <= 0:
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Add item to the queue, waiting for a free slot if necessary.

        While the queue is full the caller is parked on a future that
        get_nowait() resolves once an item has been removed.
        """
        while self.full():
            waiter = self._loop.create_future()
            self._putters.append(waiter)
            try:
                await waiter
            except BaseException:
                # Every exception (cancellation included) is re-raised
                # below after the bookkeeping has been tidied up.
                waiter.cancel()  # In case the future is still pending.
                try:
                    # Drop our own future from the waiting line.
                    self._putters.remove(waiter)
                except ValueError:
                    # An earlier get_nowait() call may already have
                    # popped it from self._putters.
                    pass
                if not self.full() and not waiter.cancelled():
                    # get_nowait() woke us, but we cannot use the slot;
                    # hand the wake-up to the next waiting putter.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue immediately.

        Raises QueueFull when there is no free slot right now.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove an item from the queue and return it.

        While the queue is empty the caller is parked on a future that
        put_nowait() resolves once an item has been added.
        """
        while self.empty():
            waiter = self._loop.create_future()
            self._getters.append(waiter)
            try:
                await waiter
            except BaseException:
                # Every exception (cancellation included) is re-raised
                # below after the bookkeeping has been tidied up.
                waiter.cancel()  # In case the future is still pending.
                try:
                    # Drop our own future from the waiting line.
                    self._getters.remove(waiter)
                except ValueError:
                    # An earlier put_nowait() call may already have
                    # popped it from self._getters.
                    pass
                if not self.empty() and not waiter.cancelled():
                    # put_nowait() woke us, but we cannot take the item;
                    # hand the wake-up to the next waiting getter.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove an item from the queue immediately and return it.

        Raises QueueEmpty when nothing is stored right now.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Record that one previously enqueued task has been processed.

        Meant for consumers: after each get() that fetched a task, call
        task_done() once the work on that task is finished.

        A join() that is currently waiting resumes when every item has
        been accounted for, i.e. one task_done() was received for each
        item that was put into the queue.

        Raises ValueError when called more often than items were placed
        in the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Wait until every item put into the queue has been processed.

        The unfinished-task count is incremented for each item added and
        decremented by each task_done() call; join() returns once it has
        reached zero (immediately if it already is zero).
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
226class PriorityQueue(Queue):
227 """A subclass of Queue; retrieves entries in priority order (lowest first).
229 Entries are typically tuples of the form: (priority number, data).
230 """
232 def _init(self, maxsize):
233 self._queue = []
235 def _put(self, item, heappush=heapq.heappush):
236 heappush(self._queue, item)
238 def _get(self, heappop=heapq.heappop):
239 return heappop(self._queue)
class LifoQueue(Queue):
    """Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        """Start with an empty list used as a stack."""
        self._queue = list()

    def _put(self, item):
        """Push item onto the top of the stack."""
        stack = self._queue
        stack.append(item)

    def _get(self):
        """Pop and return the item on top of the stack."""
        stack = self._queue
        return stack.pop()