Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/application/application.py: 20%

551 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextvars 

5import os 

6import re 

7import signal 

8import sys 

9import threading 

10import time 

11from asyncio import ( 

12 AbstractEventLoop, 

13 Future, 

14 Task, 

15 ensure_future, 

16 get_running_loop, 

17 sleep, 

18) 

19from contextlib import ExitStack, contextmanager 

20from subprocess import Popen 

21from traceback import format_tb 

22from typing import ( 

23 Any, 

24 Callable, 

25 Coroutine, 

26 Dict, 

27 FrozenSet, 

28 Generator, 

29 Generic, 

30 Hashable, 

31 Iterable, 

32 Iterator, 

33 List, 

34 Optional, 

35 Set, 

36 Tuple, 

37 Type, 

38 TypeVar, 

39 Union, 

40 cast, 

41 overload, 

42) 

43 

44from prompt_toolkit.buffer import Buffer 

45from prompt_toolkit.cache import SimpleCache 

46from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard 

47from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config 

48from prompt_toolkit.data_structures import Size 

49from prompt_toolkit.enums import EditingMode 

50from prompt_toolkit.eventloop import ( 

51 get_traceback_from_context, 

52 run_in_executor_with_context, 

53) 

54from prompt_toolkit.eventloop.utils import call_soon_threadsafe 

55from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter 

56from prompt_toolkit.formatted_text import AnyFormattedText 

57from prompt_toolkit.input.base import Input 

58from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead 

59from prompt_toolkit.key_binding.bindings.page_navigation import ( 

60 load_page_navigation_bindings, 

61) 

62from prompt_toolkit.key_binding.defaults import load_key_bindings 

63from prompt_toolkit.key_binding.emacs_state import EmacsState 

64from prompt_toolkit.key_binding.key_bindings import ( 

65 Binding, 

66 ConditionalKeyBindings, 

67 GlobalOnlyKeyBindings, 

68 KeyBindings, 

69 KeyBindingsBase, 

70 KeysTuple, 

71 merge_key_bindings, 

72) 

73from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor 

74from prompt_toolkit.key_binding.vi_state import ViState 

75from prompt_toolkit.keys import Keys 

76from prompt_toolkit.layout.containers import Container, Window 

77from prompt_toolkit.layout.controls import BufferControl, UIControl 

78from prompt_toolkit.layout.dummy import create_dummy_layout 

79from prompt_toolkit.layout.layout import Layout, walk 

80from prompt_toolkit.output import ColorDepth, Output 

81from prompt_toolkit.renderer import Renderer, print_formatted_text 

82from prompt_toolkit.search import SearchState 

83from prompt_toolkit.styles import ( 

84 BaseStyle, 

85 DummyStyle, 

86 DummyStyleTransformation, 

87 DynamicStyle, 

88 StyleTransformation, 

89 default_pygments_style, 

90 default_ui_style, 

91 merge_styles, 

92) 

93from prompt_toolkit.utils import Event, in_main_thread 

94 

95from .current import get_app_session, set_app 

96from .run_in_terminal import in_terminal, run_in_terminal 

97 

# Names exported by `from ... import *`.
__all__ = ["Application"]


# Short alias for `KeyPressEvent`.
E = KeyPressEvent

# Type of the value an application returns from its run call.
_AppResult = TypeVar("_AppResult")

# Signature of the application-level event callbacks: they receive the
# `Application` and return nothing.
ApplicationEventHandler = Callable[["Application[_AppResult]"], None]

# Signal numbers, or `None` when the `signal` module does not define them.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
_SIGTSTP = getattr(signal, "SIGTSTP", None)

109 

110 

111class Application(Generic[_AppResult]): 

112 """ 

113 The main Application class! 

114 This glues everything together. 

115 

116 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance. 

117 :param key_bindings: 

118 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for 

119 the key bindings. 

120 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use. 

121 :param full_screen: When True, run the application on the alternate screen buffer. 

122 :param color_depth: Any :class:`~.ColorDepth` value, a callable that 

123 returns a :class:`~.ColorDepth` or `None` for default. 

124 :param erase_when_done: (bool) Clear the application output when it finishes. 

125 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches 

126 forward and a '?' searches backward. In Readline mode, this is usually 

127 reversed. 

128 :param min_redraw_interval: Number of seconds to wait between redraws. Use 

129 this for applications where `invalidate` is called a lot. This could cause 

130 a lot of terminal output, which some terminals are not able to process. 

131 

132 `None` means that every `invalidate` will be scheduled right away 

133 (which is usually fine). 

134 

135 When one `invalidate` is called, but a scheduled redraw of a previous 

136 `invalidate` call has not been executed yet, nothing will happen in any 

137 case. 

138 

139 :param max_render_postpone_time: When there is high CPU (a lot of other 

140 scheduled calls), postpone the rendering max x seconds. '0' means: 

141 don't postpone. '.5' means: try to draw at least twice a second. 

142 

143 :param refresh_interval: Automatically invalidate the UI every so many 

144 seconds. When `None` (the default), only invalidate when `invalidate` 

145 has been called. 

146 

147 :param terminal_size_polling_interval: Poll the terminal size every so many 

148 seconds. Useful if the application runs in a thread other than the 

149 main thread where SIGWINCH can't be handled, or on Windows. 

150 

151 Filters: 

152 

153 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or 

154 boolean). When True, enable mouse support. 

155 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean. 

156 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`. 

157 

158 :param enable_page_navigation_bindings: When `True`, enable the page 

159 navigation key bindings. These include both Emacs and Vi bindings like 

160 page-up, page-down and so on to scroll through pages. Mostly useful for 

161 creating an editor or other full screen applications. Probably, you 

162 don't want this for the implementation of a REPL. By default, this is 

163 enabled if `full_screen` is set. 

164 

165 Callbacks (all of these should accept an 

166 :class:`~prompt_toolkit.application.Application` object as input.) 

167 

168 :param on_reset: Called during reset. 

169 :param on_invalidate: Called when the UI has been invalidated. 

170 :param before_render: Called right before rendering. 

171 :param after_render: Called right after rendering. 

172 

173 I/O: 

174 (Note that the preferred way to change the input/output is by creating an 

175 `AppSession` with the required input/output objects. If you need multiple 

176 applications running at the same time, you have to create a separate 

177 `AppSession` using a `with create_app_session():` block.) 

178 

179 :param input: :class:`~prompt_toolkit.input.Input` instance. 

180 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably 

181 Vt100_Output or Win32Output.) 

182 

183 Usage: 

184 

185 app = Application(...) 

186 app.run() 

187 

188 # Or 

189 await app.run_async() 

190 """ 

