1# util/queue.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
10behavior, using RLock instead of Lock for its mutex object. The
11Queue object is used exclusively by the sqlalchemy.pool.QueuePool
12class.
13
14This is to support the connection pool's usage of weakref callbacks to return
15connections to the underlying Queue, which can in extremely
16rare cases be invoked within the ``get()`` method of the Queue itself,
17producing a ``put()`` inside the ``get()`` and therefore a reentrant
18condition.
19
20"""
21from __future__ import annotations
22
import asyncio
from collections import deque
import threading
from time import monotonic as _monotonic
from time import time as _time
import typing
from typing import Any
from typing import Awaitable
from typing import Deque
from typing import Generic
from typing import Optional
from typing import TypeVar
34
35from .concurrency import await_fallback
36from .concurrency import await_only
37from .langhelpers import memoized_property
38
39
# Type variable for the items held by a queue; the queue classes in this
# module are generic in it.
_T = TypeVar("_T", bound=Any)
# Names exported by ``from <this module> import *``.
__all__ = ["Empty", "Full", "Queue"]
42
43
class Empty(Exception):
    """Raised by a queue's ``get()`` / ``get_nowait()`` when no item could
    be returned.
    """
48
49
class Full(Exception):
    """Raised by a queue's ``put()`` / ``put_nowait()`` when the item could
    not be enqueued.
    """
54
55
class QueueCommon(Generic[_T]):
    """Interface shared by the queue implementations in this module.

    Every operation declared here raises ``NotImplementedError``; the
    subclasses (``Queue``, ``AsyncAdaptedQueue``) supply the behavior.
    """

    # Maximum number of items held; ``Queue`` treats a value <= 0 as
    # "unbounded".
    maxsize: int
    # When True, items are returned in LIFO (stack) order instead of FIFO.
    use_lifo: bool

    # The base constructor intentionally does nothing; subclasses store
    # ``maxsize`` and ``use_lifo`` themselves.
    def __init__(self, maxsize: int = 0, use_lifo: bool = False): ...

    def empty(self) -> bool:
        """Return True if the queue currently holds no items."""
        raise NotImplementedError()

    def full(self) -> bool:
        """Return True if the queue currently has no free slot."""
        raise NotImplementedError()

    def qsize(self) -> int:
        """Return the number of items currently in the queue."""
        raise NotImplementedError()

    def put_nowait(self, item: _T) -> None:
        """Enqueue ``item`` without blocking."""
        raise NotImplementedError()

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Enqueue ``item``, optionally blocking up to ``timeout`` seconds."""
        raise NotImplementedError()

    def get_nowait(self) -> _T:
        """Remove and return an item without blocking."""
        raise NotImplementedError()

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item, optionally blocking up to ``timeout``
        seconds.
        """
        raise NotImplementedError()
84
85
class Queue(QueueCommon[_T]):
    """Thread-based queue whose mutex is a reentrant lock.

    The mutex is a ``threading.RLock`` so that a ``put()`` triggered on the
    same thread while ``get()`` is holding the lock (e.g. from a weakref
    callback) does not deadlock.  Items are returned in FIFO order unless
    ``use_lifo`` is set.
    """

    queue: Deque[_T]

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Initialize a queue object with a given maximum size.

        If `maxsize` is <= 0, the queue size is infinite.

        If `use_lifo` is True, this Queue acts like a Stack (LIFO).
        """

        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)
        # If this queue uses LIFO or FIFO
        self.use_lifo = use_lifo

    def qsize(self) -> int:
        """Return the approximate size of the queue (not reliable!)."""

        with self.mutex:
            return self._qsize()

    def empty(self) -> bool:
        """Return True if the queue is empty, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._empty()

    def full(self) -> bool:
        """Return True if the queue is full, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._full()

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Put an item into the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until a free slot is
        available.  If `timeout` is a non-negative number, it blocks at
        most `timeout` seconds and raises the ``Full`` exception if no
        free slot was available within that time.  Otherwise (`block`
        is false), put an item on the queue if a free slot is
        immediately available, else raise the ``Full`` exception
        (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """

        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError(
                        "'timeout' must be a non-negative number"
                    )
                # Use a monotonic clock for the deadline so that system
                # clock adjustments cannot lengthen or shorten the wait.
                endtime = _monotonic() + timeout
                while self._full():
                    remaining = endtime - _monotonic()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()

    def put_nowait(self, item: _T) -> None:
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the ``Full`` exception.
        """
        return self.put(item, False)

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item from the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until an item is available.  If
        `timeout` is a non-negative number, it blocks at most `timeout`
        seconds and raises the ``Empty`` exception if no item was
        available within that time.  Otherwise (`block` is false),
        return an item if one is immediately available, else raise the
        ``Empty`` exception (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """
        with self.not_empty:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError(
                        "'timeout' must be a non-negative number"
                    )
                # Monotonic deadline; see put().
                endtime = _monotonic() + timeout
                while self._empty():
                    remaining = endtime - _monotonic()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self) -> _T:
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the ``Empty`` exception.
        """

        return self.get(False)

    # The underscore-prefixed helpers below do no locking of their own;
    # the public methods above call them while holding the mutex.

    def _init(self, maxsize: int) -> None:
        """Store ``maxsize`` and create the empty backing deque."""
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self) -> int:
        """Return the number of queued items."""
        return len(self.queue)

    def _empty(self) -> bool:
        """Return True if no items are queued."""
        return not self.queue

    def _full(self) -> bool:
        """Return True if the queue is bounded and has no free slot."""
        # ``<=`` rather than ``==`` so a queue that somehow holds more than
        # ``maxsize`` items still reports itself as full.
        return 0 < self.maxsize <= len(self.queue)

    def _put(self, item: _T) -> None:
        """Append ``item`` to the right end of the deque."""
        self.queue.append(item)

    def _get(self) -> _T:
        """Pop the newest item (LIFO) or the oldest item (FIFO)."""
        if self.use_lifo:
            # LIFO
            return self.queue.pop()
        else:
            # FIFO
            return self.queue.popleft()
238
239
class AsyncAdaptedQueue(QueueCommon[_T]):
    """``QueueCommon`` implementation backed by an ``asyncio`` queue.

    Blocking ``put()`` / ``get()`` calls hand the underlying coroutine to
    ``await_``.  ``asyncio.QueueFull`` / ``asyncio.QueueEmpty`` and
    ``asyncio.TimeoutError`` are re-raised as this module's ``Full`` and
    ``Empty`` exceptions.
    """

    if typing.TYPE_CHECKING:

        # Typed stand-in for type checkers only; the runtime binding is in
        # the ``else`` branch.
        @staticmethod
        def await_(coroutine: Awaitable[Any]) -> _T: ...

    else:
        await_ = staticmethod(await_only)

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Record the queue parameters; the asyncio queue itself is created
        lazily by ``_queue``.
        """
        self.use_lifo = use_lifo
        self.maxsize = maxsize

    def empty(self) -> bool:
        """Return True if the underlying asyncio queue is empty."""
        return self._queue.empty()

    def full(self) -> bool:
        """Return True if the underlying asyncio queue is full."""
        return self._queue.full()

    def qsize(self) -> int:
        """Return the number of items in the underlying asyncio queue."""
        return self._queue.qsize()

    @memoized_property
    def _queue(self) -> asyncio.Queue[_T]:
        # Delay creation of the queue until it is first used, to avoid
        # binding it to a possibly wrong event loop.
        # By delaying the creation of the queue we accommodate the common
        # usage pattern of instantiating the engine at module level, where a
        # different event loop is present compared to when the application
        # is actually run.

        queue: asyncio.Queue[_T]

        if self.use_lifo:
            queue = asyncio.LifoQueue(maxsize=self.maxsize)
        else:
            queue = asyncio.Queue(maxsize=self.maxsize)
        return queue

    def put_nowait(self, item: _T) -> None:
        """Enqueue ``item`` without waiting; raise ``Full`` if there is no
        free slot.
        """
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull as err:
            raise Full() from err

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Enqueue ``item``.

        With ``block`` false this is ``put_nowait()`` and ``timeout`` is
        ignored.  Otherwise wait for a free slot, for at most ``timeout``
        seconds when ``timeout`` is not None; raise ``Full`` if the wait
        times out.
        """
        if not block:
            return self.put_nowait(item)

        try:
            if timeout is not None:
                self.await_(asyncio.wait_for(self._queue.put(item), timeout))
            else:
                self.await_(self._queue.put(item))
        except (asyncio.QueueFull, asyncio.TimeoutError) as err:
            raise Full() from err

    def get_nowait(self) -> _T:
        """Remove and return an item without waiting; raise ``Empty`` if
        none is available.
        """
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty as err:
            raise Empty() from err

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item.

        With ``block`` false this is ``get_nowait()`` and ``timeout`` is
        ignored.  Otherwise wait for an item, for at most ``timeout``
        seconds when ``timeout`` is not None; raise ``Empty`` if the wait
        times out.
        """
        if not block:
            return self.get_nowait()

        try:
            if timeout is not None:
                return self.await_(
                    asyncio.wait_for(self._queue.get(), timeout)
                )
            else:
                return self.await_(self._queue.get())
        except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
            raise Empty() from err
318
319
class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]):
    """``AsyncAdaptedQueue`` variant that drives coroutines with
    ``await_fallback`` instead of ``await_only``.
    """

    # Rebind only at runtime; under TYPE_CHECKING the typed ``await_``
    # signature declared on AsyncAdaptedQueue remains the visible one.
    if not typing.TYPE_CHECKING:
        await_ = staticmethod(await_fallback)