Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/ioloop.py: 35%

275 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""An I/O event loop for non-blocking sockets.

In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event loop, with a
slightly different interface. The `.IOLoop` interface is now provided primarily
for backwards compatibility; new code should generally use the `asyncio` event
loop interface directly. The `IOLoop.current` class method provides the
`IOLoop` instance corresponding to the running `asyncio` event loop.

"""

25 

26import asyncio 

27import concurrent.futures 

28import datetime 

29import functools 

30import numbers 

31import os 

32import sys 

33import time 

34import math 

35import random 

36import warnings 

37from inspect import isawaitable 

38 

39from tornado.concurrent import ( 

40 Future, 

41 is_future, 

42 chain_future, 

43 future_set_exc_info, 

44 future_add_done_callback, 

45) 

46from tornado.log import app_log 

47from tornado.util import Configurable, TimeoutError, import_object 

48 

49import typing 

50from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable 

51 

52if typing.TYPE_CHECKING: 

53 from typing import Dict, List # noqa: F401 

54 

55 from typing_extensions import Protocol 

56else: 

57 Protocol = object 

58 

59 

60class _Selectable(Protocol): 

61 def fileno(self) -> int: 

62 pass 

63 

64 def close(self) -> None: 

65 pass 

66 

67 

68_T = TypeVar("_T") 

69_S = TypeVar("_S", bound=_Selectable) 

70 

71 

class IOLoop(Configurable):
    """An I/O event loop.

    As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event loop.

    Example usage for a simple TCP server:

    .. testcode::

        import asyncio
        import errno
        import functools
        import socket

        import tornado.ioloop
        from tornado.iostream import IOStream

        async def handle_connection(connection, address):
            stream = IOStream(connection)
            message = await stream.read_until_close()
            print("message from client:", message.decode().strip())

        def connection_ready(sock, fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except BlockingIOError:
                    return
                connection.setblocking(0)
                io_loop = tornado.ioloop.IOLoop.current()
                io_loop.spawn_callback(handle_connection, connection, address)

        async def main():
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(0)
            sock.bind(("", 8888))
            sock.listen(128)

            io_loop = tornado.ioloop.IOLoop.current()
            callback = functools.partial(connection_ready, sock)
            io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
            await asyncio.Event().wait()

        if __name__ == "__main__":
            asyncio.run(main())

    .. testoutput::
        :hide:

    Most applications should not attempt to construct an `IOLoop` directly,
    and instead initialize the `asyncio` event loop and use `IOLoop.current()`.
    In some cases, such as in test frameworks when initializing an `IOLoop`
    to be run in a secondary thread, it may be appropriate to construct
    an `IOLoop` with ``IOLoop(make_current=False)``. Constructing an `IOLoop`
    without the ``make_current=False`` argument is deprecated since Tornado 6.2.

    In general, an `IOLoop` cannot survive a fork or be shared across processes
    in any way. When multiple processes are being used, each process should
    create its own `IOLoop`, which also implies that any objects which depend on
    the `IOLoop` (such as `.AsyncHTTPClient`) must also be created in the child
    processes. As a guideline, anything that starts processes (including the
    `tornado.process` and `multiprocessing` modules) should do so as early as
    possible, ideally the first thing the application does after loading its
    configuration, and *before* any calls to `.IOLoop.start` or `asyncio.run`.

    .. versionchanged:: 4.2
       Added the ``make_current`` keyword argument to the `IOLoop`
       constructor.

    .. versionchanged:: 5.0

       Uses the `asyncio` event loop by default. The ``IOLoop.configure`` method
       cannot be used on Python 3 except to redundantly specify the `asyncio`
       event loop.

    .. deprecated:: 6.2
       It is deprecated to create an event loop that is "current" but not
       running. This means it is deprecated to pass
       ``make_current=True`` to the ``IOLoop`` constructor, or to create
       an ``IOLoop`` while no asyncio event loop is running unless
       ``make_current=False`` is used.
    """

    # These constants were originally based on constants from the epoll module.
    NONE = 0
    READ = 0x001
    WRITE = 0x004
    ERROR = 0x018

    # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
    _ioloop_for_asyncio = {}  # type: Dict[asyncio.AbstractEventLoop, IOLoop]

    @classmethod
    def configure(
        cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
    ) -> None:
        """Selects the implementation class used when constructing an `IOLoop`.

        ``impl`` may be a class, a dotted name (resolved with
        ``import_object``) or ``None``. A class that is not a subclass of
        ``BaseAsyncIOLoop`` is rejected with `RuntimeError`; everything else
        is forwarded to the parent ``configure`` implementation.
        """
        # Imported lazily, at call time rather than module import time.
        from tornado.platform.asyncio import BaseAsyncIOLoop

        if isinstance(impl, str):
            impl = import_object(impl)
        if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop):
            raise RuntimeError("only AsyncIOLoop is allowed when asyncio is available")
        super().configure(impl, **kwargs)

    @staticmethod
    def instance() -> "IOLoop":
        """Deprecated alias for `IOLoop.current()`.

        .. versionchanged:: 5.0

           Previously, this method returned a global singleton
           `IOLoop`, in contrast with the per-thread `IOLoop` returned
           by `current()`. In nearly all cases the two were the same
           (when they differed, it was generally used from non-Tornado
           threads to communicate back to the main thread's `IOLoop`).
           This distinction is not present in `asyncio`, so in order
           to facilitate integration with that package `instance()`
           was changed to be an alias to `current()`. Applications
           using the cross-thread communications aspect of
           `instance()` should instead set their own global variable
           to point to the `IOLoop` they want to use.

        .. deprecated:: 5.0
        """
        return IOLoop.current()

    def install(self) -> None:
        """Deprecated alias for `make_current()`.

        .. versionchanged:: 5.0

           Previously, this method would set this `IOLoop` as the
           global singleton used by `IOLoop.instance()`. Now that
           `instance()` is an alias for `current()`, `install()`
           is an alias for `make_current()`.

        .. deprecated:: 5.0
        """
        self.make_current()

    @staticmethod
    def clear_instance() -> None:
        """Deprecated alias for `clear_current()`.

        .. versionchanged:: 5.0

           Previously, this method would clear the `IOLoop` used as
           the global singleton by `IOLoop.instance()`. Now that
           `instance()` is an alias for `current()`,
           `clear_instance()` is an alias for `clear_current()`.

        .. deprecated:: 5.0

        """
        IOLoop.clear_current()

    @typing.overload
    @staticmethod
    def current() -> "IOLoop":
        pass

    @typing.overload
    @staticmethod
    def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
        pass

    @staticmethod
    def current(instance: bool = True) -> Optional["IOLoop"]:  # noqa: F811
        """Returns the current thread's `IOLoop`.

        If an `IOLoop` is currently running or has been marked as
        current by `make_current`, returns that instance. If there is
        no current `IOLoop` and ``instance`` is true, creates one.

        .. versionchanged:: 4.1
           Added ``instance`` argument to control the fallback to
           `IOLoop.instance()`.
        .. versionchanged:: 5.0
           On Python 3, control of the current `IOLoop` is delegated
           to `asyncio`, with this and other methods as pass-through accessors.
           The ``instance`` argument now controls whether an `IOLoop`
           is created automatically when there is none, instead of
           whether we fall back to `IOLoop.instance()` (which is now
           an alias for this method). ``instance=False`` is deprecated,
           since even if we do not create an `IOLoop`, this method
           may initialize the asyncio loop.

        .. deprecated:: 6.2
           It is deprecated to call ``IOLoop.current()`` when no `asyncio`
           event loop is running.
        """
        try:
            loop = asyncio.get_event_loop()
        except (RuntimeError, AssertionError):
            # No asyncio loop is available: only an error if the caller
            # asked for an instance.
            if not instance:
                return None
            raise
        try:
            return IOLoop._ioloop_for_asyncio[loop]
        except KeyError:
            if instance:
                from tornado.platform.asyncio import AsyncIOMainLoop

                current = AsyncIOMainLoop(make_current=True)  # type: Optional[IOLoop]
            else:
                current = None
        return current

    def make_current(self) -> None:
        """Makes this the `IOLoop` for the current thread.

        An `IOLoop` automatically becomes current for its thread
        when it is started, but it is sometimes useful to call
        `make_current` explicitly before starting the `IOLoop`,
        so that code run at startup time can find the right
        instance.

        .. versionchanged:: 4.1
           An `IOLoop` created while there is no current `IOLoop`
           will automatically become current.

        .. versionchanged:: 5.0
           This method also sets the current `asyncio` event loop.

        .. deprecated:: 6.2
           The concept of an event loop that is "current" without
           currently running is deprecated in asyncio since Python
           3.10. All related functionality in Tornado is also
           deprecated. Instead, start the event loop with `asyncio.run`
           before interacting with it.
        """
        # The asyncio event loops override this method.
        raise NotImplementedError()

    @staticmethod
    def clear_current() -> None:
        """Clears the `IOLoop` for the current thread.

        Intended primarily for use by test frameworks in between tests.

        .. versionchanged:: 5.0
           This method also clears the current `asyncio` event loop.
        .. deprecated:: 6.2
        """
        warnings.warn(
            "clear_current is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
        IOLoop._clear_current()

    @staticmethod
    def _clear_current() -> None:
        """Runs `_clear_current_hook` on the current `IOLoop`, if there is one.

        This is the warning-free implementation behind `clear_current`.
        """
        old = IOLoop.current(instance=False)
        if old is not None:
            old._clear_current_hook()

    def _clear_current_hook(self) -> None:
        """Instance method called when an IOLoop ceases to be current.

        May be overridden by subclasses as a counterpart to make_current.
        """
        pass

    @classmethod
    def configurable_base(cls) -> Type[Configurable]:
        """Returns `IOLoop` as the base of this configurable hierarchy."""
        return IOLoop

    @classmethod
    def configurable_default(cls) -> Type[Configurable]:
        """Returns ``AsyncIOLoop``, the default implementation class."""
        from tornado.platform.asyncio import AsyncIOLoop

        return AsyncIOLoop

    def initialize(self, make_current: Optional[bool] = None) -> None:
        """Applies the ``make_current`` policy for a newly built `IOLoop`.

        * ``None`` (default): become current only if no `IOLoop` is
          current yet.
        * ``True``: become current, raising `RuntimeError` if a different
          `IOLoop` is already current.
        * ``False``: never become current.
        """
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            current = IOLoop.current(instance=False)
            # AsyncIO loops can already be current by this point.
            if current is not None and current is not self:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

    def close(self, all_fds: bool = False) -> None:
        """Closes the `IOLoop`, freeing any resources used.

        If ``all_fds`` is true, all file descriptors registered on the
        IOLoop will be closed (not just the ones created by the
        `IOLoop` itself).

        Many applications will only use a single `IOLoop` that runs for the
        entire lifetime of the process. In that case closing the `IOLoop`
        is not necessary since everything will be cleaned up when the
        process exits. `IOLoop.close` is provided mainly for scenarios
        such as unit tests, which create and destroy a large number of
        ``IOLoops``.

        An `IOLoop` must be completely stopped before it can be closed. This
        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
        be allowed to return before attempting to call `IOLoop.close()`.
        Therefore the call to `close` will usually appear just after
        the call to `start` rather than near the call to `stop`.

        .. versionchanged:: 3.1
           If the `IOLoop` implementation supports non-integer objects
           for "file descriptors", those objects will have their
           ``close`` method called when ``all_fds`` is true.
        """
        raise NotImplementedError()

    @typing.overload
    def add_handler(
        self, fd: int, handler: Callable[[int, int], None], events: int
    ) -> None:
        pass

    @typing.overload  # noqa: F811
    def add_handler(
        self, fd: _S, handler: Callable[[_S, int], None], events: int
    ) -> None:
        pass

    def add_handler(  # noqa: F811
        self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
    ) -> None:
        """Registers the given handler to receive the given events for ``fd``.

        The ``fd`` argument may either be an integer file descriptor or
        a file-like object with a ``fileno()`` and ``close()`` method.

        The ``events`` argument is a bitwise or of the constants
        ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.

        When an event occurs, ``handler(fd, events)`` will be run.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
        """Changes the events we listen for ``fd``.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def remove_handler(self, fd: Union[int, _Selectable]) -> None:
        """Stop listening for events on ``fd``.

        .. versionchanged:: 4.0
           Added the ability to pass file-like objects in addition to
           raw file descriptors.
        """
        raise NotImplementedError()

    def start(self) -> None:
        """Starts the I/O loop.

        The loop will run until one of the callbacks calls `stop()`, which
        will make the loop stop after the current event iteration completes.
        """
        raise NotImplementedError()

    def stop(self) -> None:
        """Stop the I/O loop.

        If the event loop is not currently running, the next call to `start()`
        will return immediately.

        Note that even after `stop` has been called, the `IOLoop` is not
        completely stopped until `IOLoop.start` has also returned.
        Some work that was scheduled before the call to `stop` may still
        be run before the `IOLoop` shuts down.
        """
        raise NotImplementedError()

    def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any:
        """Starts the `IOLoop`, runs the given function, and stops the loop.

        The function must return either an awaitable object or
        ``None``. If the function returns an awaitable object, the
        `IOLoop` will run until the awaitable is resolved (and
        `run_sync()` will return the awaitable's result). If it raises
        an exception, the `IOLoop` will stop and the exception will be
        re-raised to the caller.

        The optional ``timeout`` argument (in seconds) may be used to set
        a maximum duration for the function. If the timeout expires,
        a `asyncio.TimeoutError` is raised.

        This method is useful to allow asynchronous calls in a
        ``main()`` function::

            async def main():
                # do stuff...

            if __name__ == '__main__':
                IOLoop.current().run_sync(main)

        .. versionchanged:: 4.3
           Returning a non-``None``, non-awaitable value is now an error.

        .. versionchanged:: 5.0
           If a timeout occurs, the ``func`` coroutine will be cancelled.

        .. versionchanged:: 6.2
           ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``.
        """
        # One-element list so the nested callbacks can publish the future.
        future_cell = [None]  # type: List[Optional[Future]]

        def run() -> None:
            try:
                result = func()
                if result is not None:
                    from tornado.gen import convert_yielded

                    result = convert_yielded(result)
            except Exception:
                fut = Future()  # type: Future[Any]
                future_cell[0] = fut
                future_set_exc_info(fut, sys.exc_info())
            else:
                if is_future(result):
                    future_cell[0] = result
                else:
                    fut = Future()
                    future_cell[0] = fut
                    fut.set_result(result)
            assert future_cell[0] is not None
            # Stop the loop as soon as the function's future resolves.
            self.add_future(future_cell[0], lambda future: self.stop())

        self.add_callback(run)
        if timeout is not None:

            def timeout_callback() -> None:
                # If we can cancel the future, do so and wait on it. If not,
                # Just stop the loop and return with the task still pending.
                # (If we neither cancel nor wait for the task, a warning
                # will be logged).
                assert future_cell[0] is not None
                if not future_cell[0].cancel():
                    self.stop()

            timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
        self.start()
        if timeout is not None:
            self.remove_timeout(timeout_handle)
        assert future_cell[0] is not None
        if future_cell[0].cancelled() or not future_cell[0].done():
            raise TimeoutError("Operation timed out after %s seconds" % timeout)
        return future_cell[0].result()

    def time(self) -> float:
        """Returns the current time according to the `IOLoop`'s clock.

        The return value is a floating-point number relative to an
        unspecified time in the past.

        Historically, the IOLoop could be customized to use e.g.
        `time.monotonic` instead of `time.time`, but this is not
        currently supported and so this method is equivalent to
        `time.time`.

        """
        return time.time()

    def add_timeout(
        self,
        deadline: Union[float, datetime.timedelta],
        callback: Callable,
        *args: Any,
        **kwargs: Any
    ) -> object:
        """Runs the ``callback`` at the time ``deadline`` from the I/O loop.

        Returns an opaque handle that may be passed to
        `remove_timeout` to cancel.

        ``deadline`` may be a number denoting a time (on the same
        scale as `IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time. Since Tornado 4.0, `call_later` is a more
        convenient alternative for the relative case since it does not
        require a timedelta object.

        Note that it is not safe to call `add_timeout` from other threads.
        Instead, you must use `add_callback` to transfer control to the
        `IOLoop`'s thread, and then call `add_timeout` from there.

        Subclasses of IOLoop must implement either `add_timeout` or
        `call_at`; the default implementations of each will call
        the other. `call_at` is usually easier to implement, but
        subclasses that wish to maintain compatibility with Tornado
        versions prior to 4.0 must use `add_timeout` instead.

        Raises `TypeError` if ``deadline`` is neither a real number nor a
        `datetime.timedelta`.

        .. versionchanged:: 4.0
           Now passes through ``*args`` and ``**kwargs`` to the callback.
        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(
                self.time() + deadline.total_seconds(), callback, *args, **kwargs
            )
        else:
            raise TypeError("Unsupported deadline %r" % deadline)

    def call_later(
        self, delay: float, callback: Callable, *args: Any, **kwargs: Any
    ) -> object:
        """Runs the ``callback`` after ``delay`` seconds have passed.

        Returns an opaque handle that may be passed to `remove_timeout`
        to cancel.  Note that unlike the `asyncio` method of the same
        name, the returned object does not have a ``cancel()`` method.

        See `add_timeout` for comments on thread-safety and subclassing.

        .. versionadded:: 4.0
        """
        return self.call_at(self.time() + delay, callback, *args, **kwargs)

    def call_at(
        self, when: float, callback: Callable, *args: Any, **kwargs: Any
    ) -> object:
        """Runs the ``callback`` at the absolute time designated by ``when``.

        ``when`` must be a number using the same reference point as
        `IOLoop.time`.

        Returns an opaque handle that may be passed to `remove_timeout`
        to cancel.  Note that unlike the `asyncio` method of the same
        name, the returned object does not have a ``cancel()`` method.

        See `add_timeout` for comments on thread-safety and subclassing.

        .. versionadded:: 4.0
        """
        return self.add_timeout(when, callback, *args, **kwargs)

    def remove_timeout(self, timeout: object) -> None:
        """Cancels a pending timeout.

        The argument is a handle as returned by `add_timeout`.  It is
        safe to call `remove_timeout` even if the callback has already
        been run.
        """
        raise NotImplementedError()

    def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
        """Calls the given callback on the next I/O loop iteration.

        It is safe to call this method from any thread at any time,
        except from a signal handler.  Note that this is the **only**
        method in `IOLoop` that makes this thread-safety guarantee; all
        other interaction with the `IOLoop` must be done from that
        `IOLoop`'s thread.  `add_callback()` may be used to transfer
        control from other threads to the `IOLoop`'s thread.

        To add a callback from a signal handler, see
        `add_callback_from_signal`.
        """
        raise NotImplementedError()

    def add_callback_from_signal(
        self, callback: Callable, *args: Any, **kwargs: Any
    ) -> None:
        """Calls the given callback on the next I/O loop iteration.

        Safe for use from a Python signal handler; should not be used
        otherwise.
        """
        raise NotImplementedError()

    def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
        """Calls the given callback on the next IOLoop iteration.

        As of Tornado 6.0, this method is equivalent to `add_callback`.

        .. versionadded:: 4.0
        """
        self.add_callback(callback, *args, **kwargs)

    def add_future(
        self,
        future: "Union[Future[_T], concurrent.futures.Future[_T]]",
        callback: Callable[["Future[_T]"], None],
    ) -> None:
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.

        This method only accepts `.Future` objects and not other
        awaitables (unlike most of Tornado where the two are
        interchangeable).
        """
        if isinstance(future, Future):
            # Note that we specifically do not want the inline behavior of
            # tornado.concurrent.future_add_done_callback. We always want
            # this callback scheduled on the next IOLoop iteration (which
            # asyncio.Future always does).
            #
            # Wrap the callback in self._run_callback so we control
            # the error logging (i.e. it goes to tornado.log.app_log
            # instead of asyncio's log).
            future.add_done_callback(
                lambda f: self._run_callback(functools.partial(callback, future))
            )
        else:
            assert is_future(future)
            # For concurrent futures, we use self.add_callback, so
            # it's fine if future_add_done_callback inlines that call.
            future_add_done_callback(
                future, lambda f: self.add_callback(callback, future)
            )

    def run_in_executor(
        self,
        executor: Optional[concurrent.futures.Executor],
        func: Callable[..., _T],
        *args: Any
    ) -> Awaitable[_T]:
        """Runs a function in a ``concurrent.futures.Executor``. If
        ``executor`` is ``None``, the IO loop's default executor will be used.

        Use `functools.partial` to pass keyword arguments to ``func``.

        .. versionadded:: 5.0
        """
        if executor is None:
            # Create the default thread pool on first use.
            if not hasattr(self, "_executor"):
                from tornado.process import cpu_count

                self._executor = concurrent.futures.ThreadPoolExecutor(
                    max_workers=(cpu_count() * 5)
                )  # type: concurrent.futures.Executor
            executor = self._executor
        c_future = executor.submit(func, *args)
        # Concurrent Futures are not usable with await. Wrap this in a
        # Tornado Future instead, using self.add_future for thread-safety.
        t_future = Future()  # type: Future[_T]
        self.add_future(c_future, lambda f: chain_future(f, t_future))
        return t_future

    def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
        """Sets the default executor to use with :meth:`run_in_executor`.

        .. versionadded:: 5.0
        """
        self._executor = executor

    def _run_callback(self, callback: Callable[[], Any]) -> None:
        """Runs a callback with error handling.

        .. versionchanged:: 6.0

           CancelledErrors are no longer logged.
        """
        try:
            ret = callback()
            if ret is not None:
                from tornado import gen

                # Functions that return Futures typically swallow all
                # exceptions and store them in the Future.  If a Future
                # makes it out to the IOLoop, ensure its exception (if any)
                # gets logged too.
                try:
                    ret = gen.convert_yielded(ret)
                except gen.BadYieldError:
                    # It's not unusual for add_callback to be used with
                    # methods returning a non-None and non-yieldable
                    # result, which should just be ignored.
                    pass
                else:
                    self.add_future(ret, self._discard_future_result)
        except asyncio.CancelledError:
            pass
        except Exception:
            app_log.error("Exception in callback %r", callback, exc_info=True)

    def _discard_future_result(self, future: Future) -> None:
        """Avoid unhandled-exception warnings from spawned coroutines."""
        future.result()

    def split_fd(
        self, fd: Union[int, _Selectable]
    ) -> Tuple[int, Union[int, _Selectable]]:
        # Returns an (fd, obj) pair from an ``fd`` parameter.
        #
        # We accept both raw file descriptors and file-like objects as
        # input to `add_handler` and related methods.  When a file-like
        # object is passed, we must retain the object itself so we can
        # close it correctly when the `IOLoop` shuts down, but the
        # poller interfaces favor file descriptors (they will accept
        # file-like objects and call ``fileno()`` for you, but they
        # always return the descriptor itself).
        #
        # This method is provided for use by `IOLoop` subclasses and should
        # not generally be used by application code.
        #
        # .. versionadded:: 4.0
        if isinstance(fd, int):
            return fd, fd
        return fd.fileno(), fd

    def close_fd(self, fd: Union[int, _Selectable]) -> None:
        # Utility method to close an ``fd``.
        #
        # If ``fd`` is a file-like object, we close it directly; otherwise
        # we use `os.close`.
        #
        # This method is provided for use by `IOLoop` subclasses (in
        # implementations of ``IOLoop.close(all_fds=True)``) and should
        # not generally be used by application code.
        #
        # .. versionadded:: 4.0
        try:
            if isinstance(fd, int):
                os.close(fd)
            else:
                fd.close()
        except OSError:
            # Best effort: an OSError while closing is deliberately ignored.
            pass

807 

808 

809class _Timeout(object): 

810 """An IOLoop timeout, a UNIX timestamp and a callback""" 

811 

812 # Reduce memory overhead when there are lots of pending callbacks 

813 __slots__ = ["deadline", "callback", "tdeadline"] 

814 

815 def __init__( 

816 self, deadline: float, callback: Callable[[], None], io_loop: IOLoop 

817 ) -> None: 

818 if not isinstance(deadline, numbers.Real): 

819 raise TypeError("Unsupported deadline %r" % deadline) 

820 self.deadline = deadline 

821 self.callback = callback 

822 self.tdeadline = ( 

823 deadline, 

824 next(io_loop._timeout_counter), 

825 ) # type: Tuple[float, int] 

826 

827 # Comparison methods to sort by deadline, with object id as a tiebreaker 

828 # to guarantee a consistent ordering. The heapq module uses __le__ 

829 # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons 

830 # use __lt__). 

831 def __lt__(self, other: "_Timeout") -> bool: 

832 return self.tdeadline < other.tdeadline 

833 

834 def __le__(self, other: "_Timeout") -> bool: 

835 return self.tdeadline <= other.tdeadline 

836 

837 

class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds when
    ``callback_time`` is a float. Note that the timeout is given in
    milliseconds, while most other time-related functions in Tornado use
    seconds. ``callback_time`` may alternatively be given as a
    `datetime.timedelta` object. In either form it must be positive,
    otherwise `ValueError` is raised.

    If ``jitter`` is specified, each callback time will be randomly selected
    within a window of ``jitter * callback_time`` milliseconds.
    Jitter can be used to reduce alignment of events with similar periods.
    A jitter of 0.1 means allowing a 10% variation in callback time.
    The window is centered on ``callback_time`` so the total number of calls
    within a given interval should not be significantly affected by adding
    jitter.

    If the callback runs for longer than ``callback_time`` milliseconds,
    subsequent invocations will be skipped to get back on schedule.

    `start` must be called after the `PeriodicCallback` is created.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.1
       The ``jitter`` argument is added.

    .. versionchanged:: 6.2
       If the ``callback`` argument is a coroutine, and a callback runs for
       longer than ``callback_time``, subsequent invocations will be skipped.
       Previously this was only true for regular functions, not coroutines,
       which were "fire-and-forget" for `PeriodicCallback`.

       The ``callback_time`` argument now accepts `datetime.timedelta` objects,
       in addition to the previous numeric milliseconds.
    """

    def __init__(
        self,
        callback: Callable[[], Optional[Awaitable]],
        callback_time: Union[datetime.timedelta, float],
        jitter: float = 0,
    ) -> None:
        self.callback = callback
        if isinstance(callback_time, datetime.timedelta):
            # Normalize a timedelta to (float) milliseconds.
            callback_time = callback_time / datetime.timedelta(milliseconds=1)
        # Validate both input forms: a zero period would divide by zero in
        # _update_next, and a negative one would move the schedule backwards.
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.jitter = jitter
        self._running = False
        self._timeout = None  # type: object

    def start(self) -> None:
        """Starts the timer."""
        # Looking up the IOLoop here allows to first instantiate the
        # PeriodicCallback in another thread, then start it using
        # IOLoop.add_callback().
        self.io_loop = IOLoop.current()
        self._running = True
        self._next_timeout = self.io_loop.time()
        self._schedule_next()

    def stop(self) -> None:
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def is_running(self) -> bool:
        """Returns ``True`` if this `.PeriodicCallback` has been started.

        .. versionadded:: 4.1
        """
        return self._running

    async def _run(self) -> None:
        """Invokes the callback once, then schedules the next invocation.

        Awaitable results are awaited before rescheduling. Exceptions are
        logged to ``app_log`` rather than propagated.
        """
        if not self._running:
            return
        try:
            val = self.callback()
            if val is not None and isawaitable(val):
                await val
        except Exception:
            app_log.error("Exception in callback %r", self.callback, exc_info=True)
        finally:
            self._schedule_next()

    def _schedule_next(self) -> None:
        """Registers the next `_run` timeout if the timer is still running."""
        if self._running:
            self._update_next(self.io_loop.time())
            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)

    def _update_next(self, current_time: float) -> None:
        """Advances ``_next_timeout`` to the next deadline after ``current_time``."""
        callback_time_sec = self.callback_time / 1000.0
        if self.jitter:
            # apply jitter fraction
            callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
        if self._next_timeout <= current_time:
            # The period should be measured from the start of one call
            # to the start of the next. If one call takes too long,
            # skip cycles to get back to a multiple of the original
            # schedule.
            self._next_timeout += (
                math.floor((current_time - self._next_timeout) / callback_time_sec) + 1
            ) * callback_time_sec
        else:
            # If the clock moved backwards, ensure we advance the next
            # timeout instead of recomputing the same value again.
            # This may result in long gaps between callbacks if the
            # clock jumps backwards by a lot, but the far more common
            # scenario is a small NTP adjustment that should just be
            # ignored.
            #
            # Note that on some systems if time.time() runs slower
            # than time.monotonic() (most common on windows), we
            # effectively experience a small backwards time jump on
            # every iteration because PeriodicCallback uses
            # time.time() while asyncio schedules callbacks using
            # time.monotonic().
            # https://github.com/tornadoweb/tornado/issues/2333
            self._next_timeout += callback_time_sec