191 

def __init__(
    self,
    layout: Layout | None = None,
    style: BaseStyle | None = None,
    include_default_pygments_style: FilterOrBool = True,
    style_transformation: StyleTransformation | None = None,
    key_bindings: KeyBindingsBase | None = None,
    clipboard: Clipboard | None = None,
    full_screen: bool = False,
    color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
    mouse_support: FilterOrBool = False,
    enable_page_navigation_bindings: None
    | (FilterOrBool) = None,  # Can be None, True or False.
    paste_mode: FilterOrBool = False,
    editing_mode: EditingMode = EditingMode.EMACS,
    erase_when_done: bool = False,
    reverse_vi_search_direction: FilterOrBool = False,
    min_redraw_interval: float | int | None = None,
    max_render_postpone_time: float | int | None = 0.01,
    refresh_interval: float | None = None,
    terminal_size_polling_interval: float | None = 0.5,
    cursor: AnyCursorShapeConfig = None,
    on_reset: ApplicationEventHandler[_AppResult] | None = None,
    on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
    before_render: ApplicationEventHandler[_AppResult] | None = None,
    after_render: ApplicationEventHandler[_AppResult] | None = None,
    # I/O.
    input: Input | None = None,
    output: Output | None = None,
) -> None:
    """
    Store the configuration, create the renderer and the key processor,
    and finish with a call to :meth:`reset`.

    The individual parameters are described in the class docstring.
    """
    # Unless the caller decided otherwise, page navigation bindings follow
    # the (current) value of `self.full_screen`.
    if enable_page_navigation_bindings is None:
        enable_page_navigation_bindings = Condition(lambda: self.full_screen)

    # Turn every "bool or filter" argument into a real filter.
    paste_mode = to_filter(paste_mode)
    mouse_support = to_filter(mouse_support)
    reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
    enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
    include_default_pygments_style = to_filter(include_default_pygments_style)

    # Fill in defaults for the optional layout and style transformation.
    layout = create_dummy_layout() if layout is None else layout
    style_transformation = (
        DummyStyleTransformation()
        if style_transformation is None
        else style_transformation
    )

    self.style = style
    self.style_transformation = style_transformation

    # Key bindings: the user supplied ones, plus the built-in sets.
    self.key_bindings = key_bindings
    self._default_bindings = load_key_bindings()
    self._page_navigation_bindings = load_page_navigation_bindings()

    self.layout = layout
    self.clipboard = clipboard or InMemoryClipboard()
    self.full_screen: bool = full_screen
    self._color_depth = color_depth
    self.mouse_support = mouse_support

    self.paste_mode = paste_mode
    self.editing_mode = editing_mode
    self.erase_when_done = erase_when_done
    self.reverse_vi_search_direction = reverse_vi_search_direction
    self.enable_page_navigation_bindings = enable_page_navigation_bindings
    self.min_redraw_interval = min_redraw_interval
    self.max_render_postpone_time = max_render_postpone_time
    self.refresh_interval = refresh_interval
    self.terminal_size_polling_interval = terminal_size_polling_interval

    self.cursor = to_cursor_shape_config(cursor)

    # Events, each optionally pre-populated with the given handler.
    self.on_invalidate = Event(self, on_invalidate)
    self.on_reset = Event(self, on_reset)
    self.before_render = Event(self, before_render)
    self.after_render = Event(self, after_render)

    # I/O: explicit arguments win, otherwise use the current app session.
    session = get_app_session()
    self.output = output or session.output
    self.input = input or session.input

    # 'Extra' callables that are executed before an `Application.run`.
    self.pre_run_callables: list[Callable[[], None]] = []

    self._is_running = False
    self.future: Future[_AppResult] | None = None
    self.loop: AbstractEventLoop | None = None
    self.context: contextvars.Context | None = None

    #: Quoted insert. This flag is set if we go into quoted insert mode.
    self.quoted_insert = False

    #: Vi state. (For Vi key bindings.)
    self.vi_state = ViState()
    self.emacs_state = EmacsState()

    #: When to flush the input (for flushing escape keys). This matters on
    #: terminals that use vt100 input: the escape key can't be told apart
    #: from, for instance, the left-arrow key without knowing what follows
    #: after "\x1b". If nothing follows within this time span, "\x1b" is
    #: considered to be escape.
    #: This seems to work like the `ttimeoutlen` option in Vim.
    self.ttimeoutlen = 0.5  # Seconds.

    #: Like Vim's `timeoutlen` option. This can be `None` or a float.
    #: Suppose there is a key binding AB and a second key binding A. If the
    #: user presses A and then waits, the binding for A is not handled yet
    #: (unless it was marked 'eager'), because it's unknown what will
    #: follow. This timeout is the maximum time to wait before the handlers
    #: are called anyway. Use `None` to disable this timeout.
    self.timeoutlen = 1.0

    # Style used by the renderer: defaults merged with the user style.
    self._merged_style = self._create_merged_style(include_default_pygments_style)

    #: The `Renderer` instance. It writes to `self.output`.
    self.renderer = Renderer(
        self._merged_style,
        self.output,
        full_screen=full_screen,
        mouse_support=mouse_support,
        cpr_not_supported_callback=self.cpr_not_supported_callback,
    )

    #: Render counter. Incremented every time the UI is rendered; usable as
    #: a key for caching information during one rendering.
    self.render_counter = 0

    # Invalidate flag. When 'True', a repaint has been scheduled.
    self._invalidated = False
    # Collection of 'invalidate' Event objects.
    self._invalidate_events: list[Event[object]] = []
    # Unix timestamp of the last redraw. Used when `min_redraw_interval`
    # is given.
    self._last_redraw_time = 0.0

    #: The key processor, fed from a registry that combines the bindings
    #: of this application.
    self.key_processor = KeyProcessor(_CombinedRegistry(self))

    # Bookkeeping for `run_in_terminal`: the future is set at the point
    # where the previous run finishes.
    self._running_in_terminal = False
    self._running_in_terminal_f: Future[None] | None = None

    # Bring everything into its initial state (fires the reset event).
    self.reset()

