Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/queue.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

154 statements  

1# util/queue.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""An adaptation of Py2.3/2.4's Queue module which supports reentrant 

10behavior, using RLock instead of Lock for its mutex object. The 

11Queue object is used exclusively by the sqlalchemy.pool.QueuePool 

12class. 

13 

14This is to support the connection pool's usage of weakref callbacks to return 

15connections to the underlying Queue, which can in extremely 

16rare cases be invoked within the ``get()`` method of the Queue itself, 

17producing a ``put()`` inside the ``get()`` and therefore a reentrant 

18condition. 

19 

20""" 

21from __future__ import annotations 

22 

23import asyncio 

24from collections import deque 

25import threading 

26from time import time as _time 

27from typing import Any 

28from typing import Deque 

29from typing import Generic 

30from typing import Optional 

31from typing import TypeVar 

32 

33from .concurrency import await_ 

34from .langhelpers import memoized_property 

35 

36 

# Type variable for the item type stored by the queue classes in this module.
_T = TypeVar("_T", bound=Any)

# Names exported by ``from <module> import *``.
__all__ = ["Empty", "Full", "Queue"]

39 

40 

class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait().

    Also raised by a blocking ``get()`` whose timeout expires before an
    item becomes available.
    """

45 

46 

class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait().

    Also raised by a blocking ``put()`` whose timeout expires before a
    free slot becomes available.
    """

51 

52 

class QueueCommon(Generic[_T]):
    """Interface shared by the queue implementations in this module.

    Apart from ``__init__``, which is an empty stub, every method here
    raises ``NotImplementedError``; subclasses supply the behavior.
    """

    # Capacity of the queue, as passed to the constructor.
    maxsize: int
    # True when items are retrieved last-in-first-out instead of FIFO.
    use_lifo: bool

    def __init__(self, maxsize: int = 0, use_lifo: bool = False): ...

    def empty(self) -> bool:
        """Return True if the queue holds no items."""
        raise NotImplementedError()

    def full(self) -> bool:
        """Return True if the queue has no free slot."""
        raise NotImplementedError()

    def qsize(self) -> int:
        """Return the number of items currently in the queue."""
        raise NotImplementedError()

    def put_nowait(self, item: _T) -> None:
        """Add ``item`` without blocking."""
        raise NotImplementedError()

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Add ``item``, optionally blocking for up to ``timeout`` seconds."""
        raise NotImplementedError()

    def get_nowait(self) -> _T:
        """Remove and return an item without blocking."""
        raise NotImplementedError()

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item, optionally blocking for up to
        ``timeout`` seconds."""
        raise NotImplementedError()

81 

82 

class Queue(QueueCommon[_T]):
    """Thread-synchronized queue whose mutex is a ``threading.RLock``.

    Because the mutex is reentrant, a ``put()`` issued by the thread that
    is currently inside ``get()`` (or vice versa) does not deadlock.
    """

    queue: Deque[_T]

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Initialize a queue object with a given maximum size.

        If `maxsize` is <= 0, the queue size is infinite.

        If `use_lifo` is True, this Queue acts like a Stack (LIFO).
        """

        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)
        # If this queue uses LIFO or FIFO
        self.use_lifo = use_lifo

    def qsize(self) -> int:
        """Return the approximate size of the queue (not reliable!)."""

        with self.mutex:
            return self._qsize()

    def empty(self) -> bool:
        """Return True if the queue is empty, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._empty()

    def full(self) -> bool:
        """Return True if the queue is full, False otherwise (not
        reliable!)."""

        with self.mutex:
            return self._full()

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Put an item into the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until a free slot is
        available.  If `timeout` is a positive number, it blocks at
        most `timeout` seconds and raises the ``Full`` exception if no
        free slot was available within that time.  Otherwise (`block`
        is false), put an item on the queue if a free slot is
        immediately available, else raise the ``Full`` exception
        (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """

        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                # Re-check after every wakeup: another thread may have
                # taken the freed slot, and wait() may return early.
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()

    def put_nowait(self, item: _T) -> None:
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the ``Full`` exception.
        """
        return self.put(item, False)

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item from the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until an item is available.  If
        `timeout` is a positive number, it blocks at most `timeout`
        seconds and raises the ``Empty`` exception if no item was
        available within that time.  Otherwise (`block` is false),
        return an item if one is immediately available, else raise the
        ``Empty`` exception (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """
        with self.not_empty:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                # Re-check after every wakeup: another thread may have
                # taken the item, and wait() may return early.
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self) -> _T:
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the ``Empty`` exception.
        """

        return self.get(False)

    # The underscore helpers below do no locking themselves; the public
    # methods above call them while holding ``mutex``.

    def _init(self, maxsize: int) -> None:
        """Record the capacity and create the empty backing deque."""
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self) -> int:
        """Return the number of queued items."""
        return len(self.queue)

    def _empty(self) -> bool:
        """Return True if no items are queued."""
        return not self.queue

    def _full(self) -> bool:
        """Return True if the queue is bounded and at or over capacity."""
        # ``>=`` rather than ``==``: if the deque ever holds more than
        # ``maxsize`` items (for example ``maxsize`` was lowered after
        # construction), an equality test would report "not full" and let
        # the queue keep growing.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    def _put(self, item: _T) -> None:
        """Append ``item`` at the right end of the deque."""
        self.queue.append(item)

    def _get(self) -> _T:
        """Remove and return the next item according to ``use_lifo``."""
        if self.use_lifo:
            # LIFO
            return self.queue.pop()
        else:
            # FIFO
            return self.queue.popleft()

235 

236 

class AsyncAdaptedQueue(QueueCommon[_T]):
    """Queue with the ``QueueCommon`` interface backed by an asyncio queue.

    Blocking ``put()`` / ``get()`` calls are driven through ``await_``;
    asyncio's ``QueueFull`` / ``QueueEmpty`` / ``TimeoutError`` are
    translated to this module's ``Full`` and ``Empty`` exceptions.
    """

    def __init__(self, maxsize: int = 0, use_lifo: bool = False):
        """Store the configuration; the asyncio queue is created lazily."""
        self.use_lifo = use_lifo
        self.maxsize = maxsize

    def empty(self) -> bool:
        """Return True if the underlying queue is empty."""
        return self._queue.empty()

    def full(self) -> bool:
        """Return True if the underlying queue is full."""
        return self._queue.full()

    def qsize(self) -> int:
        """Return the number of items in the underlying queue."""
        return self._queue.qsize()

    @memoized_property
    def _queue(self) -> asyncio.Queue[_T]:
        # Delay creation of the queue until it is first used, to avoid
        # binding it to a possibly wrong event loop.
        # By delaying the creation of the queue we accommodate the common
        # usage pattern of instantiating the engine at module level, where a
        # different event loop is present compared to when the application
        # is actually run.

        queue: asyncio.Queue[_T]

        if self.use_lifo:
            queue = asyncio.LifoQueue(maxsize=self.maxsize)
        else:
            queue = asyncio.Queue(maxsize=self.maxsize)
        return queue

    def put_nowait(self, item: _T) -> None:
        """Add ``item`` without waiting; raise ``Full`` if there is no
        free slot."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull as err:
            raise Full() from err

    def put(
        self, item: _T, block: bool = True, timeout: Optional[float] = None
    ) -> None:
        """Add ``item`` to the queue.

        With ``block`` false this is ``put_nowait()``.  Otherwise wait for
        a free slot, for at most ``timeout`` seconds when ``timeout`` is
        not None, and raise ``Full`` if the wait times out.
        """
        if not block:
            return self.put_nowait(item)

        try:
            if timeout is not None:
                await_(asyncio.wait_for(self._queue.put(item), timeout))
            else:
                await_(self._queue.put(item))
        except (asyncio.QueueFull, asyncio.TimeoutError) as err:
            raise Full() from err

    def get_nowait(self) -> _T:
        """Remove and return an item without waiting; raise ``Empty`` if
        none is available."""
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty as err:
            raise Empty() from err

    def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
        """Remove and return an item from the queue.

        With ``block`` false this is ``get_nowait()``.  Otherwise wait for
        an item, for at most ``timeout`` seconds when ``timeout`` is not
        None, and raise ``Empty`` if the wait times out.
        """
        if not block:
            return self.get_nowait()

        try:
            if timeout is not None:
                return await_(asyncio.wait_for(self._queue.get(), timeout))
            else:
                return await_(self._queue.get())
        except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
            raise Empty() from err