Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/queue.py: 32%

133 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# util/queue.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""An adaptation of Py2.3/2.4's Queue module which supports reentrant 

9behavior, using RLock instead of Lock for its mutex object. The 

10Queue object is used exclusively by the sqlalchemy.pool.QueuePool 

11class. 

12 

13This is to support the connection pool's usage of weakref callbacks to return 

14connections to the underlying Queue, which can in extremely 

15rare cases be invoked within the ``get()`` method of the Queue itself, 

16producing a ``put()`` inside the ``get()`` and therefore a reentrant 

17condition. 

18 

19""" 

20 

21from collections import deque 

22from time import time as _time 

23 

24from . import compat 

25from .compat import threading 

26from .concurrency import asyncio 

27from .concurrency import await_fallback 

28from .concurrency import await_only 

29from .langhelpers import memoized_property 

30 

31 

32__all__ = ["Empty", "Full", "Queue"] 

33 

34 

35class Empty(Exception): 

36 "Exception raised by Queue.get(block=0)/get_nowait()." 

37 

38 pass 

39 

40 

41class Full(Exception): 

42 "Exception raised by Queue.put(block=0)/put_nowait()." 

43 

44 pass 

45 

46 

47class Queue: 

48 def __init__(self, maxsize=0, use_lifo=False): 

49 """Initialize a queue object with a given maximum size. 

50 

51 If `maxsize` is <= 0, the queue size is infinite. 

52 

53 If `use_lifo` is True, this Queue acts like a Stack (LIFO). 

54 """ 

55 

56 self._init(maxsize) 

57 # mutex must be held whenever the queue is mutating. All methods 

58 # that acquire mutex must release it before returning. mutex 

59 # is shared between the two conditions, so acquiring and 

60 # releasing the conditions also acquires and releases mutex. 

61 self.mutex = threading.RLock() 

62 # Notify not_empty whenever an item is added to the queue; a 

63 # thread waiting to get is notified then. 

64 self.not_empty = threading.Condition(self.mutex) 

65 # Notify not_full whenever an item is removed from the queue; 

66 # a thread waiting to put is notified then. 

67 self.not_full = threading.Condition(self.mutex) 

68 # If this queue uses LIFO or FIFO 

69 self.use_lifo = use_lifo 

70 

71 def qsize(self): 

72 """Return the approximate size of the queue (not reliable!).""" 

73 

74 with self.mutex: 

75 return self._qsize() 

76 

77 def empty(self): 

78 """Return True if the queue is empty, False otherwise (not 

79 reliable!).""" 

80 

81 with self.mutex: 

82 return self._empty() 

83 

84 def full(self): 

85 """Return True if the queue is full, False otherwise (not 

86 reliable!).""" 

87 

88 with self.mutex: 

89 return self._full() 

90 

91 def put(self, item, block=True, timeout=None): 

92 """Put an item into the queue. 

93 

94 If optional args `block` is True and `timeout` is None (the 

95 default), block if necessary until a free slot is 

96 available. If `timeout` is a positive number, it blocks at 

97 most `timeout` seconds and raises the ``Full`` exception if no 

98 free slot was available within that time. Otherwise (`block` 

99 is false), put an item on the queue if a free slot is 

100 immediately available, else raise the ``Full`` exception 

101 (`timeout` is ignored in that case). 

102 """ 

103 

104 with self.not_full: 

105 if not block: 

106 if self._full(): 

107 raise Full 

108 elif timeout is None: 

109 while self._full(): 

110 self.not_full.wait() 

111 else: 

112 if timeout < 0: 

113 raise ValueError("'timeout' must be a positive number") 

114 endtime = _time() + timeout 

115 while self._full(): 

116 remaining = endtime - _time() 

117 if remaining <= 0.0: 

118 raise Full 

119 self.not_full.wait(remaining) 

120 self._put(item) 

121 self.not_empty.notify() 

122 

123 def put_nowait(self, item): 

124 """Put an item into the queue without blocking. 

125 

126 Only enqueue the item if a free slot is immediately available. 

127 Otherwise raise the ``Full`` exception. 

128 """ 

129 return self.put(item, False) 

130 

131 def get(self, block=True, timeout=None): 

132 """Remove and return an item from the queue. 

133 

134 If optional args `block` is True and `timeout` is None (the 

135 default), block if necessary until an item is available. If 

136 `timeout` is a positive number, it blocks at most `timeout` 

137 seconds and raises the ``Empty`` exception if no item was 

138 available within that time. Otherwise (`block` is false), 