343 

def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
    """
    Build the `Style` object used for rendering: the default UI style, the
    default pygments style (only while `include_default_pygments_style`
    evaluates to true) and the custom user style, merged in that order.
    """
    fallback_style = DummyStyle()
    pygments_style = default_pygments_style()

    def pick_pygments_style() -> BaseStyle:
        # Evaluated lazily, so the filter can change its answer over time.
        return pygments_style if include_default_pygments_style() else fallback_style

    conditional_pygments_style = DynamicStyle(pick_pygments_style)

    layers = [
        default_ui_style(),
        conditional_pygments_style,
        # Looked up dynamically, so assigning `self.style` later is honored.
        DynamicStyle(lambda: self.style),
    ]
    return merge_styles(layers)

366 

367 @property 

368 def color_depth(self) -> ColorDepth: 

369 """ 

370 The active :class:`.ColorDepth`. 

371 

372 The current value is determined as follows: 

373 

374 - If a color depth was given explicitly to this application, use that 

375 value. 

376 - Otherwise, fall back to the color depth that is reported by the 

377 :class:`.Output` implementation. If the :class:`.Output` class was 

378 created using `output.defaults.create_output`, then this value is 

379 coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable. 

380 """ 

381 depth = self._color_depth 

382 

383 if callable(depth): 

384 depth = depth() 

385 

386 if depth is None: 

387 depth = self.output.get_default_color_depth() 

388 

389 return depth 

390 

391 @property 

392 def current_buffer(self) -> Buffer: 

393 """ 

394 The currently focused :class:`~.Buffer`. 

395 

396 (This returns a dummy :class:`.Buffer` when none of the actual buffers 

397 has the focus. In this case, it's really not practical to check for 

398 `None` values or catch exceptions every time.) 

399 """ 

400 return self.layout.current_buffer or Buffer( 

401 name="dummy-buffer" 

402 ) # Dummy buffer. 

403 

@property
def current_search_state(self) -> SearchState:
    """
    The :class:`.SearchState` of the focused :class:`.BufferControl`.

    If the focused control is not a `BufferControl`, a new dummy
    `SearchState` is returned; this never returns `None`.
    """
    focused_control = self.layout.current_control
    if not isinstance(focused_control, BufferControl):
        return SearchState()  # Dummy search state.
    return focused_control.search_state

415 

416 def reset(self) -> None: 

417 """ 

418 Reset everything, for reading the next input. 

419 """ 

420 # Notice that we don't reset the buffers. (This happens just before 

421 # returning, and when we have multiple buffers, we clearly want the 

422 # content in the other buffers to remain unchanged between several 

423 # calls of `run`. (And the same is true for the focus stack.) 

424 

425 self.exit_style = "" 

426 

427 self._background_tasks: set[Task[None]] = set() 

428 

429 self.renderer.reset() 

430 self.key_processor.reset() 

431 self.layout.reset() 

432 self.vi_state.reset() 

433 self.emacs_state.reset() 

434 

435 # Trigger reset event. 

436 self.on_reset.fire() 

437 

438 # Make sure that we have a 'focusable' widget focused. 

439 # (The `Layout` class can't determine this.) 

440 layout = self.layout 

441 

442 if not layout.current_control.is_focusable(): 

443 for w in layout.find_all_windows(): 

444 if w.content.is_focusable(): 

445 layout.current_window = w 

446 break 

447 

448 def invalidate(self) -> None: 

449 """ 

450 Thread safe way of sending a repaint trigger to the input event loop. 

451 """ 

452 if not self._is_running: 

453 # Don't schedule a redraw if we're not running. 

454 # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail. 

455 # See: https://github.com/dbcli/mycli/issues/797 

456 return 

457 

458 # `invalidate()` called if we don't have a loop yet (not running?), or 

459 # after the event loop was closed. 

460 if self.loop is None or self.loop.is_closed(): 

461 return 

462 

463 # Never schedule a second redraw, when a previous one has not yet been 

464 # executed. (This should protect against other threads calling 

465 # 'invalidate' many times, resulting in 100% CPU.) 

466 if self._invalidated: 

467 return 

468 else: 

469 self._invalidated = True 

470 

471 # Trigger event. 

472 self.loop.call_soon_threadsafe(self.on_invalidate.fire) 

473 

474 def redraw() -> None: 

475 self._invalidated = False 

476 self._redraw() 

477 

478 def schedule_redraw() -> None: 

479 call_soon_threadsafe( 

480 redraw, max_postpone_time=self.max_render_postpone_time, loop=self.loop 

481 ) 

482 

483 if self.min_redraw_interval: 

484 # When a minimum redraw interval is set, wait minimum this amount 

485 # of time between redraws. 

486 diff = time.time() - self._last_redraw_time 

487 if diff < self.min_redraw_interval: 

488 

489 async def redraw_in_future() -> None: 

490 await sleep(cast(float, self.min_redraw_interval) - diff) 

491 schedule_redraw() 

492 

493 self.loop.call_soon_threadsafe( 

494 lambda: self.create_background_task(redraw_in_future()) 

495 ) 

496 else: 

497 schedule_redraw() 

498 else: 

499 schedule_redraw() 

500 

501 @property 

502 def invalidated(self) -> bool: 

503 "True when a redraw operation has been scheduled." 

504 return self._invalidated 

505 

506 def _redraw(self, render_as_done: bool = False) -> None: 

507 """ 

508 Render the command line again. (Not thread safe!) (From other threads, 

509 or if unsure, use :meth:`.Application.invalidate`.) 

510 

511 :param render_as_done: make sure to put the cursor after the UI. 

512 """ 

513 

514 def run_in_context() -> None: 

515 # Only draw when no sub application was started. 

516 if self._is_running and not self._running_in_terminal: 

517 if self.min_redraw_interval: 

518 self._last_redraw_time = time.time() 

519 

520 # Render 

521 self.render_counter += 1 

522 self.before_render.fire() 

523 

