Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/ioloop.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

284 statements  

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""An I/O event loop for non-blocking sockets. 

17 

18In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event loop, with a 

19slightly different interface. The `.IOLoop` interface is now provided primarily 

20for backwards compatibility; new code should generally use the `asyncio` event 

21loop interface directly. The `IOLoop.current` class method provides the 

22`IOLoop` instance corresponding to the running `asyncio` event loop. 

23 

24""" 

25 

26import asyncio 

27import concurrent.futures 

28import datetime 

29import functools 

30import numbers 

31import os 

32import sys 

33import time 

34import math 

35import random 

36import warnings 

37from inspect import isawaitable 

38 

39from tornado.concurrent import ( 

40 Future, 

41 is_future, 

42 chain_future, 

43 future_set_exc_info, 

44 future_add_done_callback, 

45) 

46from tornado.log import app_log 

47from tornado.util import Configurable, TimeoutError, import_object 

48 

49import typing 

50from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable 

51 

52if typing.TYPE_CHECKING: 

53 from typing import Dict, List, Set, TypedDict # noqa: F401 

54 

55 from typing_extensions import Protocol 

56else: 

57 Protocol = object 

58 

59 

60class _Selectable(Protocol): 

61 def fileno(self) -> int: 

62 pass 

63 

64 def close(self) -> None: 

65 pass 

66 

67 

68_T = TypeVar("_T") 

69_S = TypeVar("_S", bound=_Selectable) 

70 

71 

72class IOLoop(Configurable): 

73 """An I/O event loop. 

74 

75 As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event loop. 

76 

77 Example usage for a simple TCP server: 

78 

79 .. testcode:: 

80 

81 import asyncio 

82 import errno 

83 import functools 

84 import socket 

85 

86 import tornado 

87 from tornado.iostream import IOStream 

88 

89 async def handle_connection(connection, address): 

90 stream = IOStream(connection) 

91 message = await stream.read_until_close() 

92 print("message from client:", message.decode().strip()) 

93 

94 def connection_ready(sock, fd, events): 

95 while True: 

96 try: 

97 connection, address = sock.accept() 

98 except BlockingIOError: 

99 return 

100 connection.setblocking(0) 

101 io_loop = tornado.ioloop.IOLoop.current() 

102 io_loop.spawn_callback(handle_connection, connection, address) 

103 

104 async def main(): 

105 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 

106 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

107 sock.setblocking(0) 

108 sock.bind(("", 8888)) 

109 sock.listen(128) 

110 

111 io_loop = tornado.ioloop.IOLoop.current() 

112 callback = functools.partial(connection_ready, sock) 

113 io_loop.add_handler(sock.fileno(), callback, io_loop.READ) 

114 await asyncio.Event().wait() 

115 

116 if __name__ == "__main__": 

117 asyncio.run(main()) 

118 

119 Most applications should not attempt to construct an `IOLoop` directly, 

120 and instead initialize the `asyncio` event loop and use `IOLoop.current()`. 

121 In some cases, such as in test frameworks when initializing an `IOLoop` 

122 to be run in a secondary thread, it may be appropriate to construct 

123 an `IOLoop` with ``IOLoop(make_current=False)``. 

124 

125 In general, an `IOLoop` cannot survive a fork or be shared across processes 

126 in any way. When multiple processes are being used, each process should 

127 create its own `IOLoop`, which also implies that any objects which depend on 

128 the `IOLoop` (such as `.AsyncHTTPClient`) must also be created in the child 

129 processes. As a guideline, anything that starts processes (including the 

130 `tornado.process` and `multiprocessing` modules) should do so as early as 

131 possible, ideally the first thing the application does after loading its 

132 configuration, and *before* any calls to `.IOLoop.start` or `asyncio.run`. 

133 

134 .. versionchanged:: 4.2 

135 Added the ``make_current`` keyword argument to the `IOLoop` 

136 constructor. 

137 

138 .. versionchanged:: 5.0 

139 

140 Uses the `asyncio` event loop by default. The ``IOLoop.configure`` method 

141 cannot be used on Python 3 except to redundantly specify the `asyncio` 

142 event loop. 

143 

144 .. versionchanged:: 6.3 

145 ``make_current=True`` is now the default when creating an IOLoop - 

146 previously the default was to make the event loop current if there wasn't 

147 already a current one. 

