Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/queues.py: 29%
129 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
3import collections
4import heapq
5import warnings
7from . import events
8from . import locks
class QueueEmpty(Exception):
    """Signals that Queue.get_nowait() was called while the Queue held no items."""
class QueueFull(Exception):
    """Signals that Queue.put_nowait() was called while the Queue was full."""
class Queue:
    """FIFO queue for handing items between producer and consumer coroutines.

    A maxsize of zero or less makes the queue unbounded.  With a positive
    integer maxsize, "await put()" suspends once the queue holds maxsize
    items and resumes after get() has removed one.

    In contrast to the standard library Queue, qsize() can be trusted: a
    single-threaded asyncio application is not interrupted between calling
    qsize() and acting on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        if loop is not None:
            self._loop = loop
            # stacklevel=2 attributes the warning to the code constructing
            # the queue rather than to this method.
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()
        self._maxsize = maxsize

        # Futures of get() calls waiting for an item.
        self._getters = collections.deque()
        # Futures of put() calls waiting for a free slot.
        self._putters = collections.deque()
        # Incremented by put_nowait(), decremented by task_done().
        self._unfinished_tasks = 0
        # Set while _unfinished_tasks is zero; join() waits on it.
        self._finished = locks.Event(loop=loop)
        self._finished.set()
        self._init(maxsize)

    # Storage hooks: subclasses override these three to change ordering.

    def _init(self, maxsize):
        """Create the empty underlying container."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the oldest stored item."""
        return self._queue.popleft()

    def _put(self, item):
        """Store item behind everything already queued."""
        self._queue.append(item)

    # End of the storage hooks.

    def _wakeup_next(self, waiters):
        """Resolve the first future in waiters that is not already done.

        Futures that are already done (for example cancelled ones) are
        discarded along the way.
        """
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        kind = type(self).__name__
        return f'<{kind} at {id(self):#x} {self._format()}>'

    def __str__(self):
        kind = type(self).__name__
        return f'<{kind} {self._format()}>'

    def _format(self):
        """Build the state summary shared by __repr__ and __str__."""
        parts = [f'maxsize={self._maxsize!r}']
        # getattr: _queue may not exist yet if _init() has not run.
        if getattr(self, '_queue', None):
            parts.append(f' _queue={list(self._queue)!r}')
        if self._getters:
            parts.append(f' _getters[{len(self._getters)}]')
        if self._putters:
            parts.append(f' _putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            parts.append(f' tasks={self._unfinished_tasks}')
        return ''.join(parts)

    def qsize(self):
        """Return how many items are currently stored."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The capacity the queue was created with."""
        return self._maxsize

    def empty(self):
        """Return True when no item is stored, False otherwise."""
        return not self._queue

    def full(self):
        """Return True when the queue holds at least maxsize items.

        A queue created with maxsize <= 0 (0 is the default) is never full.
        """
        if self._maxsize <= 0:
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Add item to the queue, waiting for a free slot if it is full."""
        while self.full():
            slot_waiter = self._loop.create_future()
            self._putters.append(slot_waiter)
            try:
                await slot_waiter
            except BaseException:
                # Whatever interrupted the wait, leave no pending future
                # behind; cancel() is a no-op if it already completed.
                slot_waiter.cancel()
                try:
                    # Drop our own entry from the waiting line.
                    self._putters.remove(slot_waiter)
                except ValueError:
                    # An earlier get_nowait() already popped it.
                    pass
                if not (self.full() or slot_waiter.cancelled()):
                    # get_nowait() woke us but we cannot use the slot;
                    # pass the wake-up on to the next putter.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue immediately.

        Raises QueueFull when there is no free slot.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item, waiting until one is available."""
        while self.empty():
            item_waiter = self._loop.create_future()
            self._getters.append(item_waiter)
            try:
                await item_waiter
            except BaseException:
                # Whatever interrupted the wait, leave no pending future
                # behind; cancel() is a no-op if it already completed.
                item_waiter.cancel()
                try:
                    # Drop our own entry from the waiting line.
                    self._getters.remove(item_waiter)
                except ValueError:
                    # An earlier put_nowait() already popped it.
                    pass
                if not (self.empty() or item_waiter.cancelled()):
                    # put_nowait() woke us but we cannot take the item;
                    # pass the wake-up on to the next getter.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item immediately.

        Raises QueueEmpty when nothing is stored.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Record that one previously enqueued task has been fully handled.

        Meant for consumers: after each get() that fetched a task, calling
        task_done() tells the queue that work on that task is finished.

        A join() that is currently waiting resumes once every item has been
        accounted for, i.e. a task_done() call has been made for each item
        that was put into the queue.

        Raises ValueError when called more often than items were put.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        remaining = self._unfinished_tasks - 1
        self._unfinished_tasks = remaining
        if remaining == 0:
            self._finished.set()

    async def join(self):
        """Wait until every item put into the queue has been processed.

        Each item added raises the unfinished-task count; each task_done()
        call lowers it.  join() returns when the count is zero.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()
class PriorityQueue(Queue):
    """Queue variant that hands entries out in priority order, lowest first.

    Entries are usually tuples shaped like (priority number, data).
    """

    def _init(self, maxsize):
        # Plain list that heapq keeps ordered as a heap.
        self._queue = list()

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument when the class is defined.
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        # heappop removes and returns the smallest entry.
        lowest = heappop(self._queue)
        return lowest
class LifoQueue(Queue):
    """Queue variant that returns the most recently added entry first."""

    def _init(self, maxsize):
        # Plain list used as a stack: push and pop both work on the tail.
        self._queue = list()

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        newest = self._queue.pop()
        return newest