524 if render_as_done: 

525 if self.erase_when_done: 

526 self.renderer.erase() 

527 else: 

528 # Draw in 'done' state and reset renderer. 

529 self.renderer.render(self, self.layout, is_done=render_as_done) 

530 else: 

531 self.renderer.render(self, self.layout) 

532 

533 self.layout.update_parents_relations() 

534 

535 # Fire render event. 

536 self.after_render.fire() 

537 

538 self._update_invalidate_events() 

539 

540 # NOTE: We want to make sure this Application is the active one. The 

541 # invalidate function is often called from a context where this 

542 # application is not the active one. (Like the 

543 # `PromptSession._auto_refresh_context`). 

544 # We copy the context in case the context was already active, to 

545 # prevent RuntimeErrors. (The rendering is not supposed to change 

546 # any context variables.) 

547 if self.context is not None: 

548 self.context.copy().run(run_in_context) 

549 

550 def _start_auto_refresh_task(self) -> None: 

551 """ 

552 Start a while/true loop in the background for automatic invalidation of 

553 the UI. 

554 """ 

555 if self.refresh_interval is not None and self.refresh_interval != 0: 

556 

557 async def auto_refresh(refresh_interval: float) -> None: 

558 while True: 

559 await sleep(refresh_interval) 

560 self.invalidate() 

561 

562 self.create_background_task(auto_refresh(self.refresh_interval)) 

563 

564 def _update_invalidate_events(self) -> None: 

565 """ 

566 Make sure to attach 'invalidate' handlers to all invalidate events in 

567 the UI. 

568 """ 

569 # Remove all the original event handlers. (Components can be removed 

570 # from the UI.) 

571 for ev in self._invalidate_events: 

572 ev -= self._invalidate_handler 

573 

574 # Gather all new events. 

575 # (All controls are able to invalidate themselves.) 

576 def gather_events() -> Iterable[Event[object]]: 

577 for c in self.layout.find_all_controls(): 

578 yield from c.get_invalidate_events() 

579 

580 self._invalidate_events = list(gather_events()) 

581 

582 for ev in self._invalidate_events: 

583 ev += self._invalidate_handler 

584 

585 def _invalidate_handler(self, sender: object) -> None: 

586 """ 

587 Handler for invalidate events coming from UIControls. 

588 

589 (This handles the difference in signature between event handler and 

590 `self.invalidate`. It also needs to be a method -not a nested 

591 function-, so that we can remove it again .) 

592 """ 

593 self.invalidate() 

594 

595 def _on_resize(self) -> None: 

596 """ 

597 When the window size changes, we erase the current output and request 

598 again the cursor position. When the CPR answer arrives, the output is 

599 drawn again. 

600 """ 

601 # Erase, request position (when cursor is at the start position) 

602 # and redraw again. -- The order is important. 

603 self.renderer.erase(leave_alternate_screen=False) 

604 self._request_absolute_cursor_position() 

605 self._redraw() 

606 

607 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None: 

608 """ 

609 Called during `run`. 

610 

611 `self.future` should be set to the new future at the point where this 

612 is called in order to avoid data races. `pre_run` can be used to set a 

613 `threading.Event` to synchronize with UI termination code, running in 

614 another thread that would call `Application.exit`. (See the progress 

615 bar code for an example.) 

616 """ 

617 if pre_run: 

618 pre_run() 

619 

620 # Process registered "pre_run_callables" and clear list. 

621 for c in self.pre_run_callables: 

622 c() 

623 del self.pre_run_callables[:] 

624 

