Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/ioloop.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

278 statements  

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""An I/O event loop for non-blocking sockets. 

17 

18In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event loop, with a 

19slightly different interface. The `.IOLoop` interface is now provided primarily 

20for backwards compatibility; new code should generally use the `asyncio` event 

21loop interface directly. The `IOLoop.current` class method provides the 

22`IOLoop` instance corresponding to the running `asyncio` event loop. 

23 

24""" 

25 

26import asyncio 

27import concurrent.futures 

28import datetime 

29import functools 

30import numbers 

31import os 

32import sys 

33import time 

34import math 

35import random 

36import warnings 

37from inspect import isawaitable 

38 

39from tornado.concurrent import ( 

40 Future, 

41 is_future, 

42 chain_future, 

43 future_set_exc_info, 

44 future_add_done_callback, 

45) 

46from tornado.log import app_log 

47from tornado.util import Configurable, TimeoutError, import_object 

48 

49import typing 

50from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable 

51 

52if typing.TYPE_CHECKING: 

53 from typing import Dict, List, Set # noqa: F401 

54 

55 from typing_extensions import Protocol 

56else: 

57 Protocol = object 

58 

59 

class _Selectable(Protocol):
    """Structural type for file-like objects usable in place of a raw fd.

    An object qualifies by providing both ``fileno()`` and ``close()``.
    """

    def fileno(self) -> int:
        """Return the integer file descriptor for this object."""
        pass

    def close(self) -> None:
        """Close this object."""
        pass

66 

67 

68_T = TypeVar("_T") 

69_S = TypeVar("_S", bound=_Selectable) 

70 

71 

72class IOLoop(Configurable): 

73 """An I/O event loop. 

74 

75 As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event loop. 

76 

77 Example usage for a simple TCP server: 

78 

79 .. testcode:: 

80 

81 import asyncio 

82 import errno 

83 import functools 

84 import socket 

85 

86 import tornado 

87 from tornado.iostream import IOStream 

88 

89 async def handle_connection(connection, address): 

90 stream = IOStream(connection) 

91 message = await stream.read_until_close() 

92 print("message from client:", message.decode().strip()) 

93 

94 def connection_ready(sock, fd, events): 

95 while True: 

96 try: 

97 connection, address = sock.accept() 

98 except BlockingIOError: 

99 return 

100 connection.setblocking(0) 

101 io_loop = tornado.ioloop.IOLoop.current() 

102 io_loop.spawn_callback(handle_connection, connection, address) 

103 

104 async def main(): 

105 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 

106 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

107 sock.setblocking(0) 

108 sock.bind(("", 8888)) 

109 sock.listen(128) 

110 

111 io_loop = tornado.ioloop.IOLoop.current() 

112 callback = functools.partial(connection_ready, sock) 

113 io_loop.add_handler(sock.fileno(), callback, io_loop.READ) 

114 await asyncio.Event().wait() 

115 

116 if __name__ == "__main__": 

117 asyncio.run(main()) 

118 

119 .. testoutput:: 

120 :hide: 

121 

122 Most applications should not attempt to construct an `IOLoop` directly, 

123 and instead initialize the `asyncio` event loop and use `IOLoop.current()`. 

124 In some cases, such as in test frameworks when initializing an `IOLoop` 

125 to be run in a secondary thread, it may be appropriate to construct 

126 an `IOLoop` with ``IOLoop(make_current=False)``. 

127 

128 In general, an `IOLoop` cannot survive a fork or be shared across processes 

129 in any way. When multiple processes are being used, each process should 

130 create its own `IOLoop`, which also implies that any objects which depend on 

131 the `IOLoop` (such as `.AsyncHTTPClient`) must also be created in the child 

132 processes. As a guideline, anything that starts processes (including the 

133 `tornado.process` and `multiprocessing` modules) should do so as early as 

134 possible, ideally the first thing the application does after loading its 

135 configuration, and *before* any calls to `.IOLoop.start` or `asyncio.run`. 

136 

137 .. versionchanged:: 4.2 

138 Added the ``make_current`` keyword argument to the `IOLoop` 

139 constructor. 

140 

141 .. versionchanged:: 5.0 

142 

143 Uses the `asyncio` event loop by default. The ``IOLoop.configure`` method 

144 cannot be used on Python 3 except to redundantly specify the `asyncio` 

145 event loop. 

146 

147 .. versionchanged:: 6.3 

148 ``make_current=True`` is now the default when creating an IOLoop - 

