Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/queues.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

148 statements  

1# Copyright 2015 The Tornado Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15"""Asynchronous queues for coroutines. These classes are very similar 

16to those provided in the standard library's `asyncio package 

17<https://docs.python.org/3/library/asyncio-queue.html>`_. 

18 

19.. warning:: 

20 

21 Unlike the standard library's `queue` module, the classes defined here 

22 are *not* thread-safe. To use these queues from another thread, 

23 use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread 

24 before calling any queue methods. 

25 

26""" 

27 

28import collections 

29import datetime 

30import heapq 

31 

32from tornado import gen, ioloop 

33from tornado.concurrent import Future, future_set_result_unless_cancelled 

34from tornado.locks import Event 

35 

36from typing import Union, TypeVar, Generic, Awaitable, Optional 

37import typing 

38 

39if typing.TYPE_CHECKING: 

40 from typing import Deque, Tuple, Any # noqa: F401 

41 

# Type variable standing for the kind of item a queue carries.
_T = TypeVar("_T")

# Names that make up the public API of this module.
__all__ = [
    "Queue",
    "PriorityQueue",
    "LifoQueue",
    "QueueFull",
    "QueueEmpty",
]

45 

46 

class QueueEmpty(Exception):
    """Signal that `.Queue.get_nowait` found no item to return."""

51 

52 

class QueueFull(Exception):
    """Signal that `.Queue.put_nowait` found the queue at its maximum size."""

57 

58 

def _set_timeout(
    future: Future, timeout: Union[None, float, datetime.timedelta]
) -> None:
    """Fail ``future`` with `.gen.TimeoutError` once ``timeout`` is reached.

    A falsy ``timeout`` (``None``, ``0`` or a zero ``timedelta``) schedules
    nothing.  Otherwise a callback is registered on the current `.IOLoop`
    via ``add_timeout``; the interpretation of ``timeout`` (absolute
    deadline vs. relative delta) is whatever ``IOLoop.add_timeout`` gives it.
    The scheduled callback is removed again as soon as ``future`` completes.
    """
    if not timeout:
        return

    loop = ioloop.IOLoop.current()

    def expire() -> None:
        # Only a still-pending future may be failed; a finished one is left alone.
        if future.done():
            return
        future.set_exception(gen.TimeoutError())

    handle = loop.add_timeout(timeout, expire)

    def cancel_timer(_: Future) -> None:
        # The future finished (for any reason): the timer is no longer needed.
        loop.remove_timeout(handle)

    future.add_done_callback(cancel_timer)

71 

72 

class _QueueIterator(Generic[_T]):
    """Asynchronous iterator that yields items by awaiting ``q.get()``.

    Instances are what `.Queue.__aiter__` hands to ``async for``.  The
    iterator never raises ``StopAsyncIteration`` itself: every step simply
    waits for the next item of the underlying queue.
    """

    def __init__(self, q: "Queue[_T]") -> None:
        # The queue whose ``get()`` supplies each item.
        self.q = q

    def __aiter__(self) -> "_QueueIterator[_T]":
        # An asynchronous iterator must also be asynchronously iterable;
        # returning ``self`` lets the iterator object itself be used in
        # ``async for`` / ``aiter()``.
        return self

    def __anext__(self) -> Awaitable[_T]:
        """Return the awaitable produced by the queue's ``get()``."""
        return self.q.get()

79 

80 

