1# util/queue.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
9behavior, using RLock instead of Lock for its mutex object. The
10Queue object is used exclusively by the sqlalchemy.pool.QueuePool
11class.
12
13This is to support the connection pool's usage of weakref callbacks to return
14connections to the underlying Queue, which can in extremely
15rare cases be invoked within the ``get()`` method of the Queue itself,
16producing a ``put()`` inside the ``get()`` and therefore a reentrant
17condition.
18
19"""
20
21from collections import deque
22from time import time as _time
23
24from . import compat
25from .compat import threading
26from .concurrency import asyncio
27from .concurrency import await_fallback
28from .concurrency import await_only
29from .langhelpers import memoized_property
30
31
32__all__ = ["Empty", "Full", "Queue"]
33
34
35class Empty(Exception):
36 "Exception raised by Queue.get(block=0)/get_nowait()."
37
38 pass
39
40
41class Full(Exception):
42 "Exception raised by Queue.put(block=0)/put_nowait()."
43
44 pass
45
46
47class Queue:
48 def __init__(self, maxsize=0, use_lifo=False):
49 """Initialize a queue object with a given maximum size.
50
51 If `maxsize` is <= 0, the queue size is infinite.
52
53 If `use_lifo` is True, this Queue acts like a Stack (LIFO).
54 """
55
56 self._init(maxsize)
57 # mutex must be held whenever the queue is mutating. All methods
58 # that acquire mutex must release it before returning. mutex
59 # is shared between the two conditions, so acquiring and
60 # releasing the conditions also acquires and releases mutex.
61 self.mutex = threading.RLock()
62 # Notify not_empty whenever an item is added to the queue; a
63 # thread waiting to get is notified then.
64 self.not_empty = threading.Condition(self.mutex)
65 # Notify not_full whenever an item is removed from the queue;
66 # a thread waiting to put is notified then.
67 self.not_full = threading.Condition(self.mutex)
68 # If this queue uses LIFO or FIFO
69 self.use_lifo = use_lifo
70
71 def qsize(self):
72 """Return the approximate size of the queue (not reliable!)."""
73
74 with self.mutex:
75 return self._qsize()
76
77 def empty(self):
78 """Return True if the queue is empty, False otherwise (not
79 reliable!)."""
80
81 with self.mutex:
82 return self._empty()
83
84 def full(self):
85 """Return True if the queue is full, False otherwise (not
86 reliable!)."""
87
88 with self.mutex:
89 return self._full()
90
91 def put(self, item, block=True, timeout=None):
92 """Put an item into the queue.
93
94 If optional args `block` is True and `timeout` is None (the
95 default), block if necessary until a free slot is
96 available. If `timeout` is a positive number, it blocks at
97 most `timeout` seconds and raises the ``Full`` exception if no
98 free slot was available within that time. Otherwise (`block`
99 is false), put an item on the queue if a free slot is
100 immediately available, else raise the ``Full`` exception
101 (`timeout` is ignored in that case).
102 """
103
104 with self.not_full:
105 if not block:
106 if self._full():
107 raise Full
108 elif timeout is None:
109 while self._full():
110 self.not_full.wait()
111 else:
112 if timeout < 0:
113 raise ValueError("'timeout' must be a positive number")
114 endtime = _time() + timeout
115 while self._full():
116 remaining = endtime - _time()
117 if remaining <= 0.0:
118 raise Full
119 self.not_full.wait(remaining)
120 self._put(item)
121 self.not_empty.notify()
122
123 def put_nowait(self, item):
124 """Put an item into the queue without blocking.
125
126 Only enqueue the item if a free slot is immediately available.
127 Otherwise raise the ``Full`` exception.
128 """
129 return self.put(item, False)
130
131 def get(self, block=True, timeout=None):
132 """Remove and return an item from the queue.
133
134 If optional args `block` is True and `timeout` is None (the
135 default), block if necessary until an item is available. If
136 `timeout` is a positive number, it blocks at most `timeout`
137 seconds and raises the ``Empty`` exception if no item was
138 available within that time. Otherwise (`block` is false),
139 return an item if one is immediately available, else raise the
140 ``Empty`` exception (`timeout` is ignored in that case).
141
142 """
143 with self.not_empty:
144 if not block:
145 if self._empty():
146 raise Empty
147 elif timeout is None:
148 while self._empty():
149 self.not_empty.wait()
150 else:
151 if timeout < 0:
152 raise ValueError("'timeout' must be a positive number")
153 endtime = _time() + timeout
154 while self._empty():
155 remaining = endtime - _time()
156 if remaining <= 0.0:
157 raise Empty
158 self.not_empty.wait(remaining)
159 item = self._get()
160 self.not_full.notify()
161 return item
162
163 def get_nowait(self):
164 """Remove and return an item from the queue without blocking.
165
166 Only get an item if one is immediately available. Otherwise
167 raise the ``Empty`` exception.
168 """
169
170 return self.get(False)
171
172 # Override these methods to implement other queue organizations
173 # (e.g. stack or priority queue).
174 # These will only be called with appropriate locks held
175
176 # Initialize the queue representation
177 def _init(self, maxsize):
178 self.maxsize = maxsize
179 self.queue = deque()
180
181 def _qsize(self):
182 return len(self.queue)
183
184 # Check whether the queue is empty
185 def _empty(self):
186 return not self.queue
187
188 # Check whether the queue is full
189 def _full(self):
190 return self.maxsize > 0 and len(self.queue) == self.maxsize
191
192 # Put a new item in the queue
193 def _put(self, item):
194 self.queue.append(item)
195
196 # Get an item from the queue
197 def _get(self):
198 if self.use_lifo:
199 # LIFO
200 return self.queue.pop()
201 else:
202 # FIFO
203 return self.queue.popleft()
204
205
206class AsyncAdaptedQueue:
207 await_ = staticmethod(await_only)
208
209 def __init__(self, maxsize=0, use_lifo=False):
210 self.use_lifo = use_lifo
211 self.maxsize = maxsize
212
213 def empty(self):
214 return self._queue.empty()
215
216 def full(self):
217 return self._queue.full()
218
219 def qsize(self):
220 return self._queue.qsize()
221
222 @memoized_property
223 def _queue(self):
224 # Delay creation of the queue until it is first used, to avoid
225 # binding it to a possibly wrong event loop.
226 # By delaying the creation of the pool we accommodate the common
227 # usage pattern of instantiating the engine at module level, where a
228 # different event loop is in present compared to when the application
229 # is actually run.
230
231 if self.use_lifo:
232 queue = asyncio.LifoQueue(maxsize=self.maxsize)
233 else:
234 queue = asyncio.Queue(maxsize=self.maxsize)
235 return queue
236
237 def put_nowait(self, item):
238 try:
239 return self._queue.put_nowait(item)
240 except asyncio.QueueFull as err:
241 compat.raise_(
242 Full(),
243 replace_context=err,
244 )
245
246 def put(self, item, block=True, timeout=None):
247 if not block:
248 return self.put_nowait(item)
249
250 try:
251 if timeout is not None:
252 return self.await_(
253 asyncio.wait_for(self._queue.put(item), timeout)
254 )
255 else:
256 return self.await_(self._queue.put(item))
257 except (asyncio.QueueFull, asyncio.TimeoutError) as err:
258 compat.raise_(
259 Full(),
260 replace_context=err,
261 )
262
263 def get_nowait(self):
264 try:
265 return self._queue.get_nowait()
266 except asyncio.QueueEmpty as err:
267 compat.raise_(
268 Empty(),
269 replace_context=err,
270 )
271
272 def get(self, block=True, timeout=None):
273 if not block:
274 return self.get_nowait()
275
276 try:
277 if timeout is not None:
278 return self.await_(
279 asyncio.wait_for(self._queue.get(), timeout)
280 )
281 else:
282 return self.await_(self._queue.get())
283 except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
284 compat.raise_(
285 Empty(),
286 replace_context=err,
287 )
288
289
290class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue):
291 await_ = staticmethod(await_fallback)