1# util/queue.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
10behavior, using RLock instead of Lock for its mutex object. The
11Queue object is used exclusively by the sqlalchemy.pool.QueuePool
12class.
13
14This is to support the connection pool's usage of weakref callbacks to return
15connections to the underlying Queue, which can in extremely
16rare cases be invoked within the ``get()`` method of the Queue itself,
17producing a ``put()`` inside the ``get()`` and therefore a reentrant
18condition.
19
20"""
21from __future__ import annotations
22
23import asyncio
24from collections import deque
25import threading
26from time import time as _time
27from typing import Any
28from typing import Deque
29from typing import Generic
30from typing import Optional
31from typing import TypeVar
32
33from .concurrency import await_
34from .langhelpers import memoized_property
35
36
37_T = TypeVar("_T", bound=Any)
38__all__ = ["Empty", "Full", "Queue"]
39
40
41class Empty(Exception):
42 "Exception raised by Queue.get(block=0)/get_nowait()."
43
44 pass
45
46
47class Full(Exception):
48 "Exception raised by Queue.put(block=0)/put_nowait()."
49
50 pass
51
52
53class QueueCommon(Generic[_T]):
54 maxsize: int
55 use_lifo: bool
56
57 def __init__(self, maxsize: int = 0, use_lifo: bool = False): ...
58
59 def empty(self) -> bool:
60 raise NotImplementedError()
61
62 def full(self) -> bool:
63 raise NotImplementedError()
64
65 def qsize(self) -> int:
66 raise NotImplementedError()
67
68 def put_nowait(self, item: _T) -> None:
69 raise NotImplementedError()
70
71 def put(
72 self, item: _T, block: bool = True, timeout: Optional[float] = None
73 ) -> None:
74 raise NotImplementedError()
75
76 def get_nowait(self) -> _T:
77 raise NotImplementedError()
78
79 def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
80 raise NotImplementedError()
81
82
83class Queue(QueueCommon[_T]):
84 queue: Deque[_T]
85
86 def __init__(self, maxsize: int = 0, use_lifo: bool = False):
87 """Initialize a queue object with a given maximum size.
88
89 If `maxsize` is <= 0, the queue size is infinite.
90
91 If `use_lifo` is True, this Queue acts like a Stack (LIFO).
92 """
93
94 self._init(maxsize)
95 # mutex must be held whenever the queue is mutating. All methods
96 # that acquire mutex must release it before returning. mutex
97 # is shared between the two conditions, so acquiring and
98 # releasing the conditions also acquires and releases mutex.
99 self.mutex = threading.RLock()
100 # Notify not_empty whenever an item is added to the queue; a
101 # thread waiting to get is notified then.
102 self.not_empty = threading.Condition(self.mutex)
103 # Notify not_full whenever an item is removed from the queue;
104 # a thread waiting to put is notified then.
105 self.not_full = threading.Condition(self.mutex)
106 # If this queue uses LIFO or FIFO
107 self.use_lifo = use_lifo
108
109 def qsize(self) -> int:
110 """Return the approximate size of the queue (not reliable!)."""
111
112 with self.mutex:
113 return self._qsize()
114
115 def empty(self) -> bool:
116 """Return True if the queue is empty, False otherwise (not
117 reliable!)."""
118
119 with self.mutex:
120 return self._empty()
121
122 def full(self) -> bool:
123 """Return True if the queue is full, False otherwise (not
124 reliable!)."""
125
126 with self.mutex:
127 return self._full()
128
129 def put(
130 self, item: _T, block: bool = True, timeout: Optional[float] = None
131 ) -> None:
132 """Put an item into the queue.
133
134 If optional args `block` is True and `timeout` is None (the
135 default), block if necessary until a free slot is
136 available. If `timeout` is a positive number, it blocks at
137 most `timeout` seconds and raises the ``Full`` exception if no
138 free slot was available within that time. Otherwise (`block`
139 is false), put an item on the queue if a free slot is
140 immediately available, else raise the ``Full`` exception
141 (`timeout` is ignored in that case).
142 """
143
144 with self.not_full:
145 if not block:
146 if self._full():
147 raise Full
148 elif timeout is None:
149 while self._full():
150 self.not_full.wait()
151 else:
152 if timeout < 0:
153 raise ValueError("'timeout' must be a positive number")
154 endtime = _time() + timeout
155 while self._full():
156 remaining = endtime - _time()
157 if remaining <= 0.0:
158 raise Full
159 self.not_full.wait(remaining)
160 self._put(item)
161 self.not_empty.notify()
162
163 def put_nowait(self, item: _T) -> None:
164 """Put an item into the queue without blocking.
165
166 Only enqueue the item if a free slot is immediately available.
167 Otherwise raise the ``Full`` exception.
168 """
169 return self.put(item, False)
170
171 def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
172 """Remove and return an item from the queue.
173
174 If optional args `block` is True and `timeout` is None (the
175 default), block if necessary until an item is available. If
176 `timeout` is a positive number, it blocks at most `timeout`
177 seconds and raises the ``Empty`` exception if no item was
178 available within that time. Otherwise (`block` is false),
179 return an item if one is immediately available, else raise the
180 ``Empty`` exception (`timeout` is ignored in that case).
181
182 """
183 with self.not_empty:
184 if not block:
185 if self._empty():
186 raise Empty
187 elif timeout is None:
188 while self._empty():
189 self.not_empty.wait()
190 else:
191 if timeout < 0:
192 raise ValueError("'timeout' must be a positive number")
193 endtime = _time() + timeout
194 while self._empty():
195 remaining = endtime - _time()
196 if remaining <= 0.0:
197 raise Empty
198 self.not_empty.wait(remaining)
199 item = self._get()
200 self.not_full.notify()
201 return item
202
203 def get_nowait(self) -> _T:
204 """Remove and return an item from the queue without blocking.
205
206 Only get an item if one is immediately available. Otherwise
207 raise the ``Empty`` exception.
208 """
209
210 return self.get(False)
211
212 def _init(self, maxsize: int) -> None:
213 self.maxsize = maxsize
214 self.queue = deque()
215
216 def _qsize(self) -> int:
217 return len(self.queue)
218
219 def _empty(self) -> bool:
220 return not self.queue
221
222 def _full(self) -> bool:
223 return self.maxsize > 0 and len(self.queue) == self.maxsize
224
225 def _put(self, item: _T) -> None:
226 self.queue.append(item)
227
228 def _get(self) -> _T:
229 if self.use_lifo:
230 # LIFO
231 return self.queue.pop()
232 else:
233 # FIFO
234 return self.queue.popleft()
235
236
237class AsyncAdaptedQueue(QueueCommon[_T]):
238 def __init__(self, maxsize: int = 0, use_lifo: bool = False):
239 self.use_lifo = use_lifo
240 self.maxsize = maxsize
241
242 def empty(self) -> bool:
243 return self._queue.empty()
244
245 def full(self):
246 return self._queue.full()
247
248 def qsize(self):
249 return self._queue.qsize()
250
251 @memoized_property
252 def _queue(self) -> asyncio.Queue[_T]:
253 # Delay creation of the queue until it is first used, to avoid
254 # binding it to a possibly wrong event loop.
255 # By delaying the creation of the pool we accommodate the common
256 # usage pattern of instantiating the engine at module level, where a
257 # different event loop is in present compared to when the application
258 # is actually run.
259
260 queue: asyncio.Queue[_T]
261
262 if self.use_lifo:
263 queue = asyncio.LifoQueue(maxsize=self.maxsize)
264 else:
265 queue = asyncio.Queue(maxsize=self.maxsize)
266 return queue
267
268 def put_nowait(self, item: _T) -> None:
269 try:
270 self._queue.put_nowait(item)
271 except asyncio.QueueFull as err:
272 raise Full() from err
273
274 def put(
275 self, item: _T, block: bool = True, timeout: Optional[float] = None
276 ) -> None:
277 if not block:
278 return self.put_nowait(item)
279
280 try:
281 if timeout is not None:
282 await_(asyncio.wait_for(self._queue.put(item), timeout))
283 else:
284 await_(self._queue.put(item))
285 except (asyncio.QueueFull, asyncio.TimeoutError) as err:
286 raise Full() from err
287
288 def get_nowait(self) -> _T:
289 try:
290 return self._queue.get_nowait()
291 except asyncio.QueueEmpty as err:
292 raise Empty() from err
293
294 def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
295 if not block:
296 return self.get_nowait()
297
298 try:
299 if timeout is not None:
300 return await_(asyncio.wait_for(self._queue.get(), timeout))
301 else:
302 return await_(self._queue.get())
303 except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
304 raise Empty() from err