class Queue(Generic[_T]):
    """Hand items from producer coroutines to consumer coroutines.

    A ``maxsize`` of 0 (the default) means the queue size is unbounded.

    .. testcode::

        import asyncio
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await asyncio.sleep(0.01)
                finally:
                    q.task_done()

        async def producer():
            for item in range(5):
                await q.put(item)
                print('Put %s' % item)

        async def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            await producer()  # Wait for producer to put all tasks.
            await q.join()  # Wait for consumer to finish all tasks.
            print('Done')

        asyncio.run(main())

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done


    In versions of Python without native coroutines (before 3.5),
    ``consumer()`` could be written as::

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.

    """

    # The concrete container type is chosen by each subclass in ``_init``.
    # It could be another generic parameter, with protocols used to be
    # more precise here.
    _queue = None  # type: Any

    def __init__(self, maxsize: int = 0) -> None:
        """Create a queue holding at most ``maxsize`` items (0 = unbounded).

        Raises `TypeError` if ``maxsize`` is ``None`` and `ValueError` if it
        is negative.
        """
        if maxsize is None:
            raise TypeError("maxsize can't be None")
        if maxsize < 0:
            raise ValueError("maxsize can't be negative")

        self._maxsize = maxsize
        # Let the (sub)class build its storage before any waiter bookkeeping.
        self._init()
        # Futures of consumers waiting for an item.
        self._getters = collections.deque()  # type: Deque[Future[_T]]
        # (item, future) pairs of producers waiting for a free slot.
        self._putters = collections.deque()  # type: Deque[Tuple[_T, Future[None]]]
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; ``join`` waits on it.
        self._finished = Event()
        self._finished.set()

    @property
    def maxsize(self) -> int:
        """Maximum number of items the queue may hold (0 means unbounded)."""
        return self._maxsize

    def qsize(self) -> int:
        """Return how many items are currently stored in the queue."""
        return len(self._queue)

    def empty(self) -> bool:
        """Return ``True`` when the queue currently stores no items."""
        return not self._queue

    def full(self) -> bool:
        """Return ``True`` when a bounded queue has reached ``maxsize``.

        An unbounded queue (``maxsize == 0``) is never full.
        """
        if self.maxsize == 0:
            return False
        return self.qsize() >= self.maxsize

    def put(
        self, item: _T, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> "Future[None]":
        """Add an item to the queue, waiting for a free slot if necessary.

        Returns a Future that resolves once the item has been accepted, or
        raises `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.
        """
        waiter = Future()  # type: Future[None]
        try:
            self.put_nowait(item)
        except QueueFull:
            # No room right now: park the item together with its future.
            self._putters.append((item, waiter))
            _set_timeout(waiter, timeout)
            return waiter
        waiter.set_result(None)
        return waiter

    def put_nowait(self, item: _T) -> None:
        """Add an item to the queue without waiting.

        Raises `QueueFull` if no free slot is immediately available.
        """
        self._consume_expired()
        if not self._getters:
            if self.full():
                raise QueueFull
            self.__put_internal(item)
            return

        # A consumer is waiting: route the item through the queue so that
        # subclass ordering and task accounting still apply, then hand the
        # next item to that consumer.
        assert self.empty(), "queue non-empty, why are getters waiting?"
        getter = self._getters.popleft()
        self.__put_internal(item)
        future_set_result_unless_cancelled(getter, self._get())

    def get(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[_T]:
        """Take an item out of the queue and return it.

        Returns an awaitable which resolves once an item is available, or
        raises `tornado.util.TimeoutError` after a timeout.

        ``timeout`` may be a number denoting a time (on the same
        scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        .. note::

           The ``timeout`` argument of this method differs from that
           of the standard library's `queue.Queue.get`. That method
           interprets numeric values as relative timeouts; this one
           interprets them as absolute deadlines and requires
           ``timedelta`` objects for relative timeouts (consistent
           with other timeouts in Tornado).

        """
        result = Future()  # type: Future[_T]
        try:
            item = self.get_nowait()
        except QueueEmpty:
            # Nothing available yet: register as a waiting consumer.
            self._getters.append(result)
            _set_timeout(result, timeout)
        else:
            result.set_result(item)
        return result

    def get_nowait(self) -> _T:
        """Take an item out of the queue without waiting.

        Returns an item if one is immediately available, otherwise raises
        `QueueEmpty`.
        """
        self._consume_expired()
        if self._putters:
            # A producer is parked: admit its item first, then release it.
            assert self.full(), "queue not full, why are putters waiting?"
            item, putter = self._putters.popleft()
            self.__put_internal(item)
            future_set_result_unless_cancelled(putter, None)
        elif not self.qsize():
            raise QueueEmpty
        return self._get()

    def task_done(self) -> None:
        """Mark one previously enqueued task as finished.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        """
        remaining = self._unfinished_tasks
        if remaining <= 0:
            raise ValueError("task_done() called too many times")
        remaining -= 1
        self._unfinished_tasks = remaining
        if remaining == 0:
            self._finished.set()

    def join(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[None]:
        """Wait until every item put into the queue has been processed.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        """
        return self._finished.wait(timeout)

    def __aiter__(self) -> _QueueIterator[_T]:
        return _QueueIterator(self)

    # These three are overridable in subclasses.
    def _init(self) -> None:
        self._queue = collections.deque()

    def _get(self) -> _T:
        return self._queue.popleft()

    def _put(self, item: _T) -> None:
        self._queue.append(item)

    # End of the overridable methods.

    def __put_internal(self, item: _T) -> None:
        # Account for the new unfinished task before storing the item.
        self._unfinished_tasks += 1
        self._finished.clear()
        self._put(item)

    def _consume_expired(self) -> None:
        # Drop waiters at the head of each deque whose future is already
        # done (for example because it timed out).
        putters = self._putters
        while putters and putters[0][1].done():
            putters.popleft()

        getters = self._getters
        while getters and getters[0].done():
            getters.popleft()

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return f"<{cls_name} at {hex(id(self))} {self._format()}>"

    def __str__(self) -> str:
        cls_name = type(self).__name__
        return f"<{cls_name} {self._format()}>"

    def _format(self) -> str:
        # Build the state summary shared by ``__repr__`` and ``__str__``;
        # sections that are empty or zero are left out.
        pieces = [f"maxsize={self.maxsize!r}"]
        if getattr(self, "_queue", None):
            pieces.append(" queue=%r" % self._queue)
        if self._getters:
            pieces.append(" getters[%s]" % len(self._getters))
        if self._putters:
            pieces.append(" putters[%s]" % len(self._putters))
        if self._unfinished_tasks:
            pieces.append(" tasks=%s" % self._unfinished_tasks)
        return "".join(pieces)

347 

348 

class PriorityQueue(Queue):
    """A `.Queue` whose entries come out in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        import asyncio
        from tornado.queues import PriorityQueue

        async def main():
            q = PriorityQueue()
            q.put((1, 'medium-priority item'))
            q.put((0, 'high-priority item'))
            q.put((10, 'low-priority item'))

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    """

    def _init(self) -> None:
        # A plain list maintained as a binary heap by ``heapq``.
        self._queue = list()

    def _put(self, item: _T) -> None:
        heap = self._queue
        heapq.heappush(heap, item)

    def _get(self) -> _T:  # type: ignore[type-var]
        # ``heappop`` removes and returns the smallest entry.
        smallest = heapq.heappop(self._queue)
        return smallest

386 

387 

class LifoQueue(Queue):
    """A `.Queue` that hands back the most recently put items first.

    .. testcode::

        import asyncio
        from tornado.queues import LifoQueue

        async def main():
            q = LifoQueue()
            q.put(3)
            q.put(2)
            q.put(1)

            print(await q.get())
            print(await q.get())
            print(await q.get())

        asyncio.run(main())

    .. testoutput::

        1
        2
        3
    """

    def _init(self) -> None:
        # A plain list used as a stack: push and pop at the tail.
        self._queue = list()

    def _put(self, item: _T) -> None:
        stack = self._queue
        stack.append(item)

    def _get(self) -> _T:  # type: ignore[type-var]
        newest = self._queue.pop()
        return newest