async def run_async(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    slow_callback_duration: float = 0.5,
) -> _AppResult:
    """
    Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
    until :meth:`~prompt_toolkit.application.Application.exit` has been
    called. Return the value that was passed to
    :meth:`~prompt_toolkit.application.Application.exit`.

    This is the main entry point for a prompt_toolkit
    :class:`~prompt_toolkit.application.Application` and usually the only
    place where the event loop is actually running.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal if possible. This will call
        the `<sigint>` key binding when a SIGINT is received. (This only
        works in the main thread.)
    :param slow_callback_duration: Display warnings if code scheduled in
        the asyncio event loop takes more time than this. The asyncio
        default of `0.1` is sometimes not sufficient on a slow system,
        because exceptionally, the drawing of the app, which happens in the
        event loop, can take a bit longer from time to time.
    :raises EOFError: (through the application future) when the input
        stream was closed before the application finished.
    """
    assert not self._is_running, "Application is already running."

    if not in_main_thread() or sys.platform == "win32":
        # Handling signals in other threads is not supported.
        # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises
        # `NotImplementedError`.
        # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553
        handle_sigint = False

    async def _run_async(f: "asyncio.Future[_AppResult]") -> _AppResult:
        self.context = contextvars.copy_context()

        # Handle of the pending 'flush' timeout. Every time a key is
        # pressed, a 'flush' timer is started for flushing the escape key.
        # When any subsequent input is received, the current timer is
        # cancelled and a new one is started.
        flush_task: asyncio.Task[None] | None = None

        # Reset.
        # (`self.future` needs to be set when `pre_run` is called.)
        self.reset()
        self._pre_run(pre_run)

        # Feed type ahead input first.
        self.key_processor.feed_multiple(get_typeahead(self.input))
        self.key_processor.process_keys()

        def read_from_input() -> None:
            nonlocal flush_task

            # Ignore when we aren't running anymore. This callback will be
            # removed from the loop next time. (It could be that it was
            # still in the 'tasks' list of the loop.)
            # Except: if we need to process incoming CPRs.
            if not self._is_running and not self.renderer.waiting_for_cpr:
                return

            # Get keys from the input object.
            keys = self.input.read_keys()

            # Feed to key processor.
            self.key_processor.feed_multiple(keys)
            self.key_processor.process_keys()

            # Quit when the input stream was closed.
            if self.input.closed:
                if not f.done():
                    f.set_exception(EOFError)
            else:
                # Automatically flush keys.
                if flush_task:
                    flush_task.cancel()
                flush_task = self.create_background_task(auto_flush_input())

        async def auto_flush_input() -> None:
            # Flush input after timeout.
            # (Used for flushing the enter key.)
            # This sleep can be cancelled, in that case we won't flush yet.
            await sleep(self.ttimeoutlen)
            flush_input()

        def flush_input() -> None:
            if not self.is_done:
                # Get keys, and feed to key processor.
                keys = self.input.flush_keys()
                self.key_processor.feed_multiple(keys)
                self.key_processor.process_keys()

                # Processing the keys above may already have completed the
                # future (a key binding that exits the application), so
                # guard against setting an exception on a finished future,
                # exactly like `read_from_input` does.
                if self.input.closed and not f.done():
                    f.set_exception(EOFError)

        # Enter raw mode, attach input and attach WINCH event handler.
        with self.input.raw_mode(), self.input.attach(
            read_from_input
        ), attach_winch_signal_handler(self._on_resize):
            # Draw UI.
            self._request_absolute_cursor_position()
            self._redraw()
            self._start_auto_refresh_task()

            self.create_background_task(self._poll_output_size())

            # Wait for UI to finish.
            try:
                result = await f
            finally:
                # In any case, when the application finishes.
                # (Successful, or because of an error.)
                try:
                    self._redraw(render_as_done=True)
                finally:
                    # _redraw has a good chance to fail if it calls widgets
                    # with bad code. Make sure to reset the renderer
                    # anyway.
                    self.renderer.reset()

                    # Unset `is_running`, this ensures that possibly
                    # scheduled draws won't paint during the following
                    # yield.
                    self._is_running = False

                    # Detach event handlers for invalidate events.
                    # (Important when a UIControl is embedded in multiple
                    # applications, like ptterm in pymux. An invalidate
                    # should not trigger a repaint in terminated
                    # applications.)
                    for ev in self._invalidate_events:
                        ev -= self._invalidate_handler
                    self._invalidate_events = []

                    # Wait for CPR responses.
                    if self.output.responds_to_cpr:
                        await self.renderer.wait_for_cpr_responses()

                    # Wait for the run-in-terminals to terminate.
                    previous_run_in_terminal_f = self._running_in_terminal_f

                    if previous_run_in_terminal_f:
                        await previous_run_in_terminal_f

                    # Store unprocessed input as typeahead for next time.
                    store_typeahead(self.input, self.key_processor.empty_queue())

            return result

    # The context managers below each set one piece of state for the
    # duration of the run and restore it afterwards.

    @contextmanager
    def get_loop() -> Iterator[AbstractEventLoop]:
        # Expose the running loop as `self.loop` while the app runs.
        loop = get_running_loop()
        self.loop = loop

        try:
            yield loop
        finally:
            self.loop = None

    @contextmanager
    def set_is_running() -> Iterator[None]:
        self._is_running = True
        try:
            yield
        finally:
            self._is_running = False

    @contextmanager
    def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
        if handle_sigint:
            # Forward SIGINT to the key processor, on the event loop.
            loop.add_signal_handler(
                signal.SIGINT,
                lambda *_: loop.call_soon_threadsafe(
                    self.key_processor.send_sigint
                ),
            )
            try:
                yield
            finally:
                loop.remove_signal_handler(signal.SIGINT)
        else:
            yield

    @contextmanager
    def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
        if set_exception_handler:
            previous_exc_handler = loop.get_exception_handler()
            loop.set_exception_handler(self._handle_exception)
            try:
                yield
            finally:
                loop.set_exception_handler(previous_exc_handler)

        else:
            yield

    @contextmanager
    def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
        # Set slow_callback_duration.
        original_slow_callback_duration = loop.slow_callback_duration
        loop.slow_callback_duration = slow_callback_duration
        try:
            yield
        finally:
            # Reset slow_callback_duration.
            loop.slow_callback_duration = original_slow_callback_duration

    @contextmanager
    def create_future(
        loop: AbstractEventLoop,
    ) -> Iterator[asyncio.Future[_AppResult]]:
        f = loop.create_future()
        self.future = f  # XXX: make sure to set this before calling '_redraw'.

        try:
            yield f
        finally:
            # Also remove the Future again. (This brings the
            # application back to its initial state, where it also
            # doesn't have a Future.)
            self.future = None

    with ExitStack() as stack:
        stack.enter_context(set_is_running())

        # Make sure to set `_invalidated` to `False` to begin with,
        # otherwise we're not going to paint anything. This can happen if
        # this application had run before on a different event loop, and a
        # paint was scheduled using `call_soon_threadsafe` with
        # `max_postpone_time`.
        self._invalidated = False

        loop = stack.enter_context(get_loop())

        stack.enter_context(set_handle_sigint(loop))
        stack.enter_context(set_exception_handler_ctx(loop))
        stack.enter_context(set_callback_duration(loop))
        stack.enter_context(set_app(self))
        stack.enter_context(self._enable_breakpointhook())

        f = stack.enter_context(create_future(loop))

        try:
            return await _run_async(f)
        finally:
            # Wait for the background tasks to be done. This needs to
            # go in the finally! If `_run_async` raises
            # `KeyboardInterrupt`, we still want to wait for the
            # background tasks.
            await self.cancel_and_wait_for_background_tasks()

    # The `ExitStack` above is defined in typeshed in a way that it can
    # swallow exceptions. Without next line, mypy would think that there's
    # a possibility we don't return here. See:
    # https://github.com/python/mypy/issues/7726
    assert False, "unreachable"

888 

