Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/prompt_toolkit/application/application.py: 19%

581 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextvars 

5import os 

6import re 

7import signal 

8import sys 

9import threading 

10import time 

11from asyncio import ( 

12 AbstractEventLoop, 

13 Future, 

14 Task, 

15 ensure_future, 

16 get_running_loop, 

17 sleep, 

18) 

19from contextlib import ExitStack, contextmanager 

20from subprocess import Popen 

21from traceback import format_tb 

22from typing import ( 

23 Any, 

24 Callable, 

25 Coroutine, 

26 Generator, 

27 Generic, 

28 Hashable, 

29 Iterable, 

30 Iterator, 

31 TypeVar, 

32 cast, 

33 overload, 

34) 

35 

36from prompt_toolkit.buffer import Buffer 

37from prompt_toolkit.cache import SimpleCache 

38from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard 

39from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config 

40from prompt_toolkit.data_structures import Size 

41from prompt_toolkit.enums import EditingMode 

42from prompt_toolkit.eventloop import ( 

43 get_traceback_from_context, 

44 run_in_executor_with_context, 

45) 

46from prompt_toolkit.eventloop.utils import call_soon_threadsafe 

47from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter 

48from prompt_toolkit.formatted_text import AnyFormattedText 

49from prompt_toolkit.input.base import Input 

50from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead 

51from prompt_toolkit.key_binding.bindings.page_navigation import ( 

52 load_page_navigation_bindings, 

53) 

54from prompt_toolkit.key_binding.defaults import load_key_bindings 

55from prompt_toolkit.key_binding.emacs_state import EmacsState 

56from prompt_toolkit.key_binding.key_bindings import ( 

57 Binding, 

58 ConditionalKeyBindings, 

59 GlobalOnlyKeyBindings, 

60 KeyBindings, 

61 KeyBindingsBase, 

62 KeysTuple, 

63 merge_key_bindings, 

64) 

65from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor 

66from prompt_toolkit.key_binding.vi_state import ViState 

67from prompt_toolkit.keys import Keys 

68from prompt_toolkit.layout.containers import Container, Window 

69from prompt_toolkit.layout.controls import BufferControl, UIControl 

70from prompt_toolkit.layout.dummy import create_dummy_layout 

71from prompt_toolkit.layout.layout import Layout, walk 

72from prompt_toolkit.output import ColorDepth, Output 

73from prompt_toolkit.renderer import Renderer, print_formatted_text 

74from prompt_toolkit.search import SearchState 

75from prompt_toolkit.styles import ( 

76 BaseStyle, 

77 DummyStyle, 

78 DummyStyleTransformation, 

79 DynamicStyle, 

80 StyleTransformation, 

81 default_pygments_style, 

82 default_ui_style, 

83 merge_styles, 

84) 

85from prompt_toolkit.utils import Event, in_main_thread 

86 

87from .current import get_app_session, set_app 

88from .run_in_terminal import in_terminal, run_in_terminal 

89 

90__all__ = [ 

91 "Application", 

92] 

93 

94 

95E = KeyPressEvent 

96_AppResult = TypeVar("_AppResult") 

97ApplicationEventHandler = Callable[["Application[_AppResult]"], None] 

98 

99_SIGWINCH = getattr(signal, "SIGWINCH", None) 

100_SIGTSTP = getattr(signal, "SIGTSTP", None) 

101 

102 

103class Application(Generic[_AppResult]): 

104 """ 

105 The main Application class! 

106 This glues everything together. 

107 

108 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance. 

109 :param key_bindings: 

110 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for 

111 the key bindings. 

112 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use. 

113 :param full_screen: When True, run the application on the alternate screen buffer. 

114 :param color_depth: Any :class:`~.ColorDepth` value, a callable that 

115 returns a :class:`~.ColorDepth` or `None` for default. 

116 :param erase_when_done: (bool) Clear the application output when it finishes. 

117 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches 

118 forward and a '?' searches backward. In Readline mode, this is usually 

119 reversed. 

120 :param min_redraw_interval: Number of seconds to wait between redraws. Use 

121 this for applications where `invalidate` is called a lot. This could cause 

122 a lot of terminal output, which some terminals are not able to process. 

123 

124 `None` means that every `invalidate` will be scheduled right away 

125 (which is usually fine). 

126 

127 When one `invalidate` is called, but a scheduled redraw of a previous 

128 `invalidate` call has not been executed yet, nothing will happen in any 

129 case. 

130 

131 :param max_render_postpone_time: When there is high CPU (a lot of other 

132 scheduled calls), postpone the rendering max x seconds. '0' means: 

133 don't postpone. '.5' means: try to draw at least twice a second. 

134 

135 :param refresh_interval: Automatically invalidate the UI every so many 

136 seconds. When `None` (the default), only invalidate when `invalidate` 

137 has been called. 

138 

139 :param terminal_size_polling_interval: Poll the terminal size every so many 

140 seconds. Useful if the applications runs in a thread other then then 

141 main thread where SIGWINCH can't be handled, or on Windows. 

142 

143 Filters: 

144 

145 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or 

146 boolean). When True, enable mouse support. 

147 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean. 

148 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`. 

149 

150 :param enable_page_navigation_bindings: When `True`, enable the page 

151 navigation key bindings. These include both Emacs and Vi bindings like 

152 page-up, page-down and so on to scroll through pages. Mostly useful for 

153 creating an editor or other full screen applications. Probably, you 

154 don't want this for the implementation of a REPL. By default, this is 

155 enabled if `full_screen` is set. 

156 

157 Callbacks (all of these should accept an 

158 :class:`~prompt_toolkit.application.Application` object as input.) 

159 

160 :param on_reset: Called during reset. 

161 :param on_invalidate: Called when the UI has been invalidated. 

162 :param before_render: Called right before rendering. 

163 :param after_render: Called right after rendering. 

164 

165 I/O: 

166 (Note that the preferred way to change the input/output is by creating an 

167 `AppSession` with the required input/output objects. If you need multiple 

168 applications running at the same time, you have to create a separate 

169 `AppSession` using a `with create_app_session():` block. 

170 

171 :param input: :class:`~prompt_toolkit.input.Input` instance. 

172 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably 

173 Vt100_Output or Win32Output.) 

174 

175 Usage: 

176 

177 app = Application(...) 

178 app.run() 

179 

180 # Or 

181 await app.run_async() 

182 """ 

183 

