Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/ioloop.py: 36%

273 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""An I/O event loop for non-blocking sockets. 

17 

18In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event loop, with a 

19slightly different interface. The `.IOLoop` interface is now provided primarily 

20for backwards compatibility; new code should generally use the `asyncio` event 

21loop interface directly. The `IOLoop.current` class method provides the 

22`IOLoop` instance corresponding to the running `asyncio` event loop. 

23 

24""" 

25 

26import asyncio 

27import concurrent.futures 

28import datetime 

29import functools 

30import numbers 

31import os 

32import sys 

33import time 

34import math 

35import random 

36import warnings 

37from inspect import isawaitable 

38 

39from tornado.concurrent import ( 

40 Future, 

41 is_future, 

42 chain_future, 

43 future_set_exc_info, 

44 future_add_done_callback, 

45) 

46from tornado.log import app_log 

47from tornado.util import Configurable, TimeoutError, import_object 

48 

49import typing 

50from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable 

51 

52if typing.TYPE_CHECKING: 

53 from typing import Dict, List # noqa: F401 

54 

55 from typing_extensions import Protocol 

56else: 

57 Protocol = object 

58 

59 

60class _Selectable(Protocol): 

61 def fileno(self) -> int: 

62 pass 

63 

64 def close(self) -> None: 

65 pass 

66 

67 

68_T = TypeVar("_T") 

69_S = TypeVar("_S", bound=_Selectable) 

70 

71 

72class IOLoop(Configurable): 

73 """An I/O event loop. 

74 

75 As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event loop. 

76 

77 Example usage for a simple TCP server: 

78 

79 .. testcode:: 

80 

81 import asyncio 

82 import errno 

83 import functools 

84 import socket 

85 

86 import tornado 

87 from tornado.iostream import IOStream 

88 

89 async def handle_connection(connection, address): 

90 stream = IOStream(connection) 

91 message = await stream.read_until_close() 

92 print("message from client:", message.decode().strip()) 

93 

94 def connection_ready(sock, fd, events): 

95 while True: 

96 try: 

97 connection, address = sock.accept() 

98 except BlockingIOError: 

99 return 

100 connection.setblocking(0) 

101 io_loop = tornado.ioloop.IOLoop.current() 

102 io_loop.spawn_callback(handle_connection, connection, address) 

103 

104 async def main(): 

105 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 

106 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

107 sock.setblocking(0) 

108 sock.bind(("", 8888)) 

109 sock.listen(128) 

110 

111 io_loop = tornado.ioloop.IOLoop.current() 

112 callback = functools.partial(connection_ready, sock) 

113 io_loop.add_handler(sock.fileno(), callback, io_loop.READ) 

114 await asyncio.Event().wait() 

115 

116 if __name__ == "__main__": 

117 asyncio.run(main()) 

118 

119 .. testoutput:: 

120 :hide: 

121 

122 Most applications should not attempt to construct an `IOLoop` directly, 

123 and instead initialize the `asyncio` event loop and use `IOLoop.current()`. 

124 In some cases, such as in test frameworks when initializing an `IOLoop` 

125 to be run in a secondary thread, it may be appropriate to construct 

126 an `IOLoop` with ``IOLoop(make_current=False)``. 

127 

128 In general, an `IOLoop` cannot survive a fork or be shared across processes 

129 in any way. When multiple processes are being used, each process should 

130 create its own `IOLoop`, which also implies that any objects which depend on 

131 the `IOLoop` (such as `.AsyncHTTPClient`) must also be created in the child 

132 processes. As a guideline, anything that starts processes (including the 

133 `tornado.process` and `multiprocessing` modules) should do so as early as 

134 possible, ideally the first thing the application does after loading its 

135 configuration, and *before* any calls to `.IOLoop.start` or `asyncio.run`. 

136 

137 .. versionchanged:: 4.2 

138 Added the ``make_current`` keyword argument to the `IOLoop` 

139 constructor. 

140 

141 .. versionchanged:: 5.0 

142 

143 Uses the `asyncio` event loop by default. The ``IOLoop.configure`` method 

144 cannot be used on Python 3 except to redundantly specify the `asyncio` 

145 event loop. 

146 

147 .. versionchanged:: 6.3 

148 ``make_current=True`` is now the default when creating an IOLoop - 

149 previously the default was to make the event loop current if there wasn't 

150 already a current one. 

151 """ 

152 

153 # These constants were originally based on constants from the epoll module. 

154 NONE = 0 

155 READ = 0x001 

156 WRITE = 0x004 

157 ERROR = 0x018 

158 

159 # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops. 

160 _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop] 

161 

162 @classmethod 

163 def configure( 

164 cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any 

165 ) -> None: 

166 from tornado.platform.asyncio import BaseAsyncIOLoop 

167 

168 if isinstance(impl, str): 

169 impl = import_object(impl) 

170 if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop): 

171 raise RuntimeError("only AsyncIOLoop is allowed when asyncio is available") 

172 super(IOLoop, cls).configure(impl, **kwargs) 

173 

174 @staticmethod 

175 def instance() -> "IOLoop": 

176 """Deprecated alias for `IOLoop.current()`. 

177 

178 .. versionchanged:: 5.0 

179 

180 Previously, this method returned a global singleton 

181 `IOLoop`, in contrast with the per-thread `IOLoop` returned 

182 by `current()`. In nearly all cases the two were the same 

183 (when they differed, it was generally used from non-Tornado 

184 threads to communicate back to the main thread's `IOLoop`). 

185 This distinction is not present in `asyncio`, so in order 

186 to facilitate integration with that package `instance()` 

187 was changed to be an alias to `current()`. Applications 

188 using the cross-thread communications aspect of 

189 `instance()` should instead set their own global variable 

190 to point to the `IOLoop` they want to use. 

191 

192 .. deprecated:: 5.0 

193 """ 

194 return IOLoop.current() 

195 

196 def install(self) -> None: 

197 """Deprecated alias for `make_current()`. 

198 

199 .. versionchanged:: 5.0 

200 

201 Previously, this method would set this `IOLoop` as the 

202 global singleton used by `IOLoop.instance()`. Now that 

203 `instance()` is an alias for `current()`, `install()` 

204 is an alias for `make_current()`. 

205 

206 .. deprecated:: 5.0 

207 """ 

208 self.make_current() 

209 

210 @staticmethod 

211 def clear_instance() -> None: 

212 """Deprecated alias for `clear_current()`. 

213 

214 .. versionchanged:: 5.0 

215 

216 Previously, this method would clear the `IOLoop` used as 

217 the global singleton by `IOLoop.instance()`. Now that 

218 `instance()` is an alias for `current()`, 

219 `clear_instance()` is an alias for `clear_current()`. 

220 

221 .. deprecated:: 5.0 

222 

223 """ 

224 IOLoop.clear_current() 

225 

226 @typing.overload 

227 @staticmethod 

228 def current() -> "IOLoop": 

229 pass 

230 

231 @typing.overload 

232 @staticmethod 

233 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 

234 pass 

235 

236 @staticmethod 

237 def current(instance: bool = True) -> Optional["IOLoop"]: # noqa: F811 

238 """Returns the current thread's `IOLoop`. 

239 

240 If an `IOLoop` is currently running or has been marked as 

241 current by `make_current`, returns that instance. If there is 

242 no current `IOLoop` and ``instance`` is true, creates one. 

243 

244 .. versionchanged:: 4.1 

245 Added ``instance`` argument to control the fallback to 

246 `IOLoop.instance()`. 

247 .. versionchanged:: 5.0 

248 On Python 3, control of the current `IOLoop` is delegated 

249 to `asyncio`, with this and other methods as pass-through accessors. 

250 The ``instance`` argument now controls whether an `IOLoop` 

251 is created automatically when there is none, instead of 

252 whether we fall back to `IOLoop.instance()` (which is now 

253 an alias for this method). ``instance=False`` is deprecated, 

254 since even if we do not create an `IOLoop`, this method 

255 may initialize the asyncio loop. 

256 

257 .. deprecated:: 6.2 

258 It is deprecated to call ``IOLoop.current()`` when no `asyncio` 

259 event loop is running. 

260 """ 

261 try: 

262 loop = asyncio.get_event_loop() 

263 except RuntimeError: 

264 if not instance: 

265 return None 

266 # Create a new asyncio event loop for this thread. 

267 loop = asyncio.new_event_loop() 

268 asyncio.set_event_loop(loop) 

269 

270 try: 

271 return IOLoop._ioloop_for_asyncio[loop] 

272 except KeyError: 

273 if instance: 

274 from tornado.platform.asyncio import AsyncIOMainLoop 

275 

276 current = AsyncIOMainLoop() # type: Optional[IOLoop] 

277 else: 

278 current = None 

279 return current 

280 

281 def make_current(self) -> None: 

282 """Makes this the `IOLoop` for the current thread. 

283 

284 An `IOLoop` automatically becomes current for its thread 

285 when it is started, but it is sometimes useful to call 

286 `make_current` explicitly before starting the `IOLoop`, 

287 so that code run at startup time can find the right 

288 instance. 

289 

290 .. versionchanged:: 4.1 

291 An `IOLoop` created while there is no current `IOLoop` 

292 will automatically become current. 

293 

294 .. versionchanged:: 5.0 

295 This method also sets the current `asyncio` event loop. 

296 

297 .. deprecated:: 6.2 

298 Setting and clearing the current event loop through Tornado is 

299 deprecated. Use ``asyncio.set_event_loop`` instead if you need this. 

300 """ 

301 warnings.warn( 

302 "make_current is deprecated; start the event loop first", 

303 DeprecationWarning, 

304 stacklevel=2, 

305 ) 

306 self._make_current() 

307 

308 def _make_current(self) -> None: 

309 # The asyncio event loops override this method. 

310 raise NotImplementedError() 

311 

312 @staticmethod 

313 def clear_current() -> None: 

314 """Clears the `IOLoop` for the current thread. 

315 

316 Intended primarily for use by test frameworks in between tests. 

317 

318 .. versionchanged:: 5.0 

319 This method also clears the current `asyncio` event loop. 

320 .. deprecated:: 6.2 

321 """ 

322 warnings.warn( 

323 "clear_current is deprecated", 

324 DeprecationWarning, 

325 stacklevel=2, 

326 ) 

327 IOLoop._clear_current() 

328 

329 @staticmethod 

330 def _clear_current() -> None: 

331 old = IOLoop.current(instance=False) 

332 if old is not None: 

333 old._clear_current_hook() 

334 

335 def _clear_current_hook(self) -> None: 

336 """Instance method called when an IOLoop ceases to be current. 

337 

338 May be overridden by subclasses as a counterpart to make_current. 

339 """ 

340 pass 

341 

342 @classmethod 

343 def configurable_base(cls) -> Type[Configurable]: 

344 return IOLoop 

345 

346 @classmethod 

347 def configurable_default(cls) -> Type[Configurable]: 

348 from tornado.platform.asyncio import AsyncIOLoop 

349 

350 return AsyncIOLoop 

351 

352 def initialize(self, make_current: bool = True) -> None: 

353 if make_current: 

354 self._make_current() 

355 

356 def close(self, all_fds: bool = False) -> None: 

357 """Closes the `IOLoop`, freeing any resources used. 

358 

359 If ``all_fds`` is true, all file descriptors registered on the 

360 IOLoop will be closed (not just the ones created by the 

361 `IOLoop` itself). 

362 

363 Many applications will only use a single `IOLoop` that runs for the 

364 entire lifetime of the process. In that case closing the `IOLoop` 

365 is not necessary since everything will be cleaned up when the 

366 process exits. `IOLoop.close` is provided mainly for scenarios 

367 such as unit tests, which create and destroy a large number of 

368 ``IOLoops``. 

369 

370 An `IOLoop` must be completely stopped before it can be closed. This 

371 means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must 

372 be allowed to return before attempting to call `IOLoop.close()`. 

373 Therefore the call to `close` will usually appear just after 

374 the call to `start` rather than near the call to `stop`. 

375 

376 .. versionchanged:: 3.1 

377 If the `IOLoop` implementation supports non-integer objects 

378 for "file descriptors", those objects will have their 

379 ``close`` method when ``all_fds`` is true. 

380 """ 

381 raise NotImplementedError() 

382 

383 @typing.overload 

384 def add_handler( 

385 self, fd: int, handler: Callable[[int, int], None], events: int 

386 ) -> None: 

387 pass 

388 

389 @typing.overload # noqa: F811 

390 def add_handler( 

391 self, fd: _S, handler: Callable[[_S, int], None], events: int 

392 ) -> None: 

393 pass 

394 

395 def add_handler( # noqa: F811 

396 self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int 

397 ) -> None: 

398 """Registers the given handler to receive the given events for ``fd``. 

399 

400 The ``fd`` argument may either be an integer file descriptor or 

401 a file-like object with a ``fileno()`` and ``close()`` method. 

402 

403 The ``events`` argument is a bitwise or of the constants 

404 ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``. 

405 

406 When an event occurs, ``handler(fd, events)`` will be run. 

407 

408 .. versionchanged:: 4.0 

409 Added the ability to pass file-like objects in addition to 

410 raw file descriptors. 

411 """ 

412 raise NotImplementedError() 

413 

414 def update_handler(self, fd: Union[int, _Selectable], events: int) -> None: 

415 """Changes the events we listen for ``fd``. 

416 

417 .. versionchanged:: 4.0 

418 Added the ability to pass file-like objects in addition to 

419 raw file descriptors. 

420 """ 

421 raise NotImplementedError() 

422 

423 def remove_handler(self, fd: Union[int, _Selectable]) -> None: 

424 """Stop listening for events on ``fd``. 

425 

426 .. versionchanged:: 4.0 

427 Added the ability to pass file-like objects in addition to 

428 raw file descriptors. 

429 """ 

430 raise NotImplementedError() 

431 

432 def start(self) -> None: 

433 """Starts the I/O loop. 

434 

435 The loop will run until one of the callbacks calls `stop()`, which 

436 will make the loop stop after the current event iteration completes. 

437 """ 

438 raise NotImplementedError() 

439 

440 def stop(self) -> None: 

441 """Stop the I/O loop. 

442 

443 If the event loop is not currently running, the next call to `start()` 

444 will return immediately. 

445 

446 Note that even after `stop` has been called, the `IOLoop` is not 

447 completely stopped until `IOLoop.start` has also returned. 

448 Some work that was scheduled before the call to `stop` may still 

449 be run before the `IOLoop` shuts down. 

450 """ 

451 raise NotImplementedError() 

452 

453 def run_sync(self, func: Callable, timeout: Optional[float] = None) -> Any: 

454 """Starts the `IOLoop`, runs the given function, and stops the loop. 

455 

456 The function must return either an awaitable object or 

457 ``None``. If the function returns an awaitable object, the 

458 `IOLoop` will run until the awaitable is resolved (and 

459 `run_sync()` will return the awaitable's result). If it raises 

460 an exception, the `IOLoop` will stop and the exception will be 

461 re-raised to the caller. 

462 

463 The keyword-only argument ``timeout`` may be used to set 

464 a maximum duration for the function. If the timeout expires, 

465 a `asyncio.TimeoutError` is raised. 

466 

467 This method is useful to allow asynchronous calls in a 

468 ``main()`` function:: 

469 

470 async def main(): 

471 # do stuff... 

472 

473 if __name__ == '__main__': 

474 IOLoop.current().run_sync(main) 

475 

476 .. versionchanged:: 4.3 

477 Returning a non-``None``, non-awaitable value is now an error. 

478 

479 .. versionchanged:: 5.0 

480 If a timeout occurs, the ``func`` coroutine will be cancelled. 

481 

482 .. versionchanged:: 6.2 

483 ``tornado.util.TimeoutError`` is now an alias to ``asyncio.TimeoutError``. 

484 """ 

485 future_cell = [None] # type: List[Optional[Future]] 

486 

487 def run() -> None: 

488 try: 

489 result = func() 

490 if result is not None: 

491 from tornado.gen import convert_yielded 

492 

493 result = convert_yielded(result) 

494 except Exception: 

495 fut = Future() # type: Future[Any] 

496 future_cell[0] = fut 

497 future_set_exc_info(fut, sys.exc_info()) 

498 else: 

499 if is_future(result): 

500 future_cell[0] = result 

501 else: 

502 fut = Future() 

503 future_cell[0] = fut 

504 fut.set_result(result) 

505 assert future_cell[0] is not None 

506 self.add_future(future_cell[0], lambda future: self.stop()) 

507 

508 self.add_callback(run) 

509 if timeout is not None: 

510 

511 def timeout_callback() -> None: 

512 # If we can cancel the future, do so and wait on it. If not, 

513 # Just stop the loop and return with the task still pending. 

514 # (If we neither cancel nor wait for the task, a warning 

515 # will be logged). 

516 assert future_cell[0] is not None 

517 if not future_cell[0].cancel(): 

518 self.stop() 

519 

520 timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback) 

521 self.start() 

522 if timeout is not None: 

523 self.remove_timeout(timeout_handle) 

524 assert future_cell[0] is not None 

525 if future_cell[0].cancelled() or not future_cell[0].done(): 

526 raise TimeoutError("Operation timed out after %s seconds" % timeout) 

527 return future_cell[0].result() 

528 

529 def time(self) -> float: 

530 """Returns the current time according to the `IOLoop`'s clock. 

531 

532 The return value is a floating-point number relative to an 

533 unspecified time in the past. 

534 

535 Historically, the IOLoop could be customized to use e.g. 

536 `time.monotonic` instead of `time.time`, but this is not 

537 currently supported and so this method is equivalent to 

538 `time.time`. 

539 

540 """ 

541 return time.time() 

542 

543 def add_timeout( 

544 self, 

545 deadline: Union[float, datetime.timedelta], 

546 callback: Callable, 

547 *args: Any, 

548 **kwargs: Any 

549 ) -> object: 

550 """Runs the ``callback`` at the time ``deadline`` from the I/O loop. 

551 

552 Returns an opaque handle that may be passed to 

553 `remove_timeout` to cancel. 

554 

555 ``deadline`` may be a number denoting a time (on the same 

556 scale as `IOLoop.time`, normally `time.time`), or a 

557 `datetime.timedelta` object for a deadline relative to the 

558 current time. Since Tornado 4.0, `call_later` is a more 

559 convenient alternative for the relative case since it does not 

560 require a timedelta object. 

561 

562 Note that it is not safe to call `add_timeout` from other threads. 

563 Instead, you must use `add_callback` to transfer control to the 

564 `IOLoop`'s thread, and then call `add_timeout` from there. 

565 

566 Subclasses of IOLoop must implement either `add_timeout` or 

567 `call_at`; the default implementations of each will call 

568 the other. `call_at` is usually easier to implement, but 

569 subclasses that wish to maintain compatibility with Tornado 

570 versions prior to 4.0 must use `add_timeout` instead. 

571 

572 .. versionchanged:: 4.0 

573 Now passes through ``*args`` and ``**kwargs`` to the callback. 

574 """ 

575 if isinstance(deadline, numbers.Real): 

576 return self.call_at(deadline, callback, *args, **kwargs) 

577 elif isinstance(deadline, datetime.timedelta): 

578 return self.call_at( 

579 self.time() + deadline.total_seconds(), callback, *args, **kwargs 

580 ) 

581 else: 

582 raise TypeError("Unsupported deadline %r" % deadline) 

583 

584 def call_later( 

585 self, delay: float, callback: Callable, *args: Any, **kwargs: Any 

586 ) -> object: 

587 """Runs the ``callback`` after ``delay`` seconds have passed. 

588 

589 Returns an opaque handle that may be passed to `remove_timeout` 

590 to cancel. Note that unlike the `asyncio` method of the same 

591 name, the returned object does not have a ``cancel()`` method. 

592 

593 See `add_timeout` for comments on thread-safety and subclassing. 

594 

595 .. versionadded:: 4.0 

596 """ 

597 return self.call_at(self.time() + delay, callback, *args, **kwargs) 

598 

599 def call_at( 

600 self, when: float, callback: Callable, *args: Any, **kwargs: Any 

601 ) -> object: 

602 """Runs the ``callback`` at the absolute time designated by ``when``. 

603 

604 ``when`` must be a number using the same reference point as 

605 `IOLoop.time`. 

606 

607 Returns an opaque handle that may be passed to `remove_timeout` 

608 to cancel. Note that unlike the `asyncio` method of the same 

609 name, the returned object does not have a ``cancel()`` method. 

610 

611 See `add_timeout` for comments on thread-safety and subclassing. 

612 

613 .. versionadded:: 4.0 

614 """ 

615 return self.add_timeout(when, callback, *args, **kwargs) 

616 

617 def remove_timeout(self, timeout: object) -> None: 

618 """Cancels a pending timeout. 

619 

620 The argument is a handle as returned by `add_timeout`. It is 

621 safe to call `remove_timeout` even if the callback has already 

622 been run. 

623 """ 

624 raise NotImplementedError() 

625 

626 def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 

627 """Calls the given callback on the next I/O loop iteration. 

628 

629 It is safe to call this method from any thread at any time, 

630 except from a signal handler. Note that this is the **only** 

631 method in `IOLoop` that makes this thread-safety guarantee; all 

632 other interaction with the `IOLoop` must be done from that 

633 `IOLoop`'s thread. `add_callback()` may be used to transfer 

634 control from other threads to the `IOLoop`'s thread. 

635 

636 To add a callback from a signal handler, see 

637 `add_callback_from_signal`. 

638 """ 

639 raise NotImplementedError() 

640 

641 def add_callback_from_signal( 

642 self, callback: Callable, *args: Any, **kwargs: Any 

643 ) -> None: 

644 """Calls the given callback on the next I/O loop iteration. 

645 

646 Safe for use from a Python signal handler; should not be used 

647 otherwise. 

648 """ 

649 raise NotImplementedError() 

650 

651 def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: 

652 """Calls the given callback on the next IOLoop iteration. 

653 

654 As of Tornado 6.0, this method is equivalent to `add_callback`. 

655 

656 .. versionadded:: 4.0 

657 """ 

658 self.add_callback(callback, *args, **kwargs) 

659 

660 def add_future( 

661 self, 

662 future: "Union[Future[_T], concurrent.futures.Future[_T]]", 

663 callback: Callable[["Future[_T]"], None], 

664 ) -> None: 

665 """Schedules a callback on the ``IOLoop`` when the given 

666 `.Future` is finished. 

667 

668 The callback is invoked with one argument, the 

669 `.Future`. 

670 

671 This method only accepts `.Future` objects and not other 

672 awaitables (unlike most of Tornado where the two are 

673 interchangeable). 

674 """ 

675 if isinstance(future, Future): 

676 # Note that we specifically do not want the inline behavior of 

677 # tornado.concurrent.future_add_done_callback. We always want 

678 # this callback scheduled on the next IOLoop iteration (which 

679 # asyncio.Future always does). 

680 # 

681 # Wrap the callback in self._run_callback so we control 

682 # the error logging (i.e. it goes to tornado.log.app_log 

683 # instead of asyncio's log). 

684 future.add_done_callback( 

685 lambda f: self._run_callback(functools.partial(callback, future)) 

686 ) 

687 else: 

688 assert is_future(future) 

689 # For concurrent futures, we use self.add_callback, so 

690 # it's fine if future_add_done_callback inlines that call. 

691 future_add_done_callback( 

692 future, lambda f: self.add_callback(callback, future) 

693 ) 

694 

695 def run_in_executor( 

696 self, 

697 executor: Optional[concurrent.futures.Executor], 

698 func: Callable[..., _T], 

699 *args: Any 

700 ) -> Awaitable[_T]: 

701 """Runs a function in a ``concurrent.futures.Executor``. If 

702 ``executor`` is ``None``, the IO loop's default executor will be used. 

703 

704 Use `functools.partial` to pass keyword arguments to ``func``. 

705 

706 .. versionadded:: 5.0 

707 """ 

708 if executor is None: 

709 if not hasattr(self, "_executor"): 

710 from tornado.process import cpu_count 

711 

712 self._executor = concurrent.futures.ThreadPoolExecutor( 

713 max_workers=(cpu_count() * 5) 

714 ) # type: concurrent.futures.Executor 

715 executor = self._executor 

716 c_future = executor.submit(func, *args) 

717 # Concurrent Futures are not usable with await. Wrap this in a 

718 # Tornado Future instead, using self.add_future for thread-safety. 

719 t_future = Future() # type: Future[_T] 

720 self.add_future(c_future, lambda f: chain_future(f, t_future)) 

721 return t_future 

722 

723 def set_default_executor(self, executor: concurrent.futures.Executor) -> None: 

724 """Sets the default executor to use with :meth:`run_in_executor`. 

725 

726 .. versionadded:: 5.0 

727 """ 

728 self._executor = executor 

729 

730 def _run_callback(self, callback: Callable[[], Any]) -> None: 

731 """Runs a callback with error handling. 

732 

733 .. versionchanged:: 6.0 

734 

735 CancelledErrors are no longer logged. 

736 """ 

737 try: 

738 ret = callback() 

739 if ret is not None: 

740 from tornado import gen 

741 

742 # Functions that return Futures typically swallow all 

743 # exceptions and store them in the Future. If a Future 

744 # makes it out to the IOLoop, ensure its exception (if any) 

745 # gets logged too. 

746 try: 

747 ret = gen.convert_yielded(ret) 

748 except gen.BadYieldError: 

749 # It's not unusual for add_callback to be used with 

750 # methods returning a non-None and non-yieldable 

751 # result, which should just be ignored. 

752 pass 

753 else: 

754 self.add_future(ret, self._discard_future_result) 

755 except asyncio.CancelledError: 

756 pass 

757 except Exception: 

758 app_log.error("Exception in callback %r", callback, exc_info=True) 

759 

760 def _discard_future_result(self, future: Future) -> None: 

761 """Avoid unhandled-exception warnings from spawned coroutines.""" 

762 future.result() 

763 

764 def split_fd( 

765 self, fd: Union[int, _Selectable] 

766 ) -> Tuple[int, Union[int, _Selectable]]: 

767 # """Returns an (fd, obj) pair from an ``fd`` parameter. 

768 

769 # We accept both raw file descriptors and file-like objects as 

770 # input to `add_handler` and related methods. When a file-like 

771 # object is passed, we must retain the object itself so we can 

772 # close it correctly when the `IOLoop` shuts down, but the 

773 # poller interfaces favor file descriptors (they will accept 

774 # file-like objects and call ``fileno()`` for you, but they 

775 # always return the descriptor itself). 

776 

777 # This method is provided for use by `IOLoop` subclasses and should 

778 # not generally be used by application code. 

779 

780 # .. versionadded:: 4.0 

781 # """ 

782 if isinstance(fd, int): 

783 return fd, fd 

784 return fd.fileno(), fd 

785 

786 def close_fd(self, fd: Union[int, _Selectable]) -> None: 

787 # """Utility method to close an ``fd``. 

788 

789 # If ``fd`` is a file-like object, we close it directly; otherwise 

790 # we use `os.close`. 

791 

792 # This method is provided for use by `IOLoop` subclasses (in 

793 # implementations of ``IOLoop.close(all_fds=True)`` and should 

794 # not generally be used by application code. 

795 

796 # .. versionadded:: 4.0 

797 # """ 

798 try: 

799 if isinstance(fd, int): 

800 os.close(fd) 

801 else: 

802 fd.close() 

803 except OSError: 

804 pass 

805 

806 

807class _Timeout(object): 

808 """An IOLoop timeout, a UNIX timestamp and a callback""" 

809 

810 # Reduce memory overhead when there are lots of pending callbacks 

811 __slots__ = ["deadline", "callback", "tdeadline"] 

812 

813 def __init__( 

814 self, deadline: float, callback: Callable[[], None], io_loop: IOLoop 

815 ) -> None: 

816 if not isinstance(deadline, numbers.Real): 

817 raise TypeError("Unsupported deadline %r" % deadline) 

818 self.deadline = deadline 

819 self.callback = callback 

820 self.tdeadline = ( 

821 deadline, 

822 next(io_loop._timeout_counter), 

823 ) # type: Tuple[float, int] 

824 

825 # Comparison methods to sort by deadline, with object id as a tiebreaker 

826 # to guarantee a consistent ordering. The heapq module uses __le__ 

827 # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons 

828 # use __lt__). 

829 def __lt__(self, other: "_Timeout") -> bool: 

830 return self.tdeadline < other.tdeadline 

831 

832 def __le__(self, other: "_Timeout") -> bool: 

833 return self.tdeadline <= other.tdeadline 

834 

835 

836class PeriodicCallback(object): 

837 """Schedules the given callback to be called periodically. 

838 

839 The callback is called every ``callback_time`` milliseconds when 

840 ``callback_time`` is a float. Note that the timeout is given in 

841 milliseconds, while most other time-related functions in Tornado use 

842 seconds. ``callback_time`` may alternatively be given as a 

843 `datetime.timedelta` object. 

844 

845 If ``jitter`` is specified, each callback time will be randomly selected 

846 within a window of ``jitter * callback_time`` milliseconds. 

847 Jitter can be used to reduce alignment of events with similar periods. 

848 A jitter of 0.1 means allowing a 10% variation in callback time. 

849 The window is centered on ``callback_time`` so the total number of calls 

850 within a given interval should not be significantly affected by adding 

851 jitter. 

852 

853 If the callback runs for longer than ``callback_time`` milliseconds, 

854 subsequent invocations will be skipped to get back on schedule. 

855 

856 `start` must be called after the `PeriodicCallback` is created. 

857 

858 .. versionchanged:: 5.0 

859 The ``io_loop`` argument (deprecated since version 4.1) has been removed. 

860 

861 .. versionchanged:: 5.1 

862 The ``jitter`` argument is added. 

863 

864 .. versionchanged:: 6.2 

865 If the ``callback`` argument is a coroutine, and a callback runs for 

866 longer than ``callback_time``, subsequent invocations will be skipped. 

867 Previously this was only true for regular functions, not coroutines, 

868 which were "fire-and-forget" for `PeriodicCallback`. 

869 

870 The ``callback_time`` argument now accepts `datetime.timedelta` objects, 

871 in addition to the previous numeric milliseconds. 

872 """ 

873 

874 def __init__( 

875 self, 

876 callback: Callable[[], Optional[Awaitable]], 

877 callback_time: Union[datetime.timedelta, float], 

878 jitter: float = 0, 

879 ) -> None: 

880 self.callback = callback 

881 if isinstance(callback_time, datetime.timedelta): 

882 self.callback_time = callback_time / datetime.timedelta(milliseconds=1) 

883 else: 

884 if callback_time <= 0: 

885 raise ValueError("Periodic callback must have a positive callback_time") 

886 self.callback_time = callback_time 

887 self.jitter = jitter 

888 self._running = False 

889 self._timeout = None # type: object 

890 

891 def start(self) -> None: 

892 """Starts the timer.""" 

893 # Looking up the IOLoop here allows to first instantiate the 

894 # PeriodicCallback in another thread, then start it using 

895 # IOLoop.add_callback(). 

896 self.io_loop = IOLoop.current() 

897 self._running = True 

898 self._next_timeout = self.io_loop.time() 

899 self._schedule_next() 

900 

901 def stop(self) -> None: 

902 """Stops the timer.""" 

903 self._running = False 

904 if self._timeout is not None: 

905 self.io_loop.remove_timeout(self._timeout) 

906 self._timeout = None 

907 

908 def is_running(self) -> bool: 

909 """Returns ``True`` if this `.PeriodicCallback` has been started. 

910 

911 .. versionadded:: 4.1 

912 """ 

913 return self._running 

914 

915 async def _run(self) -> None: 

916 if not self._running: 

917 return 

918 try: 

919 val = self.callback() 

920 if val is not None and isawaitable(val): 

921 await val 

922 except Exception: 

923 app_log.error("Exception in callback %r", self.callback, exc_info=True) 

924 finally: 

925 self._schedule_next() 

926 

927 def _schedule_next(self) -> None: 

928 if self._running: 

929 self._update_next(self.io_loop.time()) 

930 self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run) 

931 

932 def _update_next(self, current_time: float) -> None: 

933 callback_time_sec = self.callback_time / 1000.0 

934 if self.jitter: 

935 # apply jitter fraction 

936 callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5)) 

937 if self._next_timeout <= current_time: 

938 # The period should be measured from the start of one call 

939 # to the start of the next. If one call takes too long, 

940 # skip cycles to get back to a multiple of the original 

941 # schedule. 

942 self._next_timeout += ( 

943 math.floor((current_time - self._next_timeout) / callback_time_sec) + 1 

944 ) * callback_time_sec 

945 else: 

946 # If the clock moved backwards, ensure we advance the next 

947 # timeout instead of recomputing the same value again. 

948 # This may result in long gaps between callbacks if the 

949 # clock jumps backwards by a lot, but the far more common 

950 # scenario is a small NTP adjustment that should just be 

951 # ignored. 

952 # 

953 # Note that on some systems if time.time() runs slower 

954 # than time.monotonic() (most common on windows), we 

955 # effectively experience a small backwards time jump on 

956 # every iteration because PeriodicCallback uses 

957 # time.time() while asyncio schedules callbacks using 

958 # time.monotonic(). 

959 # https://github.com/tornadoweb/tornado/issues/2333 

960 self._next_timeout += callback_time_sec