Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/queue.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

164 statements  

1# util/queue.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""An adaptation of Py2.3/2.4's Queue module which supports reentrant 

10behavior, using RLock instead of Lock for its mutex object. The 

11Queue object is used exclusively by the sqlalchemy.pool.QueuePool 

12class. 

13 

14This is to support the connection pool's usage of weakref callbacks to return 

15connections to the underlying Queue, which can in extremely 

16rare cases be invoked within the ``get()`` method of the Queue itself, 

17producing a ``put()`` inside the ``get()`` and therefore a reentrant 

18condition. 

19 

20""" 

21from __future__ import annotations 

22 

23import asyncio 

24from collections import deque 

25import threading 

26from time import time as _time 

27import typing 

28from typing import Any 

29from typing import Awaitable 

30from typing import Deque 

31from typing import Generic 

32from typing import Optional 

33from typing import TypeVar 

34 

35from .concurrency import await_fallback 

36from .concurrency import await_only 

37from .langhelpers import memoized_property 

38 

39 

# Type variable for the items stored in a queue.
_T = TypeVar("_T", bound=Any)

# Names exported by ``from ... import *``.
__all__ = [
    "Empty",
    "Full",
    "Queue",
]

42 

43 

class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait()."""

48 

49 

class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait()."""

54 

55 

class QueueCommon(Generic[_T]):
    """Interface shared by the queue implementations in this module.

    Every operation except ``__init__`` raises ``NotImplementedError``
    here; subclasses supply the behavior.
    """

    # Capacity of the queue.
    maxsize: int
    # Whether items are retrieved in LIFO order.
    use_lifo: bool

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Accept the common constructor arguments; does nothing here."""

    def empty(self) -> bool:
        """Report whether the queue holds no items."""
        raise NotImplementedError

    def full(self) -> bool:
        """Report whether the queue has reached its capacity."""
        raise NotImplementedError

    def qsize(self) -> int:
        """Report the number of items in the queue."""
        raise NotImplementedError

    def put_nowait(self, item: _T) -> None:
        """Add ``item`` without blocking."""
        raise NotImplementedError

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Add ``item``, optionally blocking for up to ``timeout``."""
        raise NotImplementedError

    def get_nowait(self) -> _T:
        """Remove and return an item without blocking."""
        raise NotImplementedError

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item, optionally blocking for up to
        ``timeout``."""
        raise NotImplementedError

84 

85 