184 def __init__( 

185 self, 

186 layout: Layout | None = None, 

187 style: BaseStyle | None = None, 

188 include_default_pygments_style: FilterOrBool = True, 

189 style_transformation: StyleTransformation | None = None, 

190 key_bindings: KeyBindingsBase | None = None, 

191 clipboard: Clipboard | None = None, 

192 full_screen: bool = False, 

193 color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None, 

194 mouse_support: FilterOrBool = False, 

195 enable_page_navigation_bindings: None 

196 | (FilterOrBool) = None, # Can be None, True or False. 

197 paste_mode: FilterOrBool = False, 

198 editing_mode: EditingMode = EditingMode.EMACS, 

199 erase_when_done: bool = False, 

200 reverse_vi_search_direction: FilterOrBool = False, 

201 min_redraw_interval: float | int | None = None, 

202 max_render_postpone_time: float | int | None = 0.01, 

203 refresh_interval: float | None = None, 

204 terminal_size_polling_interval: float | None = 0.5, 

205 cursor: AnyCursorShapeConfig = None, 

206 on_reset: ApplicationEventHandler[_AppResult] | None = None, 

207 on_invalidate: ApplicationEventHandler[_AppResult] | None = None, 

208 before_render: ApplicationEventHandler[_AppResult] | None = None, 

209 after_render: ApplicationEventHandler[_AppResult] | None = None, 

210 # I/O. 

211 input: Input | None = None, 

212 output: Output | None = None, 

213 ) -> None: 

214 # If `enable_page_navigation_bindings` is not specified, enable it in 

215 # case of full screen applications only. This can be overridden by the user. 

216 if enable_page_navigation_bindings is None: 

217 enable_page_navigation_bindings = Condition(lambda: self.full_screen) 

218 

219 paste_mode = to_filter(paste_mode) 

220 mouse_support = to_filter(mouse_support) 

221 reverse_vi_search_direction = to_filter(reverse_vi_search_direction) 

222 enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings) 

223 include_default_pygments_style = to_filter(include_default_pygments_style) 

224 

225 if layout is None: 

226 layout = create_dummy_layout() 

227 

228 if style_transformation is None: 

229 style_transformation = DummyStyleTransformation() 

230 

231 self.style = style 

232 self.style_transformation = style_transformation 

233 

234 # Key bindings. 

235 self.key_bindings = key_bindings 

236 self._default_bindings = load_key_bindings() 

237 self._page_navigation_bindings = load_page_navigation_bindings() 

238 

239 self.layout = layout 

240 self.clipboard = clipboard or InMemoryClipboard() 

241 self.full_screen: bool = full_screen 

242 self._color_depth = color_depth 

243 self.mouse_support = mouse_support 

244 

245 self.paste_mode = paste_mode 

246 self.editing_mode = editing_mode 

247 self.erase_when_done = erase_when_done 

248 self.reverse_vi_search_direction = reverse_vi_search_direction 

249 self.enable_page_navigation_bindings = enable_page_navigation_bindings 

250 self.min_redraw_interval = min_redraw_interval 

251 self.max_render_postpone_time = max_render_postpone_time 

252 self.refresh_interval = refresh_interval 

253 self.terminal_size_polling_interval = terminal_size_polling_interval 

254 

255 self.cursor = to_cursor_shape_config(cursor) 

256 

257 # Events. 

258 self.on_invalidate = Event(self, on_invalidate) 

259 self.on_reset = Event(self, on_reset) 

260 self.before_render = Event(self, before_render) 

261 self.after_render = Event(self, after_render) 

262 

263 # I/O. 

264 session = get_app_session() 

265 self.output = output or session.output 

266 self.input = input or session.input 

267 

268 # List of 'extra' functions to execute before a Application.run. 

269 self.pre_run_callables: list[Callable[[], None]] = [] 

270 

271 self._is_running = False 

272 self.future: Future[_AppResult] | None = None 

273 self.loop: AbstractEventLoop | None = None 

274 self._loop_thread: threading.Thread | None = None 

275 self.context: contextvars.Context | None = None 

276 

277 #: Quoted insert. This flag is set if we go into quoted insert mode. 

278 self.quoted_insert = False 

279 

280 #: Vi state. (For Vi key bindings.) 

281 self.vi_state = ViState() 

282 self.emacs_state = EmacsState() 

283 

284 #: When to flush the input (For flushing escape keys.) This is important 

285 #: on terminals that use vt100 input. We can't distinguish the escape 

286 #: key from for instance the left-arrow key, if we don't know what follows 

287 #: after "\x1b". This little timer will consider "\x1b" to be escape if 

288 #: nothing did follow in this time span. 

289 #: This seems to work like the `ttimeoutlen` option in Vim. 

290 self.ttimeoutlen = 0.5 # Seconds. 

291 

292 #: Like Vim's `timeoutlen` option. This can be `None` or a float. For 

293 #: instance, suppose that we have a key binding AB and a second key 

294 #: binding A. If the uses presses A and then waits, we don't handle 

295 #: this binding yet (unless it was marked 'eager'), because we don't 

296 #: know what will follow. This timeout is the maximum amount of time 

297 #: that we wait until we call the handlers anyway. Pass `None` to 

298 #: disable this timeout. 

299 self.timeoutlen = 1.0 

300 

301 #: The `Renderer` instance. 

302 # Make sure that the same stdout is used, when a custom renderer has been passed. 

303 self._merged_style = self._create_merged_style(include_default_pygments_style) 

304 

305 self.renderer = Renderer( 

306 self._merged_style, 

307 self.output, 

308 full_screen=full_screen, 

309 mouse_support=mouse_support, 

310 cpr_not_supported_callback=self.cpr_not_supported_callback, 

311 ) 

312 

313 #: Render counter. This one is increased every time the UI is rendered. 

314 #: It can be used as a key for caching certain information during one 

315 #: rendering. 

316 self.render_counter = 0 

317 

318 # Invalidate flag. When 'True', a repaint has been scheduled. 

319 self._invalidated = False 

320 self._invalidate_events: list[ 

321 Event[object] 

322 ] = [] # Collection of 'invalidate' Event objects. 

323 self._last_redraw_time = 0.0 # Unix timestamp of last redraw. Used when 

324 # `min_redraw_interval` is given. 

325 

326 #: The `InputProcessor` instance. 

327 self.key_processor = KeyProcessor(_CombinedRegistry(self)) 

328 

329 # If `run_in_terminal` was called. This will point to a `Future` what will be 

330 # set at the point when the previous run finishes. 

331 self._running_in_terminal = False 

332 self._running_in_terminal_f: Future[None] | None = None 

333 

334 # Trigger initialize callback. 

335 self.reset() 

336 

337 def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle: 

338 """ 

339 Create a `Style` object that merges the default UI style, the default 

340 pygments style, and the custom user style. 

341 """ 

342 dummy_style = DummyStyle() 

343 pygments_style = default_pygments_style() 

344 

345 @DynamicStyle 

346 def conditional_pygments_style() -> BaseStyle: 

347 if include_default_pygments_style(): 

348 return pygments_style 

349 else: 

350 return dummy_style 

351 

352 return merge_styles( 

353 [ 

354 default_ui_style(), 

355 conditional_pygments_style, 

356 DynamicStyle(lambda: self.style), 

357 ] 

358 ) 

359 

360 @property 

361 def color_depth(self) -> ColorDepth: 

362 """ 

363 The active :class:`.ColorDepth`. 

364 

365 The current value is determined as follows: 

366 

367 - If a color depth was given explicitly to this application, use that 

368 value. 

369 - Otherwise, fall back to the color depth that is reported by the 

370 :class:`.Output` implementation. If the :class:`.Output` class was 

371 created using `output.defaults.create_output`, then this value is 

372 coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable. 

373 """ 

374 depth = self._color_depth 

375 