def run(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    in_thread: bool = False,
) -> _AppResult:
    """
    Blocking entry point: run the application until the UI is finished and
    return its result.

    Without `in_thread`, the coroutine from `run_async()` is executed on
    the loop returned by `asyncio.get_event_loop()`. If that call raises
    `RuntimeError` (no loop installed), `asyncio.run()` is used instead.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal. Call the key binding for
        `Keys.SIGINT`. (This only works in the main thread.)
    :param in_thread: When true, run the application in a background
        thread, and block the current thread until the application
        terminates. This is useful if we need to be sure the application
        won't use the current event loop (asyncio does not support nested
        event loops). The background thread simply calls `run()` again
        with SIGINT handling disabled. When this is used, it's especially
        important to make sure that all asyncio background tasks are
        managed through `get_app().create_background_task()`, so that
        unfinished tasks are properly cancelled before the event loop is
        closed. This is used for instance in ptpython.
    """
    if not in_thread:
        coro = self.run_async(
            pre_run=pre_run,
            set_exception_handler=set_exception_handler,
            handle_sigint=handle_sigint,
        )
        try:
            # Prefer a loop that was installed already. That's required
            # for the input hooks to work, they are installed using
            # `set_event_loop`.
            current_loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop installed. Run like usual.
            return asyncio.run(coro)
        return current_loop.run_until_complete(coro)

    # Threaded mode: collect either the result or the raised exception
    # from the worker thread, and hand it over to the caller.
    outcome: _AppResult
    failure: BaseException | None = None

    def thread_main() -> None:
        nonlocal outcome, failure
        try:
            outcome = self.run(
                pre_run=pre_run,
                set_exception_handler=set_exception_handler,
                # Signal handling only works in the main thread.
                handle_sigint=False,
            )
        except BaseException as exc:
            failure = exc

    worker = threading.Thread(target=thread_main)
    worker.start()
    worker.join()

    if failure is None:
        return outcome
    raise failure

962 

def _handle_exception(
    self, loop: AbstractEventLoop, context: dict[str, Any]
) -> None:
    """
    Event loop exception handler.

    Formats the traceback found in `context` and schedules a coroutine
    that prints it inside `in_terminal()`, then waits for ENTER.

    :param loop: The event loop reporting the exception (not used here).
    :param context: The exception context dict passed by the event loop.
    """
    # Format the traceback eagerly, while handling the exception, rather
    # than later inside the scheduled coroutine.
    traceback_text = "".join(format_tb(get_traceback_from_context(context)))

    async def report() -> None:
        async with in_terminal():
            # Similar to 'loop.default_exception_handler', but printing
            # directly instead of going through a logger.
            print("\nUnhandled exception in event loop:")
            print(traceback_text)
            print("Exception {}".format(context.get("exception")))

            await _do_wait_for_enter("Press ENTER to continue...")

    ensure_future(report())

988 

989 @contextmanager 

990 def _enable_breakpointhook(self) -> Generator[None, None, None]: 

991 """ 

992 Install our custom breakpointhook for the duration of this context 

993 manager. (We will only install the hook if no other custom hook was 

994 set.) 

995 """ 

996 if sys.version_info >= (3, 7) and sys.breakpointhook == sys.__breakpointhook__: 

997 sys.breakpointhook = self._breakpointhook 

998 

999 try: 

1000 yield 

1001 finally: 

1002 sys.breakpointhook = sys.__breakpointhook__ 

1003 else: 

1004 yield 

1005 

def _breakpointhook(self, *a: object, **kw: object) -> None:
    """
    Breakpoint hook that starts PDB on the caller's frame. On every
    debugger dispatch the application is erased first, and the input is
    detached and put in cooked mode.
    """
    # Inline import on purpose. We don't want to import pdb, if not needed.
    import pdb
    from types import FrameType

    application = self
    DispatchFn = Callable[[FrameType, str, Any], Any]

    class _HidingPdb(pdb.Pdb):
        def trace_dispatch(
            self, frame: FrameType, event: str, arg: Any
        ) -> DispatchFn:
            # Hide application.
            application.renderer.erase()

            # Detach input and dispatch to debugger.
            with application.input.detach(), application.input.cooked_mode():
                return super().trace_dispatch(frame, event, arg)

            # Note: we don't render the application again here, because
            # there's a good chance that there's a breakpoint on the next
            # line. This paint/erase cycle would move the PDB prompt back
            # to the middle of the screen.

    caller_frame = sys._getframe().f_back
    _HidingPdb(stdout=sys.__stdout__).set_trace(caller_frame)

1037 

def create_background_task(
    self, coroutine: Coroutine[Any, Any, None]
) -> asyncio.Task[None]:
    """
    Schedule `coroutine` as a background task of the running application
    and return the created task. When the `Application` terminates,
    unfinished background tasks will be cancelled.

    Given that we still support Python versions before 3.11, we can't use
    task groups (and exception groups). Because of that, these background
    tasks are not allowed to raise exceptions. If they do, the exception is
    passed to the exception handler of the event loop.

    If at some point, we have Python 3.11 as the minimum supported Python
    version, then we can use a `TaskGroup` (with the lifetime of
    `Application.run_async()`), and run the background tasks in there.

    This is not threadsafe.
    """
    # Use the application's loop when it has one, otherwise the running loop.
    event_loop = self.loop or get_running_loop()

    background_task: asyncio.Task[None] = event_loop.create_task(coroutine)
    self._background_tasks.add(background_task)

    # Bookkeeping and exception reporting happen when the task finishes.
    background_task.add_done_callback(self._on_background_task_done)
    return background_task

1063 

1064 def _on_background_task_done(self, task: asyncio.Task[None]) -> None: 

1065 """ 

1066 Called when a background task completes. Remove it from 

1067 `_background_tasks`, and handle exceptions if any. 

1068 """ 

1069 self._background_tasks.discard(task) 

1070 

1071 if task.cancelled(): 

1072 return 

1073 

1074 exc = task.exception() 

1075 if exc is not None: 

1076 get_running_loop().call_exception_handler( 

1077 { 

1078 "message": f"prompt_toolkit.Application background task {task!r} " 

1079 "raised an unexpected exception.", 

1080 "exception": exc, 

1081 "task": task, 

1082 } 

1083 ) 

1084 

async def cancel_and_wait_for_background_tasks(self) -> None:
    """
    Cancel all background tasks, and wait for the cancellation to complete.

    Exceptions raised by the background tasks are *not* propagated from
    here: `asyncio.wait()` doesn't re-raise them. They are reported
    through the done-callback `_on_background_task_done` instead.

    (If we had nurseries like Trio, this would be the `__aexit__` of a
    nursery.)
    """
    # Work on a snapshot, so that both the cancel loop and the wait below
    # operate on exactly the same tasks.
    pending = list(self._background_tasks)

    for task in pending:
        task.cancel()

    # Wait until the cancellation of the background tasks completes.
    # `asyncio.wait()` does not propagate exceptions raised within any of
    # these tasks, which is what we want. Otherwise, we can't distinguish
    # between a `CancelledError` raised in this task because it got
    # cancelled, and a `CancelledError` raised on this `await` checkpoint,
    # because *we* got cancelled during the teardown of the application.
    # (If we get cancelled here, then it's important to not suppress the
    # `CancelledError`, and have it propagate.)
    # NOTE: Currently, if we get cancelled at this point then we can't wait
    #       for the cancellation to complete (in the future, we should be
    #       using anyio or Python's 3.11 TaskGroup.)
    #       Also, if we had exception groups, we could propagate an
    #       `ExceptionGroup` if something went wrong here.
    if pending:  # `asyncio.wait()` rejects an empty collection.
        await asyncio.wait(
            pending, timeout=None, return_when=asyncio.ALL_COMPLETED
        )