148 """ 

149 

150 # These constants were originally based on constants from the epoll module. 

151 NONE = 0 

152 READ = 0x001 

153 WRITE = 0x004 

154 ERROR = 0x018 

155 

156 # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops. 

157 _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop] 

158 

159 # Maintain a set of all pending tasks to follow the warning in the docs 

160 # of asyncio.create_tasks: 

161 # https://docs.python.org/3.11/library/asyncio-task.html#asyncio.create_task 

162 # This ensures that all pending tasks have a strong reference so they 

163 # will not be garbage collected before they are finished. 

164 # (Thus avoiding "task was destroyed but it is pending" warnings) 

165 # An analogous change has been proposed in cpython for 3.13: 

166 # https://github.com/python/cpython/issues/91887 

167 # If that change is accepted, this can eventually be removed. 

168 # If it is not, we will consider the rationale and may remove this. 

169 _pending_tasks = set() # type: Set[Future] 

170 

171 @classmethod 

172 def configure( 

173 cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any 

174 ) -> None: 

175 from tornado.platform.asyncio import BaseAsyncIOLoop 

176 

177 if isinstance(impl, str): 

178 impl = import_object(impl) 

179 if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop): 

180 raise RuntimeError("only AsyncIOLoop is allowed when asyncio is available") 

181 super().configure(impl, **kwargs) 

182 

183 @staticmethod 

184 def instance() -> "IOLoop": 

185 """Deprecated alias for `IOLoop.current()`. 

186 

187 .. versionchanged:: 5.0 

188 

189 Previously, this method returned a global singleton 

190 `IOLoop`, in contrast with the per-thread `IOLoop` returned 

191 by `current()`. In nearly all cases the two were the same 