376 if callable(depth): 

377 depth = depth() 

378 

379 if depth is None: 

380 depth = self.output.get_default_color_depth() 

381 

382 return depth 

383 

384 @property 

385 def current_buffer(self) -> Buffer: 

386 """ 

387 The currently focused :class:`~.Buffer`. 

388 

389 (This returns a dummy :class:`.Buffer` when none of the actual buffers 

390 has the focus. In this case, it's really not practical to check for 

391 `None` values or catch exceptions every time.) 

392 """ 

393 return self.layout.current_buffer or Buffer( 

394 name="dummy-buffer" 

395 ) # Dummy buffer. 

396 

397 @property 

398 def current_search_state(self) -> SearchState: 

399 """ 

400 Return the current :class:`.SearchState`. (The one for the focused 

401 :class:`.BufferControl`.) 

402 """ 

403 ui_control = self.layout.current_control 

404 if isinstance(ui_control, BufferControl): 

405 return ui_control.search_state 

406 else: 

407 return SearchState() # Dummy search state. (Don't return None!) 

408 

409 def reset(self) -> None: 

410 """ 

411 Reset everything, for reading the next input. 

412 """ 

413 # Notice that we don't reset the buffers. (This happens just before 

414 # returning, and when we have multiple buffers, we clearly want the 

415 # content in the other buffers to remain unchanged between several 

416 # calls of `run`. (And the same is true for the focus stack.) 

417 

418 self.exit_style = "" 

419 

420 self._background_tasks: set[Task[None]] = set() 

421 

422 self.renderer.reset() 

423 self.key_processor.reset() 

424 self.layout.reset() 

425 self.vi_state.reset() 

426 self.emacs_state.reset() 

427 

428 # Trigger reset event. 

429 self.on_reset.fire() 

430 

431 # Make sure that we have a 'focusable' widget focused. 

432 # (The `Layout` class can't determine this.) 

433 layout = self.layout 

434 

435 if not layout.current_control.is_focusable(): 

436 for w in layout.find_all_windows(): 

437 if w.content.is_focusable(): 

438 layout.current_window = w 

439 break 

440 

441 def invalidate(self) -> None: 

442 """ 

443 Thread safe way of sending a repaint trigger to the input event loop. 

444 """ 

445 if not self._is_running: 

446 # Don't schedule a redraw if we're not running. 

447 # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail. 

448 # See: https://github.com/dbcli/mycli/issues/797 

449 return 

450 

451 # `invalidate()` called if we don't have a loop yet (not running?), or 

452 # after the event loop was closed. 

453 if self.loop is None or self.loop.is_closed(): 

454 return 

455 

456 # Never schedule a second redraw, when a previous one has not yet been 

457 # executed. (This should protect against other threads calling 

458 # 'invalidate' many times, resulting in 100% CPU.) 

459 if self._invalidated: 

460 return 

461 else: 

462 self._invalidated = True 

463 

464 # Trigger event. 

465 self.loop.call_soon_threadsafe(self.on_invalidate.fire) 

466 

467 def redraw() -> None: 

468 self._invalidated = False 

469 self._redraw() 

470 

471 def schedule_redraw() -> None: 

472 call_soon_threadsafe( 

473 redraw, max_postpone_time=self.max_render_postpone_time, loop=self.loop 

474 ) 

475 

476 if self.min_redraw_interval: 

477 # When a minimum redraw interval is set, wait minimum this amount 

478 # of time between redraws. 

479 diff = time.time() - self._last_redraw_time 

480 if diff < self.min_redraw_interval: 

481 

482 async def redraw_in_future() -> None: 

483 await sleep(cast(float, self.min_redraw_interval) - diff) 

484 schedule_redraw() 

485 

486 self.loop.call_soon_threadsafe( 

487 lambda: self.create_background_task(redraw_in_future()) 

488 ) 

489 else: 

490 schedule_redraw() 

491 else: 

492 schedule_redraw() 

493 

494 @property 

495 def invalidated(self) -> bool: 

496 "True when a redraw operation has been scheduled." 

497 return self._invalidated 

498 

499 def _redraw(self, render_as_done: bool = False) -> None: 

500 """ 

501 Render the command line again. (Not thread safe!) (From other threads, 

502 or if unsure, use :meth:`.Application.invalidate`.) 

503 

504 :param render_as_done: make sure to put the cursor after the UI. 

505 """ 

506 

507 def run_in_context() -> None: 

508 # Only draw when no sub application was started. 

509 if self._is_running and not self._running_in_terminal: 

510 if self.min_redraw_interval: 

511 self._last_redraw_time = time.time() 

512 

513 # Render 

514 self.render_counter += 1 

515 self.before_render.fire() 

516 

517 if render_as_done: 

518 if self.erase_when_done: 

519 self.renderer.erase() 

520 else: 

521 # Draw in 'done' state and reset renderer. 

522 self.renderer.render(self, self.layout, is_done=render_as_done) 

523 else: 

524 self.renderer.render(self, self.layout) 

525 

526 self.layout.update_parents_relations() 

527 

528 # Fire render event. 

529 self.after_render.fire() 

530 

531 self._update_invalidate_events() 

532 

533 # NOTE: We want to make sure this Application is the active one. The 

534 # invalidate function is often called from a context where this 

535 # application is not the active one. (Like the 

536 # `PromptSession._auto_refresh_context`). 

537 # We copy the context in case the context was already active, to 

538 # prevent RuntimeErrors. (The rendering is not supposed to change 

539 # any context variables.) 

540 if self.context is not None: 

541 self.context.copy().run(run_in_context) 

542 

543 def _start_auto_refresh_task(self) -> None: 

544 """ 

545 Start a while/true loop in the background for automatic invalidation of 

546 the UI. 

547 """ 

548 if self.refresh_interval is not None and self.refresh_interval != 0: 

549 

550 async def auto_refresh(refresh_interval: float) -> None: 

551 while True: 

552 await sleep(refresh_interval) 

553 self.invalidate() 

554 

555 self.create_background_task(auto_refresh(self.refresh_interval)) 

556 

557 def _update_invalidate_events(self) -> None: 

558 """ 

559 Make sure to attach 'invalidate' handlers to all invalidate events in 

560 the UI. 

561 """ 

562 # Remove all the original event handlers. (Components can be removed 

563 # from the UI.) 

564 for ev in self._invalidate_events: 

565 ev -= self._invalidate_handler 

566 

567 # Gather all new events. 

568 # (All controls are able to invalidate themselves.) 

569 def gather_events() -> Iterable[Event[object]]: 

570 for c in self.layout.find_all_controls(): 

571 yield from c.get_invalidate_events() 

572 

573 self._invalidate_events = list(gather_events()) 

574 

575 for ev in self._invalidate_events: 

576 ev += self._invalidate_handler 

577 

578 def _invalidate_handler(self, sender: object) -> None: 

579 """ 

580 Handler for invalidate events coming from UIControls. 

581 

582 (This handles the difference in signature between event handler and 

583 `self.invalidate`. It also needs to be a method -not a nested 

584 function-, so that we can remove it again .) 

585 """ 

586 self.invalidate() 

