Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/queue.py: 44%
133 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# util/queue.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
9behavior, using RLock instead of Lock for its mutex object. The
10Queue object is used exclusively by the sqlalchemy.pool.QueuePool
11class.
13This is to support the connection pool's usage of weakref callbacks to return
14connections to the underlying Queue, which can in extremely
15rare cases be invoked within the ``get()`` method of the Queue itself,
16producing a ``put()`` inside the ``get()`` and therefore a reentrant
17condition.
19"""
21from collections import deque
22from time import time as _time
24from . import compat
25from .compat import threading
26from .concurrency import asyncio
27from .concurrency import await_fallback
28from .concurrency import await_only
29from .langhelpers import memoized_property
32__all__ = ["Empty", "Full", "Queue"]
class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait()."""
class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait()."""
class Queue:
    """Bounded or unbounded FIFO/LIFO queue guarded by a reentrant lock.

    Every public method acquires ``self.mutex`` (a ``threading.RLock``),
    either directly or through one of the two condition variables that
    share it, so a ``put()`` issued by the same thread from inside a
    ``get()`` does not deadlock on the mutex.
    """

    def __init__(self, maxsize=0, use_lifo=False):
        """Initialize a queue object with a given maximum size.

        If `maxsize` is <= 0, the queue size is infinite.

        If `use_lifo` is True, this Queue acts like a Stack (LIFO).
        """

        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)
        # If this queue uses LIFO or FIFO
        self.use_lifo = use_lifo

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""

        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._empty()

    def full(self):
        """Return True if the queue is full, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._full()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until a free slot is
        available.  If `timeout` is a non-negative number, it blocks at
        most `timeout` seconds and raises the ``Full`` exception if no
        free slot was available within that time.  Otherwise (`block`
        is false), put an item on the queue if a free slot is
        immediately available, else raise the ``Full`` exception
        (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """

        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    # Zero is accepted (it behaves like a single
                    # non-blocking check), hence "non-negative".
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._full():
                    # Recompute the budget on every wake-up so repeated
                    # notifications cannot extend the total wait.
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the ``Full`` exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until an item is available.  If
        `timeout` is a non-negative number, it blocks at most `timeout`
        seconds and raises the ``Empty`` exception if no item was
        available within that time.  Otherwise (`block` is false),
        return an item if one is immediately available, else raise the
        ``Empty`` exception (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """
        with self.not_empty:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    # Zero is accepted (it behaves like a single
                    # non-blocking check), hence "non-negative".
                    raise ValueError("'timeout' must be a non-negative number")
                endtime = _time() + timeout
                while self._empty():
                    # Recompute the budget on every wake-up so repeated
                    # notifications cannot extend the total wait.
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the ``Empty`` exception.
        """

        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        # ``<=`` rather than ``==``: if the container ever holds more than
        # ``maxsize`` items it must still count as full.
        return 0 < self.maxsize <= len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        if self.use_lifo:
            # LIFO
            return self.queue.pop()
        else:
            # FIFO
            return self.queue.popleft()
class AsyncAdaptedQueue:
    """Expose an ``asyncio`` queue through the blocking ``Queue`` interface.

    Coroutines produced by the underlying asyncio queue are driven through
    ``self.await_``; asyncio's ``QueueFull`` / ``QueueEmpty`` /
    ``TimeoutError`` are re-raised as this module's ``Full`` / ``Empty``.
    """

    # Callable used to run a coroutine to completion from synchronous code.
    # NOTE(review): exact semantics live in ``.concurrency`` -- confirm there.
    await_ = staticmethod(await_only)

    def __init__(self, maxsize=0, use_lifo=False):
        """Record the configuration; the asyncio queue is created lazily."""
        self.use_lifo = use_lifo
        self.maxsize = maxsize

    def empty(self):
        """Return True if the underlying asyncio queue is empty."""
        return self._queue.empty()

    def full(self):
        """Return True if the underlying asyncio queue is full."""
        return self._queue.full()

    def qsize(self):
        """Return the number of items in the underlying asyncio queue."""
        return self._queue.qsize()

    @memoized_property
    def _queue(self):
        # Delay creation of the queue until it is first used, to avoid
        # binding it to a possibly wrong event loop.
        # By delaying the creation of the pool we accommodate the common
        # usage pattern of instantiating the engine at module level, where a
        # different event loop is present compared to when the application
        # is actually run.

        if self.use_lifo:
            queue = asyncio.LifoQueue(maxsize=self.maxsize)
        else:
            queue = asyncio.Queue(maxsize=self.maxsize)
        return queue

    def put_nowait(self, item):
        """Enqueue `item` without waiting; raise ``Full`` if no slot is free."""
        try:
            return self._queue.put_nowait(item)
        except asyncio.QueueFull as err:
            compat.raise_(
                Full(),
                replace_context=err,
            )

    def put(self, item, block=True, timeout=None):
        """Enqueue `item`.

        With `block` false this delegates to ``put_nowait()`` and `timeout`
        is ignored.  Otherwise the asyncio ``put()`` coroutine is awaited,
        bounded by `timeout` seconds when `timeout` is not None; ``Full`` is
        raised if that wait times out.
        """
        if not block:
            return self.put_nowait(item)

        try:
            if timeout is not None:
                return self.await_(
                    asyncio.wait_for(self._queue.put(item), timeout)
                )
            else:
                return self.await_(self._queue.put(item))
        except (asyncio.QueueFull, asyncio.TimeoutError) as err:
            compat.raise_(
                Full(),
                replace_context=err,
            )

    def get_nowait(self):
        """Dequeue an item without waiting; raise ``Empty`` if none exists."""
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty as err:
            compat.raise_(
                Empty(),
                replace_context=err,
            )

    def get(self, block=True, timeout=None):
        """Dequeue and return an item.

        With `block` false this delegates to ``get_nowait()`` and `timeout`
        is ignored.  Otherwise the asyncio ``get()`` coroutine is awaited,
        bounded by `timeout` seconds when `timeout` is not None; ``Empty`` is
        raised if that wait times out.
        """
        if not block:
            return self.get_nowait()

        try:
            if timeout is not None:
                return self.await_(
                    asyncio.wait_for(self._queue.get(), timeout)
                )
            else:
                return self.await_(self._queue.get())
        except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
            compat.raise_(
                Empty(),
                replace_context=err,
            )
class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue):
    """``AsyncAdaptedQueue`` variant that awaits through ``await_fallback``."""

    # Only the await strategy differs from the parent class.
    await_ = staticmethod(await_fallback)