class Queue(QueueCommon[_T]):
    """Thread-synchronized FIFO/LIFO queue guarded by a reentrant lock.

    The mutex is a ``threading.RLock`` so that a ``put()`` issued by the
    same thread while it is inside ``get()`` can re-acquire the lock
    instead of deadlocking.
    """

    # Underlying item storage; created by ``_init()``.
    queue: Deque[_T]

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Initialize a queue object with a given maximum size.

        If `maxsize` is <= 0, the queue size is infinite.

        If `use_lifo` is True, this Queue acts like a Stack (LIFO).
        """

        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)
        # If this queue uses LIFO or FIFO
        self.use_lifo = use_lifo

    def qsize(self) -> int:
        """Return the approximate size of the queue (not reliable!)."""

        with self.mutex:
            return self._qsize()

    def empty(self) -> bool:
        """Return True if the queue is empty, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._empty()

    def full(self) -> bool:
        """Return True if the queue is full, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._full()

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Put an item into the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until a free slot is
        available.  If `timeout` is a non-negative number, it blocks at
        most `timeout` seconds and raises the ``Full`` exception if no
        free slot was available within that time.  Otherwise (`block`
        is false), put an item on the queue if a free slot is
        immediately available, else raise the ``Full`` exception
        (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is
        negative.
        """

        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                # Re-check after every wake-up: another thread may have
                # taken the free slot first.
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()

    def put_nowait(self, item: _T) -> None:
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the ``Full`` exception.
        """
        return self.put(item, False)

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item from the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until an item is available.  If
        `timeout` is a non-negative number, it blocks at most `timeout`
        seconds and raises the ``Empty`` exception if no item was
        available within that time.  Otherwise (`block` is false),
        return an item if one is immediately available, else raise the
        ``Empty`` exception (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is
        negative.
        """
        with self.not_empty:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                # Re-check after every wake-up: another thread may have
                # consumed the item first.
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self) -> _T:
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the ``Empty`` exception.
        """

        return self.get(False)

    # The helpers below do no locking themselves; the public methods call
    # them while holding ``mutex``.

    def _init(self, maxsize: int) -> None:
        """Record the capacity and create the empty item storage."""
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self) -> int:
        """Return the number of stored items."""
        return len(self.queue)

    def _empty(self) -> bool:
        """Return True if no items are stored."""
        return not self.queue

    def _full(self) -> bool:
        """Return True if the queue is bounded and at or over capacity."""
        # ``>=`` rather than ``==``: should the queue ever hold more than
        # ``maxsize`` items (e.g. ``maxsize`` was lowered on a live queue),
        # it must still report full instead of growing without bound.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    def _put(self, item: _T) -> None:
        """Append ``item`` to the right end of the storage."""
        self.queue.append(item)

    def _get(self) -> _T:
        """Remove and return the next item according to ``use_lifo``."""
        if self.use_lifo:
            # LIFO
            return self.queue.pop()
        else:
            # FIFO
            return self.queue.popleft()

238 

239 

class AsyncAdaptedQueue(QueueCommon[_T]):
    """Queue facade over a lazily created ``asyncio`` queue.

    Blocking ``put()`` / ``get()`` calls hand the underlying awaitable to
    ``await_``; asyncio's full / empty / timeout errors are re-raised as
    this module's ``Full`` and ``Empty`` exceptions.
    """

    if typing.TYPE_CHECKING:

        @staticmethod
        def await_(coroutine: Awaitable[Any]) -> _T: ...

    else:
        await_ = staticmethod(await_only)

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        # Only the settings are stored here; the asyncio queue itself is
        # built on first access of ``_queue``.
        self.maxsize = maxsize
        self.use_lifo = use_lifo

    def empty(self) -> bool:
        """Return True if the underlying asyncio queue is empty."""
        return self._queue.empty()

    def full(self):
        """Return True if the underlying asyncio queue is full."""
        return self._queue.full()

    def qsize(self):
        """Return the number of items in the underlying asyncio queue."""
        return self._queue.qsize()

    @memoized_property
    def _queue(self) -> asyncio.Queue[_T]:
        # Delay creation of the queue until it is first used, to avoid
        # binding it to a possibly wrong event loop.
        # By delaying the creation of the pool we accommodate the common
        # usage pattern of instantiating the engine at module level, where a
        # different event loop is present compared to when the application
        # is actually run.
        queue_cls = asyncio.LifoQueue if self.use_lifo else asyncio.Queue
        return queue_cls(maxsize=self.maxsize)

    def put_nowait(self, item: _T) -> None:
        """Add ``item`` without waiting; raise ``Full`` if there is no
        free slot."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull as err:
            raise Full() from err

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Add ``item`` to the queue.

        With ``block`` false this is ``put_nowait()``.  Otherwise the put
        is awaited through ``await_``, limited to ``timeout`` seconds when
        one is given; ``Full`` is raised if the wait times out.
        """
        if not block:
            return self.put_nowait(item)

        pending: Awaitable[Any] = self._queue.put(item)
        if timeout is not None:
            pending = asyncio.wait_for(pending, timeout)

        try:
            self.await_(pending)
        except (asyncio.QueueFull, asyncio.TimeoutError) as err:
            raise Full() from err

    def get_nowait(self) -> _T:
        """Remove and return an item without waiting; raise ``Empty`` if
        none is available."""
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty as err:
            raise Empty() from err

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item from the queue.

        With ``block`` false this is ``get_nowait()``.  Otherwise the get
        is awaited through ``await_``, limited to ``timeout`` seconds when
        one is given; ``Empty`` is raised if the wait times out.
        """
        if not block:
            return self.get_nowait()

        pending: Awaitable[Any] = self._queue.get()
        if timeout is not None:
            pending = asyncio.wait_for(pending, timeout)

        try:
            return self.await_(pending)
        except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
            raise Empty() from err

318 

319 

class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]):
    """``AsyncAdaptedQueue`` variant whose ``await_`` is ``await_fallback``
    rather than ``await_only``."""

    # Type checkers keep the ``await_`` signature declared on the parent;
    # the override is only installed at runtime.
    if not typing.TYPE_CHECKING:
        await_ = staticmethod(await_fallback)