587 

588 def _on_resize(self) -> None: 

589 """ 

590 When the window size changes, we erase the current output and request 

591 again the cursor position. When the CPR answer arrives, the output is 

592 drawn again. 

593 """ 

594 # Erase, request position (when cursor is at the start position) 

595 # and redraw again. -- The order is important. 

596 self.renderer.erase(leave_alternate_screen=False) 

597 self._request_absolute_cursor_position() 

598 self._redraw() 

599 

600 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None: 

601 """ 

602 Called during `run`. 

603 

604 `self.future` should be set to the new future at the point where this 

605 is called in order to avoid data races. `pre_run` can be used to set a 

606 `threading.Event` to synchronize with UI termination code, running in 

607 another thread that would call `Application.exit`. (See the progress 

608 bar code for an example.) 

609 """ 

610 if pre_run: 

611 pre_run() 

612 

613 # Process registered "pre_run_callables" and clear list. 

614 for c in self.pre_run_callables: 

615 c() 

616 del self.pre_run_callables[:] 

617 

618 async def run_async( 

619 self, 

620 pre_run: Callable[[], None] | None = None, 

621 set_exception_handler: bool = True, 

622 handle_sigint: bool = True, 

623 slow_callback_duration: float = 0.5, 

624 ) -> _AppResult: 

625 """ 

626 Run the prompt_toolkit :class:`~prompt_toolkit.application.Application` 

627 until :meth:`~prompt_toolkit.application.Application.exit` has been 

628 called. Return the value that was passed to 

629 :meth:`~prompt_toolkit.application.Application.exit`. 

630 

631 This is the main entry point for a prompt_toolkit 

632 :class:`~prompt_toolkit.application.Application` and usually the only 

633 place where the event loop is actually running. 

634 

635 :param pre_run: Optional callable, which is called right after the 

636 "reset" of the application. 

637 :param set_exception_handler: When set, in case of an exception, go out 

638 of the alternate screen and hide the application, display the 

639 exception, and wait for the user to press ENTER. 

640 :param handle_sigint: Handle SIGINT signal if possible. This will call 

641 the `<sigint>` key binding when a SIGINT is received. (This only 

642 works in the main thread.) 

643 :param slow_callback_duration: Display warnings if code scheduled in 

644 the asyncio event loop takes more time than this. The asyncio 

645 default of `0.1` is sometimes not sufficient on a slow system, 

646 because exceptionally, the drawing of the app, which happens in the 

647 event loop, can take a bit longer from time to time. 

648 """ 

649 assert not self._is_running, "Application is already running." 

650 

651 if not in_main_thread() or sys.platform == "win32": 

652 # Handling signals in other threads is not supported. 

653 # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises 

654 # `NotImplementedError`. 

655 # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553 

656 handle_sigint = False 

657 

658 async def _run_async(f: "asyncio.Future[_AppResult]") -> _AppResult: 

659 context = contextvars.copy_context() 

660 self.context = context 

661 

662 # Counter for cancelling 'flush' timeouts. Every time when a key is 

663 # pressed, we start a 'flush' timer for flushing our escape key. But 

664 # when any subsequent input is received, a new timer is started and 

665 # the current timer will be ignored. 

666 flush_task: asyncio.Task[None] | None = None 

667 

668 # Reset. 

669 # (`self.future` needs to be set when `pre_run` is called.) 

670 self.reset() 

671 self._pre_run(pre_run) 

672 

673 # Feed type ahead input first. 

674 self.key_processor.feed_multiple(get_typeahead(self.input)) 

675 self.key_processor.process_keys() 

676 

677 def read_from_input() -> None: 

678 nonlocal flush_task 

679 

680 # Ignore when we aren't running anymore. This callback will 

681 # removed from the loop next time. (It could be that it was 

682 # still in the 'tasks' list of the loop.) 

683 # Except: if we need to process incoming CPRs. 

684 if not self._is_running and not self.renderer.waiting_for_cpr: 

685 return 

686 

687 # Get keys from the input object. 

688 keys = self.input.read_keys() 

689 

690 # Feed to key processor. 

691 self.key_processor.feed_multiple(keys) 

692 self.key_processor.process_keys() 

693 

694 # Quit when the input stream was closed. 

695 if self.input.closed: 

696 if not f.done(): 

697 f.set_exception(EOFError) 

698 else: 

699 # Automatically flush keys. 

700 if flush_task: 

701 flush_task.cancel() 

702 flush_task = self.create_background_task(auto_flush_input()) 

703 

704 def read_from_input_in_context() -> None: 

705 # Ensure that key bindings callbacks are always executed in the 

706 # current context. This is important when key bindings are 

707 # accessing contextvars. (These callbacks are currently being 

708 # called from a different context. Underneath, 

709 # `loop.add_reader` is used to register the stdin FD.) 

710 # (We copy the context to avoid a `RuntimeError` in case the 

711 # context is already active.) 

712 context.copy().run(read_from_input) 

713 

714 async def auto_flush_input() -> None: 

715 # Flush input after timeout. 

716 # (Used for flushing the enter key.) 

717 # This sleep can be cancelled, in that case we won't flush yet. 

718 await sleep(self.ttimeoutlen) 

719 flush_input() 

720 

721 def flush_input() -> None: 

722 if not self.is_done: 

723 # Get keys, and feed to key processor. 

724 keys = self.input.flush_keys() 

725 self.key_processor.feed_multiple(keys) 

726 self.key_processor.process_keys() 

727 

728 if self.input.closed: 

729 f.set_exception(EOFError) 

730 

731 # Enter raw mode, attach input and attach WINCH event handler. 

732 with self.input.raw_mode(), self.input.attach( 

733 read_from_input_in_context 

734 ), attach_winch_signal_handler(self._on_resize): 

735 # Draw UI. 

736 self._request_absolute_cursor_position() 

737 self._redraw() 

738 self._start_auto_refresh_task() 

739 

740 self.create_background_task(self._poll_output_size()) 

741 

742 # Wait for UI to finish. 

743 try: 

744 result = await f 

745 finally: 

746 # In any case, when the application finishes. 

747 # (Successful, or because of an error.) 

748 try: 

749 self._redraw(render_as_done=True) 

750 finally: 

751 # _redraw has a good chance to fail if it calls widgets 

752 # with bad code. Make sure to reset the renderer 

753 # anyway. 

754 self.renderer.reset() 

755 

756 # Unset `is_running`, this ensures that possibly 

757 # scheduled draws won't paint during the following 

758 # yield. 

759 self._is_running = False 

760 

761 # Detach event handlers for invalidate events. 

762 # (Important when a UIControl is embedded in multiple 

763 # applications, like ptterm in pymux. An invalidate 

764 # should not trigger a repaint in terminated 

765 # applications.) 

766 for ev in self._invalidate_events: 

767 ev -= self._invalidate_handler 

768 self._invalidate_events = [] 

769 

770 # Wait for CPR responses. 

771 if self.output.responds_to_cpr: 

772 await self.renderer.wait_for_cpr_responses() 

773 

774 # Wait for the run-in-terminals to terminate. 

775 previous_run_in_terminal_f = self._running_in_terminal_f 