149 previously the default was to make the event loop current if there wasn't 

150 already a current one. 

151 """ 

152 

153 # These constants were originally based on constants from the epoll module. 

154 NONE = 0 

155 READ = 0x001 

156 WRITE = 0x004 

157 ERROR = 0x018 

158 

159 # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops. 

160 _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop] 

161 

162 # Maintain a set of all pending tasks to follow the warning in the docs 

163 # of asyncio.create_task: 

164 # https://docs.python.org/3.11/library/asyncio-task.html#asyncio.create_task 

165 # This ensures that all pending tasks have a strong reference so they 

166 # will not be garbage collected before they are finished. 

167 # (Thus avoiding "task was destroyed but it is pending" warnings) 

168 # An analogous change has been proposed in cpython for 3.13: 

169 # https://github.com/python/cpython/issues/91887 

170 # If that change is accepted, this can eventually be removed. 

171 # If it is not, we will consider the rationale and may remove this. 

172 _pending_tasks = set() # type: Set[Future] 

173 

@classmethod
def configure(
    cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
) -> None:
    """Selects the `IOLoop` implementation class.

    ``impl`` may be a class, a string naming one (resolved with
    `import_object`), or ``None``. A class that does not derive from
    ``tornado.platform.asyncio.BaseAsyncIOLoop`` is rejected with
    `RuntimeError`; anything else is passed on, together with ``kwargs``,
    to the parent class's ``configure``.
    """
    from tornado.platform.asyncio import BaseAsyncIOLoop

    # Resolve dotted-path strings first so the class check sees a real type.
    resolved = import_object(impl) if isinstance(impl, str) else impl
    names_a_class = isinstance(resolved, type)
    if names_a_class and not issubclass(resolved, BaseAsyncIOLoop):
        raise RuntimeError("only AsyncIOLoop is allowed when asyncio is available")
    super(IOLoop, cls).configure(resolved, **kwargs)

185 

@staticmethod
def instance() -> "IOLoop":
    """Deprecated alias for `IOLoop.current()`.

    Returns the result of calling `IOLoop.current()` with no arguments.

    .. versionchanged:: 5.0

       Previously, this method returned a global singleton
       `IOLoop`, in contrast with the per-thread `IOLoop` returned
       by `current()`. In nearly all cases the two were the same
       (when they differed, it was generally used from non-Tornado
       threads to communicate back to the main thread's `IOLoop`).
       This distinction is not present in `asyncio`, so in order
       to facilitate integration with that package `instance()`
       was changed to be an alias to `current()`. Applications
       using the cross-thread communications aspect of
       `instance()` should instead set their own global variable
       to point to the `IOLoop` they want to use.

    .. deprecated:: 5.0
    """
    return IOLoop.current()

207 

def install(self) -> None:
    """Deprecated alias for `make_current()`.

    Simply forwards to ``self.make_current()``.

    .. versionchanged:: 5.0

       Previously, this method would set this `IOLoop` as the
       global singleton used by `IOLoop.instance()`. Now that
       `instance()` is an alias for `current()`, `install()`
       is an alias for `make_current()`.

    .. deprecated:: 5.0
    """
    self.make_current()

221 

@staticmethod
def clear_instance() -> None:
    """Deprecated alias for `clear_current()`.

    Simply forwards to `IOLoop.clear_current()`.

    .. versionchanged:: 5.0

       Previously, this method would clear the `IOLoop` used as
       the global singleton by `IOLoop.instance()`. Now that
       `instance()` is an alias for `current()`,
       `clear_instance()` is an alias for `clear_current()`.

    .. deprecated:: 5.0

    """
    IOLoop.clear_current()

237 

@typing.overload
@staticmethod
def current() -> "IOLoop":
    pass

@typing.overload
@staticmethod
def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
    pass

@staticmethod
def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
    """Returns the current thread's `IOLoop`.

    The `asyncio` event loop for this thread is looked up first. If
    `asyncio` reports none (``RuntimeError``) and ``instance`` is true, a
    new asyncio loop is created and installed; if ``instance`` is false,
    ``None`` is returned. The `IOLoop` registered for that asyncio loop
    is then returned; when none is registered, one is created if
    ``instance`` is true, otherwise ``None`` is returned.

    .. versionchanged:: 4.1
       Added ``instance`` argument to control the fallback to
       `IOLoop.instance()`.
    .. versionchanged:: 5.0
       On Python 3, control of the current `IOLoop` is delegated
       to `asyncio`, with this and other methods as pass-through accessors.
       The ``instance`` argument now controls whether an `IOLoop`
       is created automatically when there is none, instead of
       whether we fall back to `IOLoop.instance()` (which is now
       an alias for this method). ``instance=False`` is deprecated,
       since even if we do not create an `IOLoop`, this method
       may initialize the asyncio loop.

    .. deprecated:: 6.2
       It is deprecated to call ``IOLoop.current()`` when no `asyncio`
       event loop is running.
    """
    try:
        aio_loop = asyncio.get_event_loop()
    except RuntimeError:
        if not instance:
            return None
        # Nothing installed for this thread yet: build a loop and install it.
        aio_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(aio_loop)

    registry = IOLoop._ioloop_for_asyncio
    if aio_loop in registry:
        return registry[aio_loop]
    if not instance:
        return None

    from tornado.platform.asyncio import AsyncIOMainLoop

    return AsyncIOMainLoop()

292 

def make_current(self) -> None:
    """Makes this the `IOLoop` for the current thread.

    Emits a `DeprecationWarning` and then delegates to ``_make_current``.

    An `IOLoop` automatically becomes current for its thread
    when it is started, but it is sometimes useful to call
    `make_current` explicitly before starting the `IOLoop`,
    so that code run at startup time can find the right
    instance.

    .. versionchanged:: 4.1
       An `IOLoop` created while there is no current `IOLoop`
       will automatically become current.

    .. versionchanged:: 5.0
       This method also sets the current `asyncio` event loop.

    .. deprecated:: 6.2
       Setting and clearing the current event loop through Tornado is
       deprecated. Use ``asyncio.set_event_loop`` instead if you need this.
    """
    notice = "make_current is deprecated; start the event loop first"
    # stacklevel=2 attributes the warning to our caller rather than to us.
    warnings.warn(notice, DeprecationWarning, stacklevel=2)
    self._make_current()

319 

def _make_current(self) -> None:
    """Hook that actually installs this loop as current; abstract here.

    Raises `NotImplementedError` unless overridden.
    """
    # The asyncio event loops override this method.
    raise NotImplementedError()

323 

@staticmethod
def clear_current() -> None:
    """Clears the `IOLoop` for the current thread.

    Emits a `DeprecationWarning` and then delegates to
    ``IOLoop._clear_current``.

    Intended primarily for use by test frameworks in between tests.

    .. versionchanged:: 5.0
       This method also clears the current `asyncio` event loop.
    .. deprecated:: 6.2
    """
    # stacklevel=2 attributes the warning to our caller rather than to us.
    warnings.warn("clear_current is deprecated", DeprecationWarning, stacklevel=2)
    IOLoop._clear_current()

340 

@staticmethod
def _clear_current() -> None:
    """Invoke the clear hook of the current `IOLoop`, if there is one.

    The lookup passes ``instance=False`` so that no `IOLoop` is created
    just to be cleared.
    """
    old = IOLoop.current(instance=False)
    if old is not None:
        old._clear_current_hook()

346 

def _clear_current_hook(self) -> None:
    """Instance method called when an IOLoop ceases to be current.

    May be overridden by subclasses as a counterpart to make_current.
    The base implementation does nothing.
    """
    pass

353 

@classmethod
def configurable_base(cls) -> Type[Configurable]:
    """Return `IOLoop` as the base class of this configurable hierarchy."""
    return IOLoop

357 

@classmethod
def configurable_default(cls) -> Type[Configurable]:
    """Return the default implementation class, ``AsyncIOLoop``."""
    # Imported inside the method rather than at module level.
    from tornado.platform.asyncio import AsyncIOLoop

    return AsyncIOLoop

363 

def initialize(self, make_current: bool = True) -> None:
    """Initialization hook; optionally makes this loop current.

    When ``make_current`` is true (the default) ``_make_current`` is
    called directly, bypassing the deprecation warning that the public
    `make_current` emits.
    """
    if make_current:
        self._make_current()

367 

def close(self, all_fds: bool = False) -> None:
    """Closes the `IOLoop`, freeing any resources used.

    If ``all_fds`` is true, all file descriptors registered on the
    IOLoop will be closed (not just the ones created by the
    `IOLoop` itself).

    Many applications will only use a single `IOLoop` that runs for the
    entire lifetime of the process.  In that case closing the `IOLoop`
    is not necessary since everything will be cleaned up when the
    process exits.  `IOLoop.close` is provided mainly for scenarios
    such as unit tests, which create and destroy a large number of
    ``IOLoops``.

    An `IOLoop` must be completely stopped before it can be closed.  This
    means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
    be allowed to return before attempting to call `IOLoop.close()`.
    Therefore the call to `close` will usually appear just after
    the call to `start` rather than near the call to `stop`.

    The base implementation raises `NotImplementedError`.

    .. versionchanged:: 3.1
       If the `IOLoop` implementation supports non-integer objects
       for "file descriptors", those objects will have their
       ``close`` method called when ``all_fds`` is true.
    """
    raise NotImplementedError()

394 

@typing.overload
def add_handler(
    self, fd: int, handler: Callable[[int, int], None], events: int
) -> None:
    pass

@typing.overload  # noqa: F811
def add_handler(
    self, fd: _S, handler: Callable[[_S, int], None], events: int
) -> None:
    pass

def add_handler(  # noqa: F811
    self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
) -> None:
    """Registers the given handler to receive the given events for ``fd``.

    The ``fd`` argument may either be an integer file descriptor or
    a file-like object with a ``fileno()`` and ``close()`` method.

    The ``events`` argument is a bitwise or of the constants
    ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.

    When an event occurs, ``handler(fd, events)`` will be run.

    The two overloads above express that the handler receives the same
    kind of ``fd`` that was registered. The base implementation raises
    `NotImplementedError`.

    .. versionchanged:: 4.0
       Added the ability to pass file-like objects in addition to
       raw file descriptors.
    """
    raise NotImplementedError()

425 

def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
    """Changes the events we listen for ``fd``.

    The base implementation raises `NotImplementedError`.

    .. versionchanged:: 4.0
       Added the ability to pass file-like objects in addition to
       raw file descriptors.
    """
    raise NotImplementedError()

434 

def remove_handler(self, fd: Union[int, _Selectable]) -> None:
    """Stop listening for events on ``fd``.

    The base implementation raises `NotImplementedError`.

    .. versionchanged:: 4.0
       Added the ability to pass file-like objects in addition to
       raw file descriptors.
    """
    raise NotImplementedError()

443 

def start(self) -> None:
    """Starts the I/O loop.

    The loop will run until one of the callbacks calls `stop()`, which
    will make the loop stop after the current event iteration completes.

    The base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError()

451 

def stop(self) -> None:
    """Stop the I/O loop.

    If the event loop is not currently running, the next call to `start()`
    will return immediately.

    Note that even after `stop` has been called, the `IOLoop` is not
    completely stopped until `IOLoop.start` has also returned.
    Some work that was scheduled before the call to `stop` may still
    be run before the `IOLoop` shuts down.

    The base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError()

464 

def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any:
    """Starts the `IOLoop`, runs the given function, and stops the loop.

    The function must return either an awaitable object or
    ``None``. If the function returns an awaitable object, the
    `IOLoop` will run until the awaitable is resolved (and
    `run_sync()` will return the awaitable's result). If it raises
    an exception, the `IOLoop` will stop and the exception will be
    re-raised to the caller.

    The optional argument ``timeout`` (in seconds) may be used to set
    a maximum duration for the function. If the timeout expires,
    a `asyncio.TimeoutError` is raised.

    This method is useful to allow asynchronous calls in a
    ``main()`` function::

        async def main():
            # do stuff...

        if __name__ == '__main__':
            IOLoop.current().run_sync(main)

    .. versionchanged:: 4.3
       Returning a non-``None``, non-awaitable value is now an error.

    .. versionchanged:: 5.0
       If a timeout occurs, the ``func`` coroutine will be cancelled.

    .. versionchanged:: 6.2
       ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``.
    """
    # One-element list so the nested functions can rebind the future.
    future_cell = [None]  # type: List[Optional[Future]]

    def run() -> None:
        # Runs on the loop: call ``func`` and normalize its outcome into a
        # Future stored in ``future_cell[0]``.
        try:
            result = func()
            if result is not None:
                from tornado.gen import convert_yielded

                result = convert_yielded(result)
        except Exception:
            # ``func`` (or the conversion) raised: carry the exception in a
            # Future so it surfaces from ``result()`` below.
            fut = Future()  # type: Future[Any]
            future_cell[0] = fut
            future_set_exc_info(fut, sys.exc_info())
        else:
            if is_future(result):
                future_cell[0] = result
            else:
                # Non-future result (``None``): wrap it in a finished Future.
                fut = Future()
                future_cell[0] = fut
                fut.set_result(result)
        assert future_cell[0] is not None
        # Stop the loop as soon as the future finishes, however it finishes.
        self.add_future(future_cell[0], lambda future: self.stop())

    self.add_callback(run)
    if timeout is not None:

        def timeout_callback() -> None:
            # If we can cancel the future, do so and wait on it. If not,
            # Just stop the loop and return with the task still pending.
            # (If we neither cancel nor wait for the task, a warning
            # will be logged).
            assert future_cell[0] is not None
            if not future_cell[0].cancel():
                self.stop()

        timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
    self.start()
    if timeout is not None:
        self.remove_timeout(timeout_handle)
    assert future_cell[0] is not None
    # A cancelled or still-unfinished future at this point is reported as a
    # timeout.
    if future_cell[0].cancelled() or not future_cell[0].done():
        raise TimeoutError("Operation timed out after %s seconds" % timeout)
    return future_cell[0].result()

540 

def time(self) -> float:
    """Returns the current time according to the `IOLoop`'s clock.

    The return value is a floating-point number relative to an
    unspecified time in the past.

    Historically, the IOLoop could be customized to use e.g.
    `time.monotonic` instead of `time.time`, but this is not
    currently supported and so this method is equivalent to
    `time.time`.

    """
    # ``time`` here resolves to the module-level ``time`` import, not to
    # this method.
    return time.time()

554 

def add_timeout(
    self,
    deadline: Union[float, datetime.timedelta],
    callback: Callable,
    *args: Any,
    **kwargs: Any
) -> object:
    """Runs the ``callback`` at the time ``deadline`` from the I/O loop.

    Returns an opaque handle that may be passed to
    `remove_timeout` to cancel.

    ``deadline`` may be a number denoting a time (on the same
    scale as `IOLoop.time`, normally `time.time`), or a
    `datetime.timedelta` object for a deadline relative to the
    current time.  Since Tornado 4.0, `call_later` is a more
    convenient alternative for the relative case since it does not
    require a timedelta object.

    Any other ``deadline`` type raises `TypeError`.

    Note that it is not safe to call `add_timeout` from other threads.
    Instead, you must use `add_callback` to transfer control to the
    `IOLoop`'s thread, and then call `add_timeout` from there.

    Subclasses of IOLoop must implement either `add_timeout` or
    `call_at`; the default implementations of each will call
    the other.  `call_at` is usually easier to implement, but
    subclasses that wish to maintain compatibility with Tornado
    versions prior to 4.0 must use `add_timeout` instead.

    .. versionchanged:: 4.0
       Now passes through ``*args`` and ``**kwargs`` to the callback.
    """
    if isinstance(deadline, numbers.Real):
        when = deadline
    elif isinstance(deadline, datetime.timedelta):
        # Relative deadline: anchor it to the loop's own clock.
        when = self.time() + deadline.total_seconds()
    else:
        # Wrap in a 1-tuple so that a tuple ``deadline`` is formatted as a
        # single value instead of being unpacked as %-format arguments.
        raise TypeError("Unsupported deadline %r" % (deadline,))
    return self.call_at(when, callback, *args, **kwargs)

595 

def call_later(
    self, delay: float, callback: Callable, *args: Any, **kwargs: Any
) -> object:
    """Runs the ``callback`` after ``delay`` seconds have passed.

    The absolute deadline is computed from `IOLoop.time` and handed to
    `call_at`, whose handle is returned. The handle may be passed to
    `remove_timeout` to cancel. Note that unlike the `asyncio` method of
    the same name, the returned object does not have a ``cancel()``
    method.

    See `add_timeout` for comments on thread-safety and subclassing.

    .. versionadded:: 4.0
    """
    fire_at = self.time() + delay
    return self.call_at(fire_at, callback, *args, **kwargs)

610 

def call_at(
    self, when: float, callback: Callable, *args: Any, **kwargs: Any
) -> object:
    """Runs the ``callback`` at the absolute time designated by ``when``.

    ``when`` must be a number using the same reference point as
    `IOLoop.time`.

    Returns an opaque handle that may be passed to `remove_timeout`
    to cancel.  Note that unlike the `asyncio` method of the same
    name, the returned object does not have a ``cancel()`` method.

    See `add_timeout` for comments on thread-safety and subclassing.

    .. versionadded:: 4.0
    """
    # Default implementation defers to add_timeout; a subclass must
    # override one of the two, since each default calls the other.
    return self.add_timeout(when, callback, *args, **kwargs)

628 

def remove_timeout(self, timeout: object) -> None:
    """Cancels a pending timeout.

    The argument is a handle as returned by `add_timeout`.  It is
    safe to call `remove_timeout` even if the callback has already
    been run.

    The base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError()

637 

def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
    """Calls the given callback on the next I/O loop iteration.

    It is safe to call this method from any thread at any time,
    except from a signal handler.  Note that this is the **only**
    method in `IOLoop` that makes this thread-safety guarantee; all
    other interaction with the `IOLoop` must be done from that
    `IOLoop`'s thread.  `add_callback()` may be used to transfer
    control from other threads to the `IOLoop`'s thread.

    The base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError()

649 

def add_callback_from_signal(
    self, callback: Callable, *args: Any, **kwargs: Any
) -> None:
    """Calls the given callback on the next I/O loop iteration.

    Intended to be safe for use from a Python signal handler; should not be
    used otherwise.

    The base implementation raises `NotImplementedError`.

    .. deprecated:: 6.4
       Use ``asyncio.AbstractEventLoop.add_signal_handler`` instead.
       This method is suspected to have been broken since Tornado 5.0 and
       will be removed in version 7.0.
    """
    raise NotImplementedError()

664 

def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
    """Calls the given callback on the next IOLoop iteration.

    As of Tornado 6.0, this method is equivalent to `add_callback`:
    it forwards all of its arguments unchanged.

    .. versionadded:: 4.0
    """
    self.add_callback(callback, *args, **kwargs)

673 

def add_future(
    self,
    future: "Union[Future[_T], concurrent.futures.Future[_T]]",
    callback: Callable[["Future[_T]"], None],
) -> None:
    """Schedules a callback on the ``IOLoop`` when the given
    `.Future` is finished.

    The callback is invoked with one argument, the
    `.Future`.

    This method only accepts `.Future` objects and not other
    awaitables (unlike most of Tornado where the two are
    interchangeable).
    """
    if not isinstance(future, Future):
        assert is_future(future)

        def _hand_off(finished: Any) -> None:
            # For concurrent futures we go through self.add_callback, so
            # it's fine if future_add_done_callback runs this inline.
            self.add_callback(callback, finished)

        future_add_done_callback(future, _hand_off)
        return

    def _dispatch(finished: Any) -> None:
        # Route through self._run_callback so that we control the error
        # logging (tornado.log.app_log instead of asyncio's log).
        self._run_callback(functools.partial(callback, finished))

    # We deliberately avoid the inline behavior of
    # tornado.concurrent.future_add_done_callback here: the callback
    # should always be scheduled on the next IOLoop iteration, which is
    # what asyncio.Future.add_done_callback does.
    future.add_done_callback(_dispatch)

706 

def run_in_executor(
    self,
    executor: Optional[concurrent.futures.Executor],
    func: Callable[..., _T],
    *args: Any
) -> "Future[_T]":
    """Runs a function in a ``concurrent.futures.Executor``. If
    ``executor`` is ``None``, the IO loop's default executor will be used.

    The default executor is created on first use as a
    ``ThreadPoolExecutor`` with ``cpu_count() * 5`` workers and cached on
    the instance as ``_executor``.

    Use `functools.partial` to pass keyword arguments to ``func``.

    .. versionadded:: 5.0
    """
    if executor is None:
        try:
            executor = self._executor
        except AttributeError:
            # First use: build the default pool and remember it.
            from tornado.process import cpu_count

            executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=(cpu_count() * 5)
            )
            self._executor = executor
    submitted = executor.submit(func, *args)
    # Concurrent Futures are not usable with await. Wrap this in a
    # Tornado Future instead, using self.add_future for thread-safety.
    awaitable_result = Future()  # type: Future[_T]
    self.add_future(submitted, lambda done: chain_future(done, awaitable_result))
    return awaitable_result

734 

def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
    """Sets the default executor to use with :meth:`run_in_executor`.

    The executor is stored on the instance as ``_executor``, replacing
    any previous default.

    .. versionadded:: 5.0
    """
    self._executor = executor

741 

def _run_callback(self, callback: Callable[[], Any]) -> None:
    """Runs a callback with error handling.

    `asyncio.CancelledError` is swallowed silently; any other
    `Exception` is logged to ``app_log`` with its traceback and is not
    propagated.

    .. versionchanged:: 6.0

       CancelledErrors are no longer logged.
    """
    try:
        ret = callback()
        if ret is not None:
            from tornado import gen

            # Functions that return Futures typically swallow all
            # exceptions and store them in the Future.  If a Future
            # makes it out to the IOLoop, ensure its exception (if any)
            # gets logged too.
            try:
                ret = gen.convert_yielded(ret)
            except gen.BadYieldError:
                # It's not unusual for add_callback to be used with
                # methods returning a non-None and non-yieldable
                # result, which should just be ignored.
                pass
            else:
                self.add_future(ret, self._discard_future_result)
    except asyncio.CancelledError:
        # Listed before ``Exception`` so that cancellation is never logged.
        pass
    except Exception:
        app_log.error("Exception in callback %r", callback, exc_info=True)

771 

def _discard_future_result(self, future: Future) -> None:
    """Avoid unhandled-exception warnings from spawned coroutines.

    Calls ``future.result()`` and discards the value; if the future holds
    an exception, that call re-raises it here.
    """
    future.result()

775 

def split_fd(
    self, fd: Union[int, _Selectable]
) -> Tuple[int, Union[int, _Selectable]]:
    """Returns an (fd, obj) pair from an ``fd`` parameter.

    We accept both raw file descriptors and file-like objects as
    input to `add_handler` and related methods.  When a file-like
    object is passed, we must retain the object itself so we can
    close it correctly when the `IOLoop` shuts down, but the
    poller interfaces favor file descriptors (they will accept
    file-like objects and call ``fileno()`` for you, but they
    always return the descriptor itself).

    This method is provided for use by `IOLoop` subclasses and should
    not generally be used by application code.

    .. versionadded:: 4.0
    """
    # An int is already a descriptor; anything else is asked for its own.
    descriptor = fd if isinstance(fd, int) else fd.fileno()
    return descriptor, fd

797 

def close_fd(self, fd: Union[int, _Selectable]) -> None:
    """Utility method to close an ``fd``.

    If ``fd`` is a file-like object, we close it directly; otherwise
    we use `os.close`. An `OSError` raised while closing is ignored.

    This method is provided for use by `IOLoop` subclasses (in
    implementations of ``IOLoop.close(all_fds=True)``) and should
    not generally be used by application code.

    .. versionadded:: 4.0
    """
    try:
        if not isinstance(fd, int):
            fd.close()
        else:
            os.close(fd)
    except OSError:
        # Best-effort close: an OSError here is deliberately ignored.
        pass

817 

def _register_task(self, f: Future) -> None:
    """Add ``f`` to ``_pending_tasks`` so a strong reference to it is kept."""
    self._pending_tasks.add(f)

820 

def _unregister_task(self, f: Future) -> None:
    """Remove ``f`` from ``_pending_tasks``; a no-op if it is not present."""
    self._pending_tasks.discard(f)

823 

824 

825class _Timeout(object): 

826 """An IOLoop timeout, a UNIX timestamp and a callback""" 

827 

828 # Reduce memory overhead when there are lots of pending callbacks 

829 __slots__ = ["deadline", "callback", "tdeadline"] 

830 

831 def __init__( 

832 self, deadline: float, callback: Callable[[], None], io_loop: IOLoop 

833 ) -> None: 

834 if not isinstance(deadline, numbers.Real): 

835 raise TypeError("Unsupported deadline %r" % deadline) 

836 self.deadline = deadline 

837 self.callback = callback 

838 self.tdeadline = ( 

839 deadline, 

840 next(io_loop._timeout_counter), 

841 ) # type: Tuple[float, int] 

842 

843 # Comparison methods to sort by deadline, with object id as a tiebreaker 

844 # to guarantee a consistent ordering. The heapq module uses __le__ 

845 # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons 

846 # use __lt__). 

847 def __lt__(self, other: "_Timeout") -> bool: 

848 return self.tdeadline < other.tdeadline 

849 

850 def __le__(self, other: "_Timeout") -> bool: 

851 return self.tdeadline <= other.tdeadline 

852 

853 

class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds when
    ``callback_time`` is a float. Note that the timeout is given in
    milliseconds, while most other time-related functions in Tornado use
    seconds. ``callback_time`` may alternatively be given as a
    `datetime.timedelta` object. In either form ``callback_time`` must be
    strictly positive; otherwise `ValueError` is raised.

    If ``jitter`` is specified, each callback time will be randomly selected
    within a window of ``jitter * callback_time`` milliseconds.
    Jitter can be used to reduce alignment of events with similar periods.
    A jitter of 0.1 means allowing a 10% variation in callback time.
    The window is centered on ``callback_time`` so the total number of calls
    within a given interval should not be significantly affected by adding
    jitter.

    If the callback runs for longer than ``callback_time`` milliseconds,
    subsequent invocations will be skipped to get back on schedule.

    `start` must be called after the `PeriodicCallback` is created.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.1
       The ``jitter`` argument is added.

    .. versionchanged:: 6.2
       If the ``callback`` argument is a coroutine, and a callback runs for
       longer than ``callback_time``, subsequent invocations will be skipped.
       Previously this was only true for regular functions, not coroutines,
       which were "fire-and-forget" for `PeriodicCallback`.

       The ``callback_time`` argument now accepts `datetime.timedelta` objects,
       in addition to the previous numeric milliseconds.
    """

    def __init__(
        self,
        callback: Callable[[], Optional[Awaitable]],
        callback_time: Union[datetime.timedelta, float],
        jitter: float = 0,
    ) -> None:
        """Store the callback and its period; the timer is not started here.

        :arg callback: zero-argument callable; if it returns an awaitable,
            that awaitable is awaited before the next run is scheduled.
        :arg callback_time: period in milliseconds, or a
            `datetime.timedelta`, which is converted to milliseconds.
        :arg jitter: fraction of ``callback_time`` used as the width of the
            random window applied to each period (0 disables jitter).
        :raises ValueError: if ``callback_time`` is not strictly positive,
            whether given as a number or as a `datetime.timedelta`.
        """
        self.callback = callback
        if isinstance(callback_time, datetime.timedelta):
            # Normalize to milliseconds first so that the positivity check
            # below covers both accepted input forms.
            callback_time = callback_time / datetime.timedelta(milliseconds=1)
        # A zero period would divide by zero in _update_next, and a negative
        # one would move the next deadline backwards.
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.jitter = jitter
        self._running = False
        self._timeout = None  # type: object

    def start(self) -> None:
        """Starts the timer."""
        # Looking up the IOLoop here allows to first instantiate the
        # PeriodicCallback in another thread, then start it using
        # IOLoop.add_callback().
        self.io_loop = IOLoop.current()
        self._running = True
        # Seed the schedule with "now"; _schedule_next advances it by one
        # period before registering the first timeout.
        self._next_timeout = self.io_loop.time()
        self._schedule_next()

    def stop(self) -> None:
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def is_running(self) -> bool:
        """Returns ``True`` if this `.PeriodicCallback` has been started.

        .. versionadded:: 4.1
        """
        return self._running

    async def _run(self) -> None:
        """Invoke the callback once, then schedule the following run.

        Does nothing if the timer has been stopped. Exceptions raised by the
        callback (or by the awaitable it returns) are logged rather than
        propagated, and the next run is scheduled regardless.
        """
        if not self._running:
            return
        try:
            val = self.callback()
            if val is not None and isawaitable(val):
                await val
        except Exception:
            app_log.error("Exception in callback %r", self.callback, exc_info=True)
        finally:
            self._schedule_next()

    def _schedule_next(self) -> None:
        """Advance ``_next_timeout`` and register ``_run`` on the IOLoop.

        No-op when the timer is not running.
        """
        if self._running:
            self._update_next(self.io_loop.time())
            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

    def _update_next(self, current_time: float) -> None:
        """Move ``_next_timeout`` forward to the next scheduled instant.

        :arg current_time: the current time in seconds, on the same clock
            as ``_next_timeout``.
        """
        callback_time_sec = self.callback_time / 1000.0
        if self.jitter:
            # apply jitter fraction
            callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
        if self._next_timeout <= current_time:
            # The period should be measured from the start of one call
            # to the start of the next. If one call takes too long,
            # skip cycles to get back to a multiple of the original
            # schedule.
            self._next_timeout += (
                math.floor((current_time - self._next_timeout) / callback_time_sec) + 1
            ) * callback_time_sec
        else:
            # If the clock moved backwards, ensure we advance the next
            # timeout instead of recomputing the same value again.
            # This may result in long gaps between callbacks if the
            # clock jumps backwards by a lot, but the far more common
            # scenario is a small NTP adjustment that should just be
            # ignored.
            #
            # Note that on some systems if time.time() runs slower
            # than time.monotonic() (most common on windows), we
            # effectively experience a small backwards time jump on
            # every iteration because PeriodicCallback uses
            # time.time() while asyncio schedules callbacks using
            # time.monotonic().
            # https://github.com/tornadoweb/tornado/issues/2333
            self._next_timeout += callback_time_sec