192 (when they differed, it was generally used from non-Tornado 

193 threads to communicate back to the main thread's `IOLoop`). 

194 This distinction is not present in `asyncio`, so in order 

195 to facilitate integration with that package `instance()` 

196 was changed to be an alias to `current()`. Applications 

197 using the cross-thread communications aspect of 

198 `instance()` should instead set their own global variable 

199 to point to the `IOLoop` they want to use. 

200 

201 .. deprecated:: 5.0 

202 """ 

203 return IOLoop.current() 

204 

205 def install(self) -> None: 

206 """Deprecated alias for `make_current()`. 

207 

208 .. versionchanged:: 5.0 

209 

210 Previously, this method would set this `IOLoop` as the 

211 global singleton used by `IOLoop.instance()`. Now that 

212 `instance()` is an alias for `current()`, `install()` 

213 is an alias for `make_current()`. 

214 

215 .. deprecated:: 5.0 

216 """ 

217 self.make_current() 

218 

219 @staticmethod 

220 def clear_instance() -> None: 

221 """Deprecated alias for `clear_current()`. 

222 

223 .. versionchanged:: 5.0 

224 

225 Previously, this method would clear the `IOLoop` used as 

226 the global singleton by `IOLoop.instance()`. Now that 

227 `instance()` is an alias for `current()`, 

228 `clear_instance()` is an alias for `clear_current()`. 

229 

230 .. deprecated:: 5.0 

231 

232 """ 

233 IOLoop.clear_current() 

234 

235 @typing.overload 

236 @staticmethod 

237 def current() -> "IOLoop": 

238 pass 

239 

240 @typing.overload 

241 @staticmethod 

242 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 

243 pass 

244 

245 @staticmethod 

246 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 

247 """Returns the current thread's `IOLoop`. 

248 

249 If an `IOLoop` is currently running or has been marked as 

250 current by `make_current`, returns that instance. If there is 

251 no current `IOLoop` and ``instance`` is true, creates one. 

252 

253 .. versionchanged:: 4.1 

254 Added ``instance`` argument to control the fallback to 

255 `IOLoop.instance()`. 

256 .. versionchanged:: 5.0 

257 On Python 3, control of the current `IOLoop` is delegated 

258 to `asyncio`, with this and other methods as pass-through accessors. 

259 The ``instance`` argument now controls whether an `IOLoop` 

260 is created automatically when there is none, instead of 

261 whether we fall back to `IOLoop.instance()` (which is now 

262 an alias for this method). ``instance=False`` is deprecated, 

263 since even if we do not create an `IOLoop`, this method 

264 may initialize the asyncio loop. 

265 

266 .. deprecated:: 6.2 

267 It is deprecated to call ``IOLoop.current()`` when no `asyncio` 

268 event loop is running. 

269 """ 

270 try: 

271 loop = asyncio.get_event_loop() 

272 except RuntimeError: 

273 if not instance: 

274 return None 

275 # Create a new asyncio event loop for this thread. 

276 loop = asyncio.new_event_loop() 

277 asyncio.set_event_loop(loop) 

278 

279 try: 

280 return IOLoop._ioloop_for_asyncio[loop] 

281 except KeyError: 

282 if instance: 

283 from tornado.platform.asyncio import AsyncIOMainLoop 

284 

285 current = AsyncIOMainLoop() # type: Optional[IOLoop] 

286 else: 

287 current = None 

288 return current 

289 

290 def make_current(self) -> None: 

291 """Makes this the `IOLoop` for the current thread. 

292 

293 An `IOLoop` automatically becomes current for its thread 

294 when it is started, but it is sometimes useful to call 

295 `make_current` explicitly before starting the `IOLoop`, 

296 so that code run at startup time can find the right 

297 instance. 

298 

299 .. versionchanged:: 4.1 

300 An `IOLoop` created while there is no current `IOLoop` 

301 will automatically become current. 

302 

303 .. versionchanged:: 5.0 

304 This method also sets the current `asyncio` event loop. 

305 

306 .. deprecated:: 6.2 

307 Setting and clearing the current event loop through Tornado is 

308 deprecated. Use ``asyncio.set_event_loop`` instead if you need this. 

309 """ 

310 warnings.warn( 

311 "make_current is deprecated; start the event loop first", 

312 DeprecationWarning, 

313 stacklevel=2, 

314 ) 

315 self._make_current() 

316 

317 def _make_current(self) -> None: 

318 # The asyncio event loops override this method. 

319 raise NotImplementedError() 

320 

321 @staticmethod 

322 def clear_current() -> None: 

323 """Clears the `IOLoop` for the current thread. 

324 

325 Intended primarily for use by test frameworks in between tests. 

326 

327 .. versionchanged:: 5.0 

328 This method also clears the current `asyncio` event loop. 

329 .. deprecated:: 6.2 

330 """ 

331 warnings.warn( 

332 "clear_current is deprecated", 

333 DeprecationWarning, 

334 stacklevel=2, 

335 ) 

336 IOLoop._clear_current() 

337 

338 @staticmethod 

339 def _clear_current() -> None: 

340 old = IOLoop.current(instance=False) 

341 if old is not None: 

342 old._clear_current_hook() 

343 

344 def _clear_current_hook(self) -> None: 

345 """Instance method called when an IOLoop ceases to be current. 

346 

347 May be overridden by subclasses as a counterpart to make_current. 

348 """ 

349 pass 

350 

351 @classmethod 

352 def configurable_base(cls) -> Type[Configurable]: 

353 return IOLoop 

354 

355 @classmethod 

356 def configurable_default(cls) -> Type[Configurable]: 

357 from tornado.platform.asyncio import AsyncIOLoop 

358 

359 return AsyncIOLoop 

360 

361 def initialize(self, make_current: bool = True) -> None: 

362 if make_current: 

363 self._make_current() 

364 

365 def close(self, all_fds: bool = False) -> None: 

366 """Closes the `IOLoop`, freeing any resources used. 

367 

368 If ``all_fds`` is true, all file descriptors registered on the 

369 IOLoop will be closed (not just the ones created by the 

370 `IOLoop` itself). 

371 

372 Many applications will only use a single `IOLoop` that runs for the 

373 entire lifetime of the process. In that case closing the `IOLoop` 

374 is not necessary since everything will be cleaned up when the 

375 process exits. `IOLoop.close` is provided mainly for scenarios 

376 such as unit tests, which create and destroy a large number of 

377 ``IOLoops``. 

378 

379 An `IOLoop` must be completely stopped before it can be closed. This 

380 means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must 

381 be allowed to return before attempting to call `IOLoop.close()`. 

382 Therefore the call to `close` will usually appear just after 

383 the call to `start` rather than near the call to `stop`. 

384 

385 .. versionchanged:: 3.1 

386 If the `IOLoop` implementation supports non-integer objects 

387 for "file descriptors", those objects will have their 

388 ``close`` method when ``all_fds`` is true. 

389 """ 

390 raise NotImplementedError() 

391 

392 @typing.overload 

393 def add_handler( 

394 self, fd: int, handler: Callable[[int, int], None], events: int 

395 ) -> None: 

396 pass 

397 

398 @typing.overload # noqa: F811 

399 def add_handler( 

400 self, fd: _S, handler: Callable[[_S, int], None], events: int 

401 ) -> None: 

402 pass 

403 

404 def add_handler( # noqa: F811 

405 self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int 

406 ) -> None: 

407 """Registers the given handler to receive the given events for ``fd``. 

408 

409 The ``fd`` argument may either be an integer file descriptor or 

410 a file-like object with a ``fileno()`` and ``close()`` method. 

411 

412 The ``events`` argument is a bitwise or of the constants 

413 ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``. 

414 

415 When an event occurs, ``handler(fd, events)`` will be run. 

416 

417 .. versionchanged:: 4.0 

418 Added the ability to pass file-like objects in addition to 

419 raw file descriptors. 

420 """ 

421 raise NotImplementedError() 

422 

423 def update_handler(self, fd: Union[int, _Selectable], events: int) -> None: 

424 """Changes the events we listen for ``fd``. 

425 

426 .. versionchanged:: 4.0 

427 Added the ability to pass file-like objects in addition to 

428 raw file descriptors. 

429 """ 

430 raise NotImplementedError() 

431 

432 def remove_handler(self, fd: Union[int, _Selectable]) -> None: 

433 """Stop listening for events on ``fd``. 

434 

435 .. versionchanged:: 4.0 

436 Added the ability to pass file-like objects in addition to 

437 raw file descriptors. 

438 """ 

439 raise NotImplementedError() 

440 

441 def start(self) -> None: 

442 """Starts the I/O loop. 

443 

444 The loop will run until one of the callbacks calls `stop()`, which 

445 will make the loop stop after the current event iteration completes. 

446 """ 

447 raise NotImplementedError() 

448 

449 def stop(self) -> None: 

450 """Stop the I/O loop. 

451 

452 If the event loop is not currently running, the next call to `start()` 

453 will return immediately. 

454 

455 Note that even after `stop` has been called, the `IOLoop` is not 

456 completely stopped until `IOLoop.start` has also returned. 

457 Some work that was scheduled before the call to `stop` may still 

458 be run before the `IOLoop` shuts down. 

459 """ 

460 raise NotImplementedError() 

461 

462 def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any: 

463 """Starts the `IOLoop`, runs the given function, and stops the loop. 

464 

465 The function must return either an awaitable object or 

466 ``None``. If the function returns an awaitable object, the 

467 `IOLoop` will run until the awaitable is resolved (and 

468 `run_sync()` will return the awaitable's result). If it raises 

469 an exception, the `IOLoop` will stop and the exception will be 

470 re-raised to the caller. 

471 

472 The keyword-only argument ``timeout`` may be used to set 

473 a maximum duration for the function. If the timeout expires, 

474 a `asyncio.TimeoutError` is raised. 

475 

476 This method is useful to allow asynchronous calls in a 

477 ``main()`` function:: 

478 

479 async def main(): 

480 # do stuff... 

481 

482 if __name__ == '__main__': 

483 IOLoop.current().run_sync(main) 

484 

485 .. versionchanged:: 4.3 

486 Returning a non-``None``, non-awaitable value is now an error. 

487 

488 .. versionchanged:: 5.0 

489 If a timeout occurs, the ``func`` coroutine will be cancelled. 

490 

491 .. versionchanged:: 6.2 

492 ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``. 

493 """ 

494 if typing.TYPE_CHECKING: 

495 FutureCell = TypedDict( # noqa: F841 

496 "FutureCell", {"future": Optional[Future], "timeout_called": bool} 

497 ) 

498 future_cell = {"future": None, "timeout_called": False} # type: FutureCell 

499 

500 def run() -> None: 

501 try: 

502 result = func() 

503 if result is not None: 

504 from tornado.gen import convert_yielded 

505 

506 result = convert_yielded(result) 

507 except Exception: 

508 fut = Future() # type: Future[Any] 

509 future_cell["future"] = fut 

510 future_set_exc_info(fut, sys.exc_info()) 

511 else: 

512 if is_future(result): 

513 future_cell["future"] = result 

514 else: 

515 fut = Future() 

516 future_cell["future"] = fut 

517 fut.set_result(result) 

518 assert future_cell["future"] is not None 

519 self.add_future(future_cell["future"], lambda future: self.stop()) 

520 

521 self.add_callback(run) 

522 if timeout is not None: 

523 

524 def timeout_callback() -> None: 

525 # signal that timeout is triggered 

526 future_cell["timeout_called"] = True 

527 # If we can cancel the future, do so and wait on it. If not, 

528 # Just stop the loop and return with the task still pending. 

529 # (If we neither cancel nor wait for the task, a warning 

530 # will be logged). 

531 assert future_cell["future"] is not None 

532 if not future_cell["future"].cancel(): 

533 self.stop() 

534 

535 timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback) 

536 self.start() 

537 if timeout is not None: 

538 self.remove_timeout(timeout_handle) 

539 assert future_cell["future"] is not None 

540 if future_cell["future"].cancelled() or not future_cell["future"].done(): 

541 if future_cell["timeout_called"]: 

542 raise TimeoutError("Operation timed out after %s seconds" % timeout) 

543 else: 

544 # timeout not called; maybe stop() was called explicitly 

545 # or some other cancellation 

546 raise RuntimeError("Event loop stopped before Future completed.") 

547 return future_cell["future"].result() 

548 

549 def time(self) -> float: 

550 """Returns the current time according to the `IOLoop`'s clock. 

551 

552 The return value is a floating-point number relative to an 

553 unspecified time in the past. 

554 

555 Historically, the IOLoop could be customized to use e.g. 

556 `time.monotonic` instead of `time.time`, but this is not 

557 currently supported and so this method is equivalent to 

558 `time.time`. 

559 

560 """ 

561 return time.time() 

562 

563 def add_timeout( 

564 self, 

565 deadline: Union[float, datetime.timedelta], 

566 callback: Callable, 

567 *args: Any, 

568 **kwargs: Any, 

569 ) -> object: 

570 """Runs the ``callback`` at the time ``deadline`` from the I/O loop. 

571 

572 Returns an opaque handle that may be passed to 

573 `remove_timeout` to cancel. 

574 

575 ``deadline`` may be a number denoting a time (on the same 

576 scale as `IOLoop.time`, normally `time.time`), or a 

577 `datetime.timedelta` object for a deadline relative to the 

578 current time. Since Tornado 4.0, `call_later` is a more 

579 convenient alternative for the relative case since it does not 

580 require a timedelta object. 

581 

582 Note that it is not safe to call `add_timeout` from other threads. 

583 Instead, you must use `add_callback` to transfer control to the 

584 `IOLoop`'s thread, and then call `add_timeout` from there. 

585 

586 Subclasses of IOLoop must implement either `add_timeout` or 

587 `call_at`; the default implementations of each will call 

588 the other. `call_at` is usually easier to implement, but 

589 subclasses that wish to maintain compatibility with Tornado 

590 versions prior to 4.0 must use `add_timeout` instead. 

591 

592 .. versionchanged:: 4.0 

593 Now passes through ``*args`` and ``**kwargs`` to the callback. 

594 """ 

595 if isinstance(deadline, numbers.Real): 

596 return self.call_at(deadline, callback, *args, **kwargs) 

597 elif isinstance(deadline, datetime.timedelta): 

598 return self.call_at( 

599 self.time() + deadline.total_seconds(), callback, *args, **kwargs 

600 ) 

601 else: 

602 raise TypeError("Unsupported deadline %r" % deadline) 

603 

604 def call_later( 

605 self, delay: float, callback: Callable, *args: Any, **kwargs: Any 

606 ) -> object: 

607 """Runs the ``callback`` after ``delay`` seconds have passed. 

608 

609 Returns an opaque handle that may be passed to `remove_timeout` 

610 to cancel. Note that unlike the `asyncio` method of the same 

611 name, the returned object does not have a ``cancel()`` method. 

612 

613 See `add_timeout` for comments on thread-safety and subclassing. 

614 

615 .. versionadded:: 4.0 

616 """ 

617 return self.call_at(self.time() + delay, callback, *args, **kwargs) 

618 

619 def call_at( 

620 self, when: float, callback: Callable, *args: Any, **kwargs: Any 

621 ) -> object: 

622 """Runs the ``callback`` at the absolute time designated by ``when``. 

623 

624 ``when`` must be a number using the same reference point as 

625 `IOLoop.time`. 

626 

627 Returns an opaque handle that may be passed to `remove_timeout` 

628 to cancel. Note that unlike the `asyncio` method of the same 

629 name, the returned object does not have a ``cancel()`` method. 

630 

631 See `add_timeout` for comments on thread-safety and subclassing. 

632 

633 .. versionadded:: 4.0 

634 """ 

635 return self.add_timeout(when, callback, *args, **kwargs) 

636 

637 def remove_timeout(self, timeout: object) -> None: 

638 """Cancels a pending timeout. 

639 

640 The argument is a handle as returned by `add_timeout`. It is 

641 safe to call `remove_timeout` even if the callback has already 

642 been run. 

643 """ 

644 raise NotImplementedError() 

645 

646 def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 

647 """Calls the given callback on the next I/O loop iteration. 

648 

649 It is safe to call this method from any thread at any time, 

650 except from a signal handler. Note that this is the **only** 

651 method in `IOLoop` that makes this thread-safety guarantee; all 

652 other interaction with the `IOLoop` must be done from that 

653 `IOLoop`'s thread. `add_callback()` may be used to transfer 

654 control from other threads to the `IOLoop`'s thread. 

655 """ 

656 raise NotImplementedError() 

657 

658 def add_callback_from_signal( 

659 self, callback: Callable, *args: Any, **kwargs: Any 

660 ) -> None: 

661 """Calls the given callback on the next I/O loop iteration. 

662 

663 Intended to be afe for use from a Python signal handler; should not be 

664 used otherwise. 

665 

666 .. deprecated:: 6.4 

667 Use ``asyncio.AbstractEventLoop.add_signal_handler`` instead. 

668 This method is suspected to have been broken since Tornado 5.0 and 

669 will be removed in version 7.0. 

670 """ 

671 raise NotImplementedError() 

672 

673 def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 

674 """Calls the given callback on the next IOLoop iteration. 

675 

676 As of Tornado 6.0, this method is equivalent to `add_callback`. 

677 

678 .. versionadded:: 4.0 

679 """ 

680 self.add_callback(callback, *args, **kwargs) 

681 

682 def add_future( 

683 self, 

684 future: "Union[Future[_T], concurrent.futures.Future[_T]]", 

685 callback: Callable[["Future[_T]"], None], 

686 ) -> None: 

687 """Schedules a callback on the ``IOLoop`` when the given 

688 `.Future` is finished. 

689 

690 The callback is invoked with one argument, the 

691 `.Future`. 

692 

693 This method only accepts `.Future` objects and not other 

694 awaitables (unlike most of Tornado where the two are 

695 interchangeable). 

696 """ 

697 if isinstance(future, Future): 

698 # Note that we specifically do not want the inline behavior of 

699 # tornado.concurrent.future_add_done_callback. We always want 

700 # this callback scheduled on the next IOLoop iteration (which 

701 # asyncio.Future always does). 

702 # 

703 # Wrap the callback in self._run_callback so we control 

704 # the error logging (i.e. it goes to tornado.log.app_log 

705 # instead of asyncio's log). 

706 future.add_done_callback( 

707 lambda f: self._run_callback(functools.partial(callback, f)) 

708 ) 

709 else: 

710 assert is_future(future) 

711 # For concurrent futures, we use self.add_callback, so 

712 # it's fine if future_add_done_callback inlines that call. 

713 future_add_done_callback(future, lambda f: self.add_callback(callback, f)) 

714 

715 def run_in_executor( 

716 self, 

717 executor: Optional[concurrent.futures.Executor], 

718 func: Callable[..., _T], 

719 *args: Any, 

720 ) -> "Future[_T]": 

721 """Runs a function in a ``concurrent.futures.Executor``. If 

722 ``executor`` is ``None``, the IO loop's default executor will be used. 

723 

724 Use `functools.partial` to pass keyword arguments to ``func``. 

725 

726 .. versionadded:: 5.0 

727 """ 

728 if executor is None: 

729 if not hasattr(self, "_executor"): 

730 from tornado.process import cpu_count 

731 

732 self._executor = concurrent.futures.ThreadPoolExecutor( 

733 max_workers=(cpu_count() * 5) 

734 ) # type: concurrent.futures.Executor 

735 executor = self._executor 

736 c_future = executor.submit(func, *args) 

737 # Concurrent Futures are not usable with await. Wrap this in a 

738 # Tornado Future instead, using self.add_future for thread-safety. 

739 t_future = Future() # type: Future[_T] 

740 self.add_future(c_future, lambda f: chain_future(f, t_future)) 

741 return t_future 

742 

743 def set_default_executor(self, executor: concurrent.futures.Executor) -> None: 

744 """Sets the default executor to use with :meth:`run_in_executor`. 

745 

746 .. versionadded:: 5.0 

747 """ 

748 self._executor = executor 

749 

750 def _run_callback(self, callback: Callable[[], Any]) -> None: 

751 """Runs a callback with error handling. 

752 

753 .. versionchanged:: 6.0 

754 

755 CancelledErrors are no longer logged. 

756 """ 

757 try: 

758 ret = callback() 

759 if ret is not None: 

760 from tornado import gen 

761 

762 # Functions that return Futures typically swallow all 

763 # exceptions and store them in the Future. If a Future 

764 # makes it out to the IOLoop, ensure its exception (if any) 

765 # gets logged too. 

766 try: 

767 ret = gen.convert_yielded(ret) 

768 except gen.BadYieldError: 

769 # It's not unusual for add_callback to be used with 

770 # methods returning a non-None and non-yieldable 

771 # result, which should just be ignored. 

772 pass 

773 else: 

774 self.add_future(ret, self._discard_future_result) 

775 except asyncio.CancelledError: 

776 pass 

777 except Exception: 

778 app_log.error("Exception in callback %r", callback, exc_info=True) 

779 

780 def _discard_future_result(self, future: Future) -> None: 

781 """Avoid unhandled-exception warnings from spawned coroutines.""" 

782 future.result() 

783 

784 def split_fd( 

785 self, fd: Union[int, _Selectable] 

786 ) -> Tuple[int, Union[int, _Selectable]]: 

787 # """Returns an (fd, obj) pair from an ``fd`` parameter. 

788 

789 # We accept both raw file descriptors and file-like objects as 

790 # input to `add_handler` and related methods. When a file-like 

791 # object is passed, we must retain the object itself so we can 

792 # close it correctly when the `IOLoop` shuts down, but the 

793 # poller interfaces favor file descriptors (they will accept 

794 # file-like objects and call ``fileno()`` for you, but they 

795 # always return the descriptor itself). 

796 

797 # This method is provided for use by `IOLoop` subclasses and should 

798 # not generally be used by application code. 

799 

800 # .. versionadded:: 4.0 

801 # """ 

802 if isinstance(fd, int): 

803 return fd, fd 

804 return fd.fileno(), fd 

805 

806 def close_fd(self, fd: Union[int, _Selectable]) -> None: 

807 # """Utility method to close an ``fd``. 

808 

809 # If ``fd`` is a file-like object, we close it directly; otherwise 

810 # we use `os.close`. 

811 

812 # This method is provided for use by `IOLoop` subclasses (in 

813 # implementations of ``IOLoop.close(all_fds=True)`` and should 

814 # not generally be used by application code. 

815 

816 # .. versionadded:: 4.0 

817 # """ 

818 try: 

819 if isinstance(fd, int): 

820 os.close(fd) 

821 else: 

822 fd.close() 

823 except OSError: 

824 pass 

825 

826 def _register_task(self, f: Future) -> None: 

827 self._pending_tasks.add(f) 

828 

829 def _unregister_task(self, f: Future) -> None: 

830 self._pending_tasks.discard(f) 

831 

832 

833class _Timeout: 

834 """An IOLoop timeout, a UNIX timestamp and a callback""" 

835 

836 # Reduce memory overhead when there are lots of pending callbacks 

837 __slots__ = ["deadline", "callback", "tdeadline"] 

838 

839 def __init__( 

840 self, deadline: float, callback: Callable[[], None], io_loop: IOLoop 

841 ) -> None: 

842 if not isinstance(deadline, numbers.Real): 

843 raise TypeError("Unsupported deadline %r" % deadline) 

844 self.deadline = deadline 

845 self.callback = callback 

846 self.tdeadline = ( 

847 deadline, 

848 next(io_loop._timeout_counter), 

849 ) # type: Tuple[float, int] 

850 

851 # Comparison methods to sort by deadline, with object id as a tiebreaker 

852 # to guarantee a consistent ordering. The heapq module uses __le__ 

853 # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons 

854 # use __lt__). 

855 def __lt__(self, other: "_Timeout") -> bool: 

856 return self.tdeadline < other.tdeadline 

857 

858 def __le__(self, other: "_Timeout") -> bool: 

859 return self.tdeadline <= other.tdeadline 

860 

861 

862class PeriodicCallback: 

863 """Schedules the given callback to be called periodically. 

864 

865 The callback is called every ``callback_time`` milliseconds when 

866 ``callback_time`` is a float. Note that the timeout is given in 

867 milliseconds, while most other time-related functions in Tornado use 

868 seconds. ``callback_time`` may alternatively be given as a 

869 `datetime.timedelta` object. 

870 

871 If ``jitter`` is specified, each callback time will be randomly selected 

872 within a window of ``jitter * callback_time`` milliseconds. 

873 Jitter can be used to reduce alignment of events with similar periods. 

874 A jitter of 0.1 means allowing a 10% variation in callback time. 

875 The window is centered on ``callback_time`` so the total number of calls 

876 within a given interval should not be significantly affected by adding 

877 jitter. 

878 

879 If the callback runs for longer than ``callback_time`` milliseconds, 

880 subsequent invocations will be skipped to get back on schedule. 

881 

882 `start` must be called after the `PeriodicCallback` is created. 

883 

884 .. versionchanged:: 5.0 

885 The ``io_loop`` argument (deprecated since version 4.1) has been removed. 

886 

887 .. versionchanged:: 5.1 

888 The ``jitter`` argument is added. 

889 

890 .. versionchanged:: 6.2 

891 If the ``callback`` argument is a coroutine, and a callback runs for 

892 longer than ``callback_time``, subsequent invocations will be skipped. 

893 Previously this was only true for regular functions, not coroutines, 

894 which were "fire-and-forget" for `PeriodicCallback`. 

895 

896 The ``callback_time`` argument now accepts `datetime.timedelta` objects, 

897 in addition to the previous numeric milliseconds. 

898 """ 

899 

900 def __init__( 

901 self, 

902 callback: Callable[[], Optional[Awaitable]], 

903 callback_time: Union[datetime.timedelta, float], 

904 jitter: float = 0, 

905 ) -> None: 

906 self.callback = callback 

907 if isinstance(callback_time, datetime.timedelta): 

908 self.callback_time = callback_time / datetime.timedelta(milliseconds=1) 

909 else: 

910 if callback_time <= 0: 

911 raise ValueError("Periodic callback must have a positive callback_time") 

912 self.callback_time = callback_time 

913 self.jitter = jitter 

914 self._running = False 

915 self._timeout = None # type: object 

916 

917 def start(self) -> None: 

918 """Starts the timer.""" 

919 # Looking up the IOLoop here allows to first instantiate the 

920 # PeriodicCallback in another thread, then start it using 

921 # IOLoop.add_callback(). 

922 self.io_loop = IOLoop.current() 

923 self._running = True 

924 self._next_timeout = self.io_loop.time() 

925 self._schedule_next() 

926 

927 def stop(self) -> None: 

928 """Stops the timer.""" 

929 self._running = False 

930 if self._timeout is not None: 

931 self.io_loop.remove_timeout(self._timeout) 

932 self._timeout = None 

933 

934 def is_running(self) -> bool: 

935 """Returns ``True`` if this `.PeriodicCallback` has been started. 

936 

937 .. versionadded:: 4.1 

938 """ 

939 return self._running 

940 

941 async def _run(self) -> None: 

942 if not self._running: 

943 return 

944 try: 

945 val = self.callback() 

946 if val is not None and isawaitable(val): 

947 await val 

948 except Exception: 

949 app_log.error("Exception in callback %r", self.callback, exc_info=True) 

950 finally: 

951 self._schedule_next() 

952 

953 def _schedule_next(self) -> None: 

954 if self._running: 

955 self._update_next(self.io_loop.time()) 

956 self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) 

957 

958 def _update_next(self, current_time: float) -> None: 

959 callback_time_sec = self.callback_time / 1000.0 

960 if self.jitter: 

961 # apply jitter fraction 

962 callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5)) 

963 if self._next_timeout <= current_time: 

964 # The period should be measured from the start of one call 

965 # to the start of the next. If one call takes too long, 

966 # skip cycles to get back to a multiple of the original 

967 # schedule. 

968 self._next_timeout += ( 

969 math.floor((current_time - self._next_timeout) / callback_time_sec) + 1 

970 ) * callback_time_sec 

971 else: 

972 # If the clock moved backwards, ensure we advance the next 

973 # timeout instead of recomputing the same value again. 

974 # This may result in long gaps between callbacks if the 

975 # clock jumps backwards by a lot, but the far more common 

976 # scenario is a small NTP adjustment that should just be 

977 # ignored. 

978 # 

979 # Note that on some systems if time.time() runs slower 

980 # than time.monotonic() (most common on windows), we 

981 # effectively experience a small backwards time jump on 

982 # every iteration because PeriodicCallback uses 

983 # time.time() while asyncio schedules callbacks using 

984 # time.monotonic(). 

985 # https://github.com/tornadoweb/tornado/issues/2333 

986 self._next_timeout += callback_time_sec