776 

777 if previous_run_in_terminal_f: 

778 await previous_run_in_terminal_f 

779 

780 # Store unprocessed input as typeahead for next time. 

781 store_typeahead(self.input, self.key_processor.empty_queue()) 

782 

783 return result 

784 

785 @contextmanager 

786 def set_loop() -> Iterator[AbstractEventLoop]: 

787 loop = get_running_loop() 

788 self.loop = loop 

789 self._loop_thread = threading.current_thread() 

790 

791 try: 

792 yield loop 

793 finally: 

794 self.loop = None 

795 self._loop_thread = None 

796 

797 @contextmanager 

798 def set_is_running() -> Iterator[None]: 

799 self._is_running = True 

800 try: 

801 yield 

802 finally: 

803 self._is_running = False 

804 

805 @contextmanager 

806 def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]: 

807 if handle_sigint: 

808 loop.add_signal_handler( 

809 signal.SIGINT, 

810 lambda *_: loop.call_soon_threadsafe( 

811 self.key_processor.send_sigint 

812 ), 

813 ) 

814 try: 

815 yield 

816 finally: 

817 loop.remove_signal_handler(signal.SIGINT) 

818 else: 

819 yield 

820 

821 @contextmanager 

822 def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]: 

823 if set_exception_handler: 

824 previous_exc_handler = loop.get_exception_handler() 

825 loop.set_exception_handler(self._handle_exception) 

826 try: 

827 yield 

828 finally: 

829 loop.set_exception_handler(previous_exc_handler) 

830 

831 else: 

832 yield 

833 

834 @contextmanager 

835 def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]: 

836 # Set slow_callback_duration. 

837 original_slow_callback_duration = loop.slow_callback_duration 

838 loop.slow_callback_duration = slow_callback_duration 

839 try: 

840 yield 

841 finally: 

842 # Reset slow_callback_duration. 

843 loop.slow_callback_duration = original_slow_callback_duration 

844 

845 @contextmanager 

846 def create_future( 

847 loop: AbstractEventLoop, 

848 ) -> Iterator[asyncio.Future[_AppResult]]: 

849 f = loop.create_future() 

850 self.future = f # XXX: make sure to set this before calling '_redraw'. 

851 

852 try: 

853 yield f 

854 finally: 

855 # Also remove the Future again. (This brings the 

856 # application back to its initial state, where it also 

857 # doesn't have a Future.) 

858 self.future = None 

859 

860 with ExitStack() as stack: 

861 stack.enter_context(set_is_running()) 

862 

863 # Make sure to set `_invalidated` to `False` to begin with, 

864 # otherwise we're not going to paint anything. This can happen if 

865 # this application had run before on a different event loop, and a 

866 # paint was scheduled using `call_soon_threadsafe` with 

867 # `max_postpone_time`. 

868 self._invalidated = False 

869 

870 loop = stack.enter_context(set_loop()) 

871 

872 stack.enter_context(set_handle_sigint(loop)) 

873 stack.enter_context(set_exception_handler_ctx(loop)) 

874 stack.enter_context(set_callback_duration(loop)) 

875 stack.enter_context(set_app(self)) 

876 stack.enter_context(self._enable_breakpointhook()) 

877 

878 f = stack.enter_context(create_future(loop)) 

879 

880 try: 

881 return await _run_async(f) 

882 finally: 

883 # Wait for the background tasks to be done. This needs to 

884 # go in the finally! If `_run_async` raises 

885 # `KeyboardInterrupt`, we still want to wait for the 

886 # background tasks. 

887 await self.cancel_and_wait_for_background_tasks() 

888 

889 # The `ExitStack` above is defined in typeshed in a way that it can 

890 # swallow exceptions. Without next line, mypy would think that there's 

891 # a possibility we don't return here. See: 

892 # https://github.com/python/mypy/issues/7726 

893 assert False, "unreachable" 

894 

895 def run( 

896 self, 

897 pre_run: Callable[[], None] | None = None, 

898 set_exception_handler: bool = True, 

899 handle_sigint: bool = True, 

900 in_thread: bool = False, 

901 ) -> _AppResult: 

902 """ 

903 A blocking 'run' call that waits until the UI is finished. 

904 

905 This will start the current asyncio event loop. If no loop is set for 

906 the current thread, then it will create a new loop. If a new loop was 

907 created, this won't close the new loop (if `in_thread=False`). 

908 

909 :param pre_run: Optional callable, which is called right after the 

910 "reset" of the application. 

911 :param set_exception_handler: When set, in case of an exception, go out 

912 of the alternate screen and hide the application, display the 

913 exception, and wait for the user to press ENTER. 

914 :param in_thread: When true, run the application in a background 

915 thread, and block the current thread until the application 

916 terminates. This is useful if we need to be sure the application 

917 won't use the current event loop (asyncio does not support nested 

918 event loops). A new event loop will be created in this background 

919 thread, and that loop will also be closed when the background 

920 thread terminates. When this is used, it's especially important to 

921 make sure that all asyncio background tasks are managed through 

922 `get_appp().create_background_task()`, so that unfinished tasks are 

923 properly cancelled before the event loop is closed. This is used 

924 for instance in ptpython. 

925 :param handle_sigint: Handle SIGINT signal. Call the key binding for 

926 `Keys.SIGINT`. (This only works in the main thread.) 

927 """ 

928 if in_thread: 

929 result: _AppResult 

930 exception: BaseException | None = None 

931 

932 def run_in_thread() -> None: 

933 nonlocal result, exception 

934 try: 

935 result = self.run( 

936 pre_run=pre_run, 

937 set_exception_handler=set_exception_handler, 

938 # Signal handling only works in the main thread. 

939 handle_sigint=False, 

940 ) 

941 except BaseException as e: 

942 exception = e 

943 

944 thread = threading.Thread(target=run_in_thread) 

945 thread.start() 

946 thread.join() 

947 

948 if exception is not None: 

949 raise exception 

950 return result 

951 

952 coro = self.run_async( 

953 pre_run=pre_run, 

954 set_exception_handler=set_exception_handler, 

955 handle_sigint=handle_sigint, 

956 ) 

957 try: 

958 # See whether a loop was installed already. If so, use that. That's 

959 # required for the input hooks to work, they are installed using 

960 # `set_event_loop`. 

961 loop = asyncio.get_event_loop() 

962 except RuntimeError: 

963 # No loop installed. Run like usual. 

964 return asyncio.run(coro) 

965 else: 

966 # Use existing loop. 

967 return loop.run_until_complete(coro) 

968 

969 def _handle_exception( 

970 self, loop: AbstractEventLoop, context: dict[str, Any] 

971 ) -> None: 

972 """ 

973 Handler for event loop exceptions. 

974 This will print the exception, using run_in_terminal. 

975 """ 

976 # For Python 2: we have to get traceback at this point, because 

977 # we're still in the 'except:' block of the event loop where the 

978 # traceback is still available. Moving this code in the 

979 # 'print_exception' coroutine will loose the exception. 

980 tb = get_traceback_from_context(context) 

981 formatted_tb = "".join(format_tb(tb)) 