1116 

1117 async def _poll_output_size(self) -> None: 

1118 """ 

1119 Coroutine for polling the terminal dimensions. 

1120 

1121 Useful for situations where `attach_winch_signal_handler` is not sufficient: 

1122 - If we are not running in the main thread. 

1123 - On Windows. 

1124 """ 

1125 size: Size | None = None 

1126 interval = self.terminal_size_polling_interval 

1127 

1128 if interval is None: 

1129 return 

1130 

1131 while True: 

1132 await asyncio.sleep(interval) 

1133 new_size = self.output.get_size() 

1134 

1135 if size is not None and new_size != size: 

1136 self._on_resize() 

1137 size = new_size 

1138 

def cpr_not_supported_callback(self) -> None:
    """
    Called when we don't receive the cursor position response in time.

    Writes a warning to the output through `run_in_terminal`, unless the
    output already reports that it doesn't respond to CPR.
    """
    if self.output.responds_to_cpr:

        def warn() -> None:
            self.output.write(
                "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
            )
            self.output.flush()

        run_in_terminal(warn)
    # Otherwise: we know about this already, nothing to report.

1153 

@overload
def exit(self) -> None:
    """Exit without arguments."""


@overload
def exit(self, *, result: _AppResult, style: str = "") -> None:
    """Exit with `_AppResult`."""


@overload
def exit(
    self, *, exception: BaseException | type[BaseException], style: str = ""
) -> None:
    """Exit with exception."""


def exit(
    self,
    result: _AppResult | None = None,
    exception: BaseException | type[BaseException] | None = None,
    style: str = "",
) -> None:
    """
    Exit the application by completing `self.future` with either a result
    or an exception.

    .. note::

        If `Application.exit` is called before `Application.run()` is
        called, then the `Application` won't exit (because the
        `Application.future` doesn't correspond to the current run). Use a
        `pre_run` hook and an event to synchronize the closing if there's a
        chance this can happen.

    :param result: Set this result for the application.
    :param exception: Set this exception as the result for an application. For
        a prompt, this is often `EOFError` or `KeyboardInterrupt`.
    :param style: Apply this style on the whole content when quitting,
        often this is 'class:exiting' for a prompt. (Used when
        `erase_when_done` is not set.)
    :raises Exception: When there is no future (application not running),
        or when the future is done already.
    """
    # At most one of `result` / `exception` may be given.
    assert result is None or exception is None

    future = self.future

    if future is None:
        raise Exception("Application is not running. Application.exit() failed.")

    if future.done():
        raise Exception("Return value already set. Application.exit() failed.")

    self.exit_style = style

    if exception is None:
        future.set_result(cast(_AppResult, result))
    else:
        future.set_exception(exception)

1206 

1207 def _request_absolute_cursor_position(self) -> None: 

1208 """ 

1209 Send CPR request. 

1210 """ 

1211 # Note: only do this if the input queue is not empty, and a return 

1212 # value has not been set. Otherwise, we won't be able to read the 

1213 # response anyway. 

1214 if not self.key_processor.input_queue and not self.is_done: 

1215 self.renderer.request_absolute_cursor_position() 

1216 

async def run_system_command(
    self,
    command: str,
    wait_for_enter: bool = True,
    display_before_text: AnyFormattedText = "",
    wait_text: str = "Press ENTER to continue...",
) -> None:
    """
    Run system command (While hiding the prompt. When finished, all the
    output will scroll above the prompt.)

    :param command: Shell command to be executed.
    :param wait_for_enter: Wait for the user to press enter, when the
        command is finished.
    :param display_before_text: If given, text to be displayed before the
        command executes.
    :param wait_text: Text shown while waiting for the enter key.
    """
    async with in_terminal():
        # Prefer the file descriptors of the application's own
        # input/output. Fall back to the process' stdin/stdout when those
        # objects have no usable `fileno`.
        try:
            stdin_fd = self.input.fileno()
        except AttributeError:
            stdin_fd = sys.stdin.fileno()
        try:
            stdout_fd = self.output.fileno()
        except AttributeError:
            stdout_fd = sys.stdout.fileno()

        def execute() -> None:
            # Blocking part: print the leading text, spawn the shell
            # command and wait until it exits.
            self.print_text(display_before_text)
            process = Popen(command, shell=True, stdin=stdin_fd, stdout=stdout_fd)
            process.wait()

        # Run the blocking part in an executor.
        await run_in_executor_with_context(execute)

        # Wait for the user to press enter.
        if wait_for_enter:
            await _do_wait_for_enter(wait_text)

1258 