139 return an item if one is immediately available, else raise the 

140 ``Empty`` exception (`timeout` is ignored in that case). 

141 

142 """ 

143 with self.not_empty: 

144 if not block: 

145 if self._empty(): 

146 raise Empty 

147 elif timeout is None: 

148 while self._empty(): 

149 self.not_empty.wait() 

150 else: 

151 if timeout < 0: 

152 raise ValueError("'timeout' must be a positive number") 

153 endtime = _time() + timeout 

154 while self._empty(): 

155 remaining = endtime - _time() 

156 if remaining <= 0.0: 

157 raise Empty 

158 self.not_empty.wait(remaining) 

159 item = self._get() 

160 self.not_full.notify() 

161 return item 

162 

163 def get_nowait(self): 

164 """Remove and return an item from the queue without blocking. 

165 

166 Only get an item if one is immediately available. Otherwise 

167 raise the ``Empty`` exception. 

168 """ 

169 

170 return self.get(False) 

171 

172 # Override these methods to implement other queue organizations 

173 # (e.g. stack or priority queue). 

174 # These will only be called with appropriate locks held 

175 

176 # Initialize the queue representation 

177 def _init(self, maxsize): 

178 self.maxsize = maxsize 

179 self.queue = deque() 

180 

181 def _qsize(self): 

182 return len(self.queue) 

183 

184 # Check whether the queue is empty 

185 def _empty(self): 

186 return not self.queue 

187 

188 # Check whether the queue is full 

189 def _full(self): 

190 return self.maxsize > 0 and len(self.queue) == self.maxsize 

191 

192 # Put a new item in the queue 

193 def _put(self, item): 

194 self.queue.append(item) 

195 

196 # Get an item from the queue 

197 def _get(self): 

198 if self.use_lifo: 

199 # LIFO 

200 return self.queue.pop() 

201 else: 

202 # FIFO 

203 return self.queue.popleft() 

204 

205 

206class AsyncAdaptedQueue: 

207 await_ = staticmethod(await_only) 

208 

209 def __init__(self, maxsize=0, use_lifo=False): 

210 self.use_lifo = use_lifo 

211 self.maxsize = maxsize 

212 

213 def empty(self): 

214 return self._queue.empty() 

215 

216 def full(self): 

217 return self._queue.full() 

218 

219 def qsize(self): 

220 return self._queue.qsize() 

221 

222 @memoized_property 

223 def _queue(self): 

224 # Delay creation of the queue until it is first used, to avoid 

225 # binding it to a possibly wrong event loop. 

226 # By delaying the creation of the pool we accommodate the common 

227 # usage pattern of instantiating the engine at module level, where a 

228 # different event loop is in present compared to when the application 

229 # is actually run. 

230 

231 if self.use_lifo: 

232 queue = asyncio.LifoQueue(maxsize=self.maxsize) 

233 else: 

234 queue = asyncio.Queue(maxsize=self.maxsize) 

235 return queue 

236 

237 def put_nowait(self, item): 

238 try: 

239 return self._queue.put_nowait(item) 

240 except asyncio.QueueFull as err: 

241 compat.raise_( 

242 Full(), 

243 replace_context=err, 

244 ) 

245 

246 def put(self, item, block=True, timeout=None): 

247 if not block: 

248 return self.put_nowait(item) 

249 

250 try: 

251 if timeout is not None: 

252 return self.await_( 

253 asyncio.wait_for(self._queue.put(item), timeout) 

254 ) 

255 else: 

256 return self.await_(self._queue.put(item)) 

257 except (asyncio.QueueFull, asyncio.TimeoutError) as err: 

258 compat.raise_( 

259 Full(), 

260 replace_context=err, 

261 ) 

262 

263 def get_nowait(self): 

264 try: 

265 return self._queue.get_nowait() 

266 except asyncio.QueueEmpty as err: 

267 compat.raise_( 

268 Empty(), 

269 replace_context=err, 

270 ) 

271 

272 def get(self, block=True, timeout=None): 

273 if not block: 

274 return self.get_nowait() 

275 

276 try: 

277 if timeout is not None: 

278 return self.await_( 

279 asyncio.wait_for(self._queue.get(), timeout) 

280 ) 

281 else: 

282 return self.await_(self._queue.get()) 

283 except (asyncio.QueueEmpty, asyncio.TimeoutError) as err: 

284 compat.raise_( 

285 Empty(), 

286 replace_context=err, 

287 ) 

288 

289 

290class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue): 

291 await_ = staticmethod(await_fallback)