982 

983 async def in_term() -> None: 

984 async with in_terminal(): 

985 # Print output. Similar to 'loop.default_exception_handler', 

986 # but don't use logger. (This works better on Python 2.) 

987 print("\nUnhandled exception in event loop:") 

988 print(formatted_tb) 

989 print("Exception {}".format(context.get("exception"))) 

990 

991 await _do_wait_for_enter("Press ENTER to continue...") 

992 

993 ensure_future(in_term()) 

994 

995 @contextmanager 

996 def _enable_breakpointhook(self) -> Generator[None, None, None]: 

997 """ 

998 Install our custom breakpointhook for the duration of this context 

999 manager. (We will only install the hook if no other custom hook was 

1000 set.) 

1001 """ 

1002 if sys.version_info >= (3, 7) and sys.breakpointhook == sys.__breakpointhook__: 

1003 sys.breakpointhook = self._breakpointhook 

1004 

1005 try: 

1006 yield 

1007 finally: 

1008 sys.breakpointhook = sys.__breakpointhook__ 

1009 else: 

1010 yield 

1011 

1012 def _breakpointhook(self, *a: object, **kw: object) -> None: 

1013 """ 

1014 Breakpointhook which uses PDB, but ensures that the application is 

1015 hidden and input echoing is restored during each debugger dispatch. 

1016 

1017 This can be called from any thread. In any case, the application's 

1018 event loop will be blocked while the PDB input is displayed. The event 

1019 will continue after leaving the debugger. 

1020 """ 

1021 app = self 

1022 # Inline import on purpose. We don't want to import pdb, if not needed. 

1023 import pdb 

1024 from types import FrameType 

1025 

1026 TraceDispatch = Callable[[FrameType, str, Any], Any] 

1027 

1028 @contextmanager 

1029 def hide_app_from_eventloop_thread() -> Generator[None, None, None]: 

1030 """Stop application if `__breakpointhook__` is called from within 

1031 the App's event loop.""" 

1032 # Hide application. 

1033 app.renderer.erase() 

1034 

1035 # Detach input and dispatch to debugger. 

1036 with app.input.detach(): 

1037 with app.input.cooked_mode(): 

1038 yield 

1039 

1040 # Note: we don't render the application again here, because 

1041 # there's a good chance that there's a breakpoint on the next 

1042 # line. This paint/erase cycle would move the PDB prompt back 

1043 # to the middle of the screen. 

1044 

1045 @contextmanager 

1046 def hide_app_from_other_thread() -> Generator[None, None, None]: 

1047 """Stop application if `__breakpointhook__` is called from a 

1048 thread other than the App's event loop.""" 

1049 ready = threading.Event() 

1050 done = threading.Event() 

1051 

1052 async def in_loop() -> None: 

1053 # from .run_in_terminal import in_terminal 

1054 # async with in_terminal(): 

1055 # ready.set() 

1056 # await asyncio.get_running_loop().run_in_executor(None, done.wait) 

1057 # return 

1058 

1059 # Hide application. 

1060 app.renderer.erase() 

1061 

1062 # Detach input and dispatch to debugger. 

1063 with app.input.detach(): 

1064 with app.input.cooked_mode(): 

1065 ready.set() 

1066 # Here we block the App's event loop thread until the 

1067 # debugger resumes. We could have used `with 

1068 # run_in_terminal.in_terminal():` like the commented 

1069 # code above, but it seems to work better if we 

1070 # completely stop the main event loop while debugging. 

1071 done.wait() 

1072 

1073 self.create_background_task(in_loop()) 

1074 ready.wait() 

1075 try: 

1076 yield 

1077 finally: 

1078 done.set() 

1079 

1080 class CustomPdb(pdb.Pdb): 

1081 def trace_dispatch( 

1082 self, frame: FrameType, event: str, arg: Any 

1083 ) -> TraceDispatch: 

1084 if app._loop_thread is None: 

1085 return super().trace_dispatch(frame, event, arg) 

1086 

1087 if app._loop_thread == threading.current_thread(): 

1088 with hide_app_from_eventloop_thread(): 

1089 return super().trace_dispatch(frame, event, arg) 

1090 

1091 with hide_app_from_other_thread(): 

1092 return super().trace_dispatch(frame, event, arg) 

1093 

1094 frame = sys._getframe().f_back 

1095 CustomPdb(stdout=sys.__stdout__).set_trace(frame) 

1096 

1097 def create_background_task( 

1098 self, coroutine: Coroutine[Any, Any, None] 

1099 ) -> asyncio.Task[None]: 

1100 """ 

1101 Start a background task (coroutine) for the running application. When 

1102 the `Application` terminates, unfinished background tasks will be 

1103 cancelled. 

1104 

1105 Given that we still support Python versions before 3.11, we can't use 

1106 task groups (and exception groups), because of that, these background 

1107 tasks are not allowed to raise exceptions. If they do, we'll call the 

1108 default exception handler from the event loop. 

1109 

1110 If at some point, we have Python 3.11 as the minimum supported Python 

1111 version, then we can use a `TaskGroup` (with the lifetime of 

1112 `Application.run_async()`, and run run the background tasks in there. 

1113 

1114 This is not threadsafe. 

1115 """ 

1116 loop = self.loop or get_running_loop() 

1117 task: asyncio.Task[None] = loop.create_task(coroutine) 

1118 self._background_tasks.add(task) 

1119 

1120 task.add_done_callback(self._on_background_task_done) 

1121 return task 

1122 

1123 def _on_background_task_done(self, task: asyncio.Task[None]) -> None: 

1124 """ 

1125 Called when a background task completes. Remove it from 

1126 `_background_tasks`, and handle exceptions if any. 

1127 """ 

1128 self._background_tasks.discard(task) 

1129 

1130 if task.cancelled(): 

1131 return 

1132 

1133 exc = task.exception() 

1134 if exc is not None: 

1135 get_running_loop().call_exception_handler( 

1136 { 

1137 "message": f"prompt_toolkit.Application background task {task!r} " 

1138 "raised an unexpected exception.", 

1139 "exception": exc, 

1140 "task": task, 

1141 } 

1142 ) 

1143 

1144 async def cancel_and_wait_for_background_tasks(self) -> None: 

1145 """ 

1146 Cancel all background tasks, and wait for the cancellation to complete. 

1147 If any of the background tasks raised an exception, this will also 

1148 propagate the exception. 

1149 

1150 (If we had nurseries like Trio, this would be the `__aexit__` of a 

1151 nursery.) 

1152 """ 

1153 for task in self._background_tasks: 

1154 task.cancel() 

1155 

1156 # Wait until the cancellation of the background tasks completes. 

1157 # `asyncio.wait()` does not propagate exceptions raised within any of 

1158 # these tasks, which is what we want. Otherwise, we can't distinguish 

1159 # between a `CancelledError` raised in this task because it got 

1160 # cancelled, and a `CancelledError` raised on this `await` checkpoint, 

1161 # because *we* got cancelled during the teardown of the application. 

1162 # (If we get cancelled here, then it's important to not suppress the 

1163 # `CancelledError`, and have it propagate.) 