def suspend_to_background(self, suspend_group: bool = True) -> None:
    """
    (Not thread safe -- to be called from inside the key bindings.)
    Suspend process by sending it `SIGTSTP` from within `run_in_terminal`.

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Only suspend when the operating system supports it.
    # (Not on Windows.)
    if _SIGTSTP is None:
        return

    def send_stop_signal() -> None:
        signum = cast(int, _SIGTSTP)

        # Usually we want the whole process group to be suspended (pid 0).
        # This handles the case when input is piped from another process.
        # Otherwise only our own process receives the signal.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, signum)

    run_in_terminal(send_stop_signal)

1284 

def print_text(
    self, text: AnyFormattedText, style: BaseStyle | None = None
) -> None:
    """
    Print formatted text to the application's output.
    (When the UI is running, this method has to be called through
    `run_in_terminal`, otherwise it will destroy the UI.)

    :param text: List of ``(style_str, text)`` tuples.
    :param style: Style class to use. Falls back to `self._merged_style`
        when not given.
    """
    target_output = self.output
    effective_style = style or self._merged_style

    print_formatted_text(
        output=target_output,
        formatted_text=text,
        style=effective_style,
        color_depth=self.color_depth,
        style_transformation=self.style_transformation,
    )

1303 

@property
def is_running(self) -> bool:
    """
    `True` when the application is currently active/running.
    """
    return self._is_running

1308 

@property
def is_done(self) -> bool:
    """
    `True` when there is a future and that future is done.
    """
    current = self.future
    return current.done() if current else False

1314 

def get_used_style_strings(self) -> list[str]:
    """
    Return a sorted list of the style strings known to the renderer, with
    whitespace runs collapsed to a single space and the ends stripped.
    This is helpful for debugging, and for writing a new `Style`.
    """
    style_cache = self.renderer._attrs_for_style
    if not style_cache:
        return []

    normalized = [
        re.sub(r"\s+", " ", raw_style).strip() for raw_style in style_cache.keys()
    ]
    normalized.sort()
    return normalized

1329 

1330 

class _CombinedRegistry(KeyBindingsBase):
    """
    The `KeyBindings` of key bindings for a `Application`.
    This merges the global key bindings with the one of the current user
    control.
    """

    def __init__(self, app: Application[_AppResult]) -> None:
        self.app = app
        # Merged bindings, cached per (focused window, set of controls).
        self._cache: SimpleCache[
            tuple[Window, frozenset[UIControl]], KeyBindingsBase
        ] = SimpleCache()

    @property
    def _version(self) -> Hashable:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    @property
    def bindings(self) -> list[Binding]:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    def _create_key_bindings(
        self, current_window: Window, other_controls: list[UIControl]
    ) -> KeyBindingsBase:
        """
        Create a `KeyBindings` object that merges the `KeyBindings` from the
        `UIControl` with all the parent controls and the global key bindings.

        `other_controls` is not read here; it only takes part in the cache
        key built by `_key_bindings`.
        """
        collected = []
        visited = set()

        # Walk up from the focused window through its parents, gathering
        # their key bindings. Stop at the first modal container, or at the
        # root.
        node: Container = current_window
        while True:
            visited.add(node)
            node_bindings = node.get_key_bindings()
            if node_bindings is not None:
                collected.append(node_bindings)

            if node.is_modal():
                break

            parent = self.app.layout.get_parent(node)
            if parent is None:
                break
            node = parent

        # Include global bindings (starting at the top-modal container) of
        # every container that was not visited on the way up.
        for descendant in walk(node):
            if descendant in visited:
                continue
            descendant_bindings = descendant.get_key_bindings()
            if descendant_bindings is not None:
                collected.append(GlobalOnlyKeyBindings(descendant_bindings))

        # Add App key bindings.
        if self.app.key_bindings:
            collected.append(self.app.key_bindings)

        # Add the page navigation bindings (conditionally enabled) and the
        # default bindings.
        collected.append(
            ConditionalKeyBindings(
                self.app._page_navigation_bindings,
                self.app.enable_page_navigation_bindings,
            )
        )
        collected.append(self.app._default_bindings)

        # Merge in reversed order. The current control's key bindings
        # should come last. They need priority.
        return merge_key_bindings(collected[::-1])

    @property
    def _key_bindings(self) -> KeyBindingsBase:
        """
        The merged key bindings for the current focus, taken from the cache
        or created on a cache miss.
        """
        focused_window = self.app.layout.current_window
        controls = list(self.app.layout.find_all_controls())
        cache_key = focused_window, frozenset(controls)

        return self._cache.get(
            cache_key, lambda: self._create_key_bindings(focused_window, controls)
        )

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings for the current focus."""
        return self._key_bindings.get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings for the current focus."""
        return self._key_bindings.get_bindings_starting_with_keys(keys)

1425 

1426 

async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
    """
    Run a small sub application (a `PromptSession`) that shows `wait_text`
    and finishes when the enter key is pressed.
    This has two advantages over using 'input'/'raw_input':
    - This will share the same input/output I/O.
    - This doesn't block the event loop.
    """
    from prompt_toolkit.shortcuts import PromptSession

    enter_only_bindings = KeyBindings()

    @enter_only_bindings.add("enter")
    def _ok(event: E) -> None:
        event.app.exit()

    @enter_only_bindings.add(Keys.Any)
    def _ignore(event: E) -> None:
        "Disallow typing."

    prompt: PromptSession[None] = PromptSession(
        message=wait_text, key_bindings=enter_only_bindings
    )
    try:
        await prompt.app.run_async()
    except KeyboardInterrupt:
        # Control-c pressed. Don't propagate this error.
        pass

1454 

1455 

@contextmanager
def attach_winch_signal_handler(
    handler: Callable[[], None]
) -> Generator[None, None, None]:
    """
    Attach the given callback as a WINCH signal handler within the context
    manager. Restore the original signal handler when done.

    The `Application.run` method will register SIGWINCH, so that it will
    properly repaint when the terminal window resizes. However, using
    `run_in_terminal`, we can temporarily send an application to the
    background, and run an other app in between, which will then overwrite the
    SIGWINCH. This is why it's important to restore the handler when the app
    terminates.

    Does nothing when the platform has no `SIGWINCH`, or when not called
    from the main thread.
    """
    # The tricky part here is that signals are registered in the Unix event
    # loop with a wakeup fd, but another application could have registered
    # signals using signal.signal directly. For now, the implementation is
    # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`.

    # No WINCH? Then don't do anything.
    winch_signal = getattr(signal, "SIGWINCH", None)
    if winch_signal is None or not in_main_thread():
        yield
        return

    # Remember the handle that is registered right now, if any.
    # (Only UnixSelectorEventloop has `_signal_handlers`.)
    event_loop = get_running_loop()
    registered_handles = getattr(event_loop, "_signal_handlers", {})
    saved_handle = registered_handles.get(winch_signal)

    try:
        event_loop.add_signal_handler(winch_signal, handler)
        yield
    finally:
        # Remove our handler, and re-register the previous one with the
        # callback and arguments it had before.
        event_loop.remove_signal_handler(winch_signal)
        if saved_handle is not None:
            event_loop.add_signal_handler(
                winch_signal,
                saved_handle._callback,
                *saved_handle._args,
            )