1164 # NOTE: Currently, if we get cancelled at this point then we can't wait 

1165 # for the cancellation to complete (in the future, we should be 

1166 # using anyio or Python's 3.11 TaskGroup.) 

1167 # Also, if we had exception groups, we could propagate an 

1168 # `ExceptionGroup` if something went wrong here. Right now, we 

1169 # don't propagate exceptions, but have them printed in 

1170 # `_on_background_task_done`. 

1171 if len(self._background_tasks) > 0: 

1172 await asyncio.wait( 

1173 self._background_tasks, timeout=None, return_when=asyncio.ALL_COMPLETED 

1174 ) 

1175 

1176 async def _poll_output_size(self) -> None: 

1177 """ 

1178 Coroutine for polling the terminal dimensions. 

1179 

1180 Useful for situations where `attach_winch_signal_handler` is not sufficient: 

1181 - If we are not running in the main thread. 

1182 - On Windows. 

1183 """ 

1184 size: Size | None = None 

1185 interval = self.terminal_size_polling_interval 

1186 

1187 if interval is None: 

1188 return 

1189 

1190 while True: 

1191 await asyncio.sleep(interval) 

1192 new_size = self.output.get_size() 

1193 

1194 if size is not None and new_size != size: 

1195 self._on_resize() 

1196 size = new_size 

1197 

1198 def cpr_not_supported_callback(self) -> None: 

1199 """ 

1200 Called when we don't receive the cursor position response in time. 

1201 """ 

1202 if not self.output.responds_to_cpr: 

1203 return # We know about this already. 

1204 

1205 def in_terminal() -> None: 

1206 self.output.write( 

1207 "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n" 

1208 ) 

1209 self.output.flush() 

1210 

1211 run_in_terminal(in_terminal) 

1212 

1213 @overload 

1214 def exit(self) -> None: 

1215 "Exit without arguments." 

1216 

1217 @overload 

1218 def exit(self, *, result: _AppResult, style: str = "") -> None: 

1219 "Exit with `_AppResult`." 

1220 

1221 @overload 

1222 def exit( 

1223 self, *, exception: BaseException | type[BaseException], style: str = "" 

1224 ) -> None: 

1225 "Exit with exception." 

1226 

1227 def exit( 

1228 self, 

1229 result: _AppResult | None = None, 

1230 exception: BaseException | type[BaseException] | None = None, 

1231 style: str = "", 

1232 ) -> None: 

1233 """ 

1234 Exit application. 

1235 

1236 .. note:: 

1237 

1238 If `Application.exit` is called before `Application.run()` is 

1239 called, then the `Application` won't exit (because the 

1240 `Application.future` doesn't correspond to the current run). Use a 

1241 `pre_run` hook and an event to synchronize the closing if there's a 

1242 chance this can happen. 

1243 

1244 :param result: Set this result for the application. 

1245 :param exception: Set this exception as the result for an application. For 

1246 a prompt, this is often `EOFError` or `KeyboardInterrupt`. 

1247 :param style: Apply this style on the whole content when quitting, 

1248 often this is 'class:exiting' for a prompt. (Used when 

1249 `erase_when_done` is not set.) 

1250 """ 

1251 assert result is None or exception is None 

1252 

1253 if self.future is None: 

1254 raise Exception("Application is not running. Application.exit() failed.") 

1255 

1256 if self.future.done(): 

1257 raise Exception("Return value already set. Application.exit() failed.") 

1258 

1259 self.exit_style = style 

1260 

1261 if exception is not None: 

1262 self.future.set_exception(exception) 

1263 else: 

1264 self.future.set_result(cast(_AppResult, result)) 

1265 

1266 def _request_absolute_cursor_position(self) -> None: 

1267 """ 

1268 Send CPR request. 

1269 """ 

1270 # Note: only do this if the input queue is not empty, and a return 

1271 # value has not been set. Otherwise, we won't be able to read the 

1272 # response anyway. 

1273 if not self.key_processor.input_queue and not self.is_done: 

1274 self.renderer.request_absolute_cursor_position() 

1275 

1276 async def run_system_command( 

1277 self, 

1278 command: str, 

1279 wait_for_enter: bool = True, 

1280 display_before_text: AnyFormattedText = "", 

1281 wait_text: str = "Press ENTER to continue...", 

1282 ) -> None: 

1283 """ 

1284 Run system command (While hiding the prompt. When finished, all the 

1285 output will scroll above the prompt.) 

1286 

1287 :param command: Shell command to be executed. 

1288 :param wait_for_enter: FWait for the user to press enter, when the 

1289 command is finished. 

1290 :param display_before_text: If given, text to be displayed before the 

1291 command executes. 

1292 :return: A `Future` object. 

1293 """ 

1294 async with in_terminal(): 

1295 # Try to use the same input/output file descriptors as the one, 

1296 # used to run this application. 

1297 try: 

1298 input_fd = self.input.fileno() 

1299 except AttributeError: 

1300 input_fd = sys.stdin.fileno() 

1301 try: 

1302 output_fd = self.output.fileno() 

1303 except AttributeError: 

1304 output_fd = sys.stdout.fileno() 

1305 

1306 # Run sub process. 

1307 def run_command() -> None: 

1308 self.print_text(display_before_text) 

1309 p = Popen(command, shell=True, stdin=input_fd, stdout=output_fd) 

1310 p.wait() 

1311 

1312 await run_in_executor_with_context(run_command) 

1313 

1314 # Wait for the user to press enter. 

1315 if wait_for_enter: 

1316 await _do_wait_for_enter(wait_text) 

1317 

1318 def suspend_to_background(self, suspend_group: bool = True) -> None: 

1319 """ 

1320 (Not thread safe -- to be called from inside the key bindings.) 

1321 Suspend process. 

1322 

1323 :param suspend_group: When true, suspend the whole process group. 

1324 (This is the default, and probably what you want.) 

1325 """ 

1326 # Only suspend when the operating system supports it. 

1327 # (Not on Windows.) 

1328 if _SIGTSTP is not None: 

1329 

1330 def run() -> None: 

1331 signal = cast(int, _SIGTSTP) 

1332 # Send `SIGTSTP` to own process. 

1333 # This will cause it to suspend. 

1334 

1335 # Usually we want the whole process group to be suspended. This 

1336 # handles the case when input is piped from another process. 

1337 if suspend_group: 

1338 os.kill(0, signal) 

1339 else: 

1340 os.kill(os.getpid(), signal) 

1341 

1342 run_in_terminal(run) 

1343 

1344 def print_text( 

1345 self, text: AnyFormattedText, style: BaseStyle | None = None 

1346 ) -> None: 

1347 """ 

1348 Print a list of (style_str, text) tuples to the output. 

1349 (When the UI is running, this method has to be called through 

1350 `run_in_terminal`, otherwise it will destroy the UI.) 

1351 

1352 :param text: List of ``(style_str, text)`` tuples. 

1353 :param style: Style class to use. Defaults to the active style in the CLI. 

1354 """ 

1355 print_formatted_text( 

1356 output=self.output, 

1357 formatted_text=text, 

1358 style=style or self._merged_style, 

1359 color_depth=self.color_depth, 

1360 style_transformation=self.style_transformation, 

1361 ) 

1362 

1363 @property 

1364 def is_running(self) -> bool: 

1365 "`True` when the application is currently active/running." 

1366 return self._is_running 

1367 

1368 @property 

1369 def is_done(self) -> bool: 

1370 if self.future: 

1371 return self.future.done() 

1372 return False 

1373 

1374 def get_used_style_strings(self) -> list[str]: 

1375 """ 

1376 Return a list of used style strings. This is helpful for debugging, and 

1377 for writing a new `Style`. 

1378 """ 

1379 attrs_for_style = self.renderer._attrs_for_style 

1380 

1381 if attrs_for_style: 

1382 return sorted( 

1383 re.sub(r"\s+", " ", style_str).strip() 

1384 for style_str in attrs_for_style.keys() 

1385 ) 

1386 

1387 return [] 

1388 

1389 

1390class _CombinedRegistry(KeyBindingsBase): 

1391 """ 

1392 The `KeyBindings` of key bindings for a `Application`. 

1393 This merges the global key bindings with the one of the current user 

1394 control. 

1395 """ 

1396 

1397 def __init__(self, app: Application[_AppResult]) -> None: 

1398 self.app = app 

1399 self._cache: SimpleCache[ 

1400 tuple[Window, frozenset[UIControl]], KeyBindingsBase 

1401 ] = SimpleCache() 

1402 

1403 @property 

1404 def _version(self) -> Hashable: 

1405 """Not needed - this object is not going to be wrapped in another 

1406 KeyBindings object.""" 

1407 raise NotImplementedError 

1408 

1409 @property 

1410 def bindings(self) -> list[Binding]: 

1411 """Not needed - this object is not going to be wrapped in another 

1412 KeyBindings object.""" 

1413 raise NotImplementedError 

1414 

1415 def _create_key_bindings( 

1416 self, current_window: Window, other_controls: list[UIControl] 

1417 ) -> KeyBindingsBase: 

1418 """ 

1419 Create a `KeyBindings` object that merges the `KeyBindings` from the 

1420 `UIControl` with all the parent controls and the global key bindings. 

1421 """ 

1422 key_bindings = [] 

1423 collected_containers = set() 

1424 

1425 # Collect key bindings from currently focused control and all parent 

1426 # controls. Don't include key bindings of container parent controls. 

1427 container: Container = current_window 

1428 while True: 

1429 collected_containers.add(container) 

1430 kb = container.get_key_bindings() 

1431 if kb is not None: 

1432 key_bindings.append(kb) 

1433 

1434 if container.is_modal(): 

1435 break 

1436 

1437 parent = self.app.layout.get_parent(container) 

1438 if parent is None: 

1439 break 

1440 else: 

1441 container = parent 

1442 

1443 # Include global bindings (starting at the top-model container). 

1444 for c in walk(container): 

1445 if c not in collected_containers: 

1446 kb = c.get_key_bindings() 

1447 if kb is not None: 

1448 key_bindings.append(GlobalOnlyKeyBindings(kb)) 

1449 

1450 # Add App key bindings 

1451 if self.app.key_bindings: 

1452 key_bindings.append(self.app.key_bindings) 

1453 

1454 # Add mouse bindings. 

1455 key_bindings.append( 

1456 ConditionalKeyBindings( 

1457 self.app._page_navigation_bindings, 

1458 self.app.enable_page_navigation_bindings, 

1459 ) 

1460 ) 

1461 key_bindings.append(self.app._default_bindings) 

1462 

1463 # Reverse this list. The current control's key bindings should come 

1464 # last. They need priority. 

1465 key_bindings = key_bindings[::-1] 

1466 

1467 return merge_key_bindings(key_bindings) 

1468 

1469 @property 

1470 def _key_bindings(self) -> KeyBindingsBase: 

1471 current_window = self.app.layout.current_window 

1472 other_controls = list(self.app.layout.find_all_controls()) 

1473 key = current_window, frozenset(other_controls) 

1474 

1475 return self._cache.get( 

1476 key, lambda: self._create_key_bindings(current_window, other_controls) 

1477 ) 

1478 

1479 def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]: 

1480 return self._key_bindings.get_bindings_for_keys(keys) 

1481 

1482 def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]: 

1483 return self._key_bindings.get_bindings_starting_with_keys(keys) 

1484 

1485 

1486async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None: 

1487 """ 

1488 Create a sub application to wait for the enter key press. 

1489 This has two advantages over using 'input'/'raw_input': 

1490 - This will share the same input/output I/O. 

1491 - This doesn't block the event loop. 

1492 """ 

1493 from prompt_toolkit.shortcuts import PromptSession 

1494 

1495 key_bindings = KeyBindings() 

1496 

1497 @key_bindings.add("enter") 

1498 def _ok(event: E) -> None: 

1499 event.app.exit() 

1500 

1501 @key_bindings.add(Keys.Any) 

1502 def _ignore(event: E) -> None: 

1503 "Disallow typing." 

1504 pass 

1505 

1506 session: PromptSession[None] = PromptSession( 

1507 message=wait_text, key_bindings=key_bindings 

1508 ) 

1509 try: 

1510 await session.app.run_async() 

1511 except KeyboardInterrupt: 

1512 pass # Control-c pressed. Don't propagate this error. 

1513 

1514 

1515@contextmanager 

1516def attach_winch_signal_handler( 

1517 handler: Callable[[], None] 

1518) -> Generator[None, None, None]: 

1519 """ 

1520 Attach the given callback as a WINCH signal handler within the context 

1521 manager. Restore the original signal handler when done. 

1522 

1523 The `Application.run` method will register SIGWINCH, so that it will 

1524 properly repaint when the terminal window resizes. However, using 

1525 `run_in_terminal`, we can temporarily send an application to the 

1526 background, and run an other app in between, which will then overwrite the 

1527 SIGWINCH. This is why it's important to restore the handler when the app 

1528 terminates. 

1529 """ 

1530 # The tricky part here is that signals are registered in the Unix event 

1531 # loop with a wakeup fd, but another application could have registered 

1532 # signals using signal.signal directly. For now, the implementation is 

1533 # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`. 

1534 

1535 # No WINCH? Then don't do anything. 

1536 sigwinch = getattr(signal, "SIGWINCH", None) 

1537 if sigwinch is None or not in_main_thread(): 

1538 yield 

1539 return 

1540 

1541 # Keep track of the previous handler. 

1542 # (Only UnixSelectorEventloop has `_signal_handlers`.) 

1543 loop = get_running_loop() 

1544 previous_winch_handler = getattr(loop, "_signal_handlers", {}).get(sigwinch) 

1545 

1546 try: 

1547 loop.add_signal_handler(sigwinch, handler) 

1548 yield 

1549 finally: 

1550 # Restore the previous signal handler. 

1551 loop.remove_signal_handler(sigwinch) 

1552 if previous_winch_handler is not None: 

1553 loop.add_signal_handler( 

1554 sigwinch, 

1555 previous_winch_handler._callback, 

1556 *previous_winch_handler._args, 

1557 )