Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/application/application.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

620 statements  

1from __future__ import annotations 

2 

3import asyncio 

4import contextvars 

5import os 

6import re 

7import signal 

8import sys 

9import threading 

10import time 

11from asyncio import ( 

12 AbstractEventLoop, 

13 Future, 

14 Task, 

15 ensure_future, 

16 get_running_loop, 

17 sleep, 

18) 

19from collections.abc import Callable, Coroutine, Generator, Hashable, Iterable, Iterator 

20from contextlib import ExitStack, contextmanager 

21from subprocess import Popen 

22from traceback import format_tb 

23from typing import ( 

24 Any, 

25 Generic, 

26 TypeVar, 

27 cast, 

28 overload, 

29) 

30 

31from prompt_toolkit.buffer import Buffer 

32from prompt_toolkit.cache import SimpleCache 

33from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard 

34from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config 

35from prompt_toolkit.data_structures import Size 

36from prompt_toolkit.enums import EditingMode 

37from prompt_toolkit.eventloop import ( 

38 InputHook, 

39 get_traceback_from_context, 

40 new_eventloop_with_inputhook, 

41 run_in_executor_with_context, 

42) 

43from prompt_toolkit.eventloop.utils import call_soon_threadsafe 

44from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter 

45from prompt_toolkit.formatted_text import AnyFormattedText 

46from prompt_toolkit.input.base import Input 

47from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead 

48from prompt_toolkit.key_binding.bindings.page_navigation import ( 

49 load_page_navigation_bindings, 

50) 

51from prompt_toolkit.key_binding.defaults import load_key_bindings 

52from prompt_toolkit.key_binding.emacs_state import EmacsState 

53from prompt_toolkit.key_binding.key_bindings import ( 

54 Binding, 

55 ConditionalKeyBindings, 

56 GlobalOnlyKeyBindings, 

57 KeyBindings, 

58 KeyBindingsBase, 

59 KeysTuple, 

60 merge_key_bindings, 

61) 

62from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor 

63from prompt_toolkit.key_binding.vi_state import ViState 

64from prompt_toolkit.keys import Keys 

65from prompt_toolkit.layout.containers import Container, Window 

66from prompt_toolkit.layout.controls import BufferControl, UIControl 

67from prompt_toolkit.layout.dummy import create_dummy_layout 

68from prompt_toolkit.layout.layout import Layout, walk 

69from prompt_toolkit.output import ColorDepth, Output 

70from prompt_toolkit.renderer import Renderer, print_formatted_text 

71from prompt_toolkit.search import SearchState 

72from prompt_toolkit.styles import ( 

73 BaseStyle, 

74 DummyStyle, 

75 DummyStyleTransformation, 

76 DynamicStyle, 

77 StyleTransformation, 

78 default_pygments_style, 

79 default_ui_style, 

80 merge_styles, 

81) 

82from prompt_toolkit.utils import Event, in_main_thread 

83 

84from .current import get_app_session, set_app 

85from .run_in_terminal import in_terminal, run_in_terminal 

86 

87__all__ = [ 

88 "Application", 

89] 

90 

91 

# Short alias for the key press event type.
E = KeyPressEvent
# Type variable for the result type an `Application` is parametrised with.
_AppResult = TypeVar("_AppResult")
# Signature of the application callbacks: called with the application
# instance, returning nothing.
ApplicationEventHandler = Callable[["Application[_AppResult]"], None]

# Looked up with `getattr`, so these are `None` when the `signal` module of
# the current platform does not define them.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
_SIGTSTP = getattr(signal, "SIGTSTP", None)

98 

99 

100class Application(Generic[_AppResult]): 

101 """ 

102 The main Application class! 

103 This glues everything together. 

104 

105 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance. 

106 :param key_bindings: 

107 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for 

108 the key bindings. 

109 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use. 

110 :param full_screen: When True, run the application on the alternate screen buffer. 

111 :param color_depth: Any :class:`~.ColorDepth` value, a callable that 

112 returns a :class:`~.ColorDepth` or `None` for default. 

113 :param erase_when_done: (bool) Clear the application output when it finishes. 

114 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches 

115 forward and a '?' searches backward. In Readline mode, this is usually 

116 reversed. 

117 :param min_redraw_interval: Number of seconds to wait between redraws. Use 

118 this for applications where `invalidate` is called a lot. This could cause 

119 a lot of terminal output, which some terminals are not able to process. 

120 

121 `None` means that every `invalidate` will be scheduled right away 

122 (which is usually fine). 

123 

124 When one `invalidate` is called, but a scheduled redraw of a previous 

125 `invalidate` call has not been executed yet, nothing will happen in any 

126 case. 

127 

128 :param max_render_postpone_time: When there is high CPU (a lot of other 

129 scheduled calls), postpone the rendering max x seconds. '0' means: 

130 don't postpone. '.5' means: try to draw at least twice a second. 

131 

132 :param refresh_interval: Automatically invalidate the UI every so many 

133 seconds. When `None` (the default), only invalidate when `invalidate` 

134 has been called. 

135 

136 :param terminal_size_polling_interval: Poll the terminal size every so many 

137 seconds. Useful if the application runs in a thread other than the 

138 main thread where SIGWINCH can't be handled, or on Windows. 

139 

140 Filters: 

141 

142 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or 

143 boolean). When True, enable mouse support. 

144 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean. 

145 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`. 

146 

147 :param enable_page_navigation_bindings: When `True`, enable the page 

148 navigation key bindings. These include both Emacs and Vi bindings like 

149 page-up, page-down and so on to scroll through pages. Mostly useful for 

150 creating an editor or other full screen applications. Probably, you 

151 don't want this for the implementation of a REPL. By default, this is 

152 enabled if `full_screen` is set. 

153 

154 Callbacks (all of these should accept an 

155 :class:`~prompt_toolkit.application.Application` object as input.) 

156 

157 :param on_reset: Called during reset. 

158 :param on_invalidate: Called when the UI has been invalidated. 

159 :param before_render: Called right before rendering. 

160 :param after_render: Called right after rendering. 

161 

162 I/O: 

163 (Note that the preferred way to change the input/output is by creating an 

164 `AppSession` with the required input/output objects. If you need multiple 

165 applications running at the same time, you have to create a separate 

166 `AppSession` using a `with create_app_session():` block.) 

167 

168 :param input: :class:`~prompt_toolkit.input.Input` instance. 

169 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably 

170 Vt100_Output or Win32Output.) 

171 

172 Usage: 

173 

174 app = Application(...) 

175 app.run() 

176 

177 # Or 

178 await app.run_async() 

179 """ 

180 

def __init__(
    self,
    layout: Layout | None = None,
    style: BaseStyle | None = None,
    include_default_pygments_style: FilterOrBool = True,
    style_transformation: StyleTransformation | None = None,
    key_bindings: KeyBindingsBase | None = None,
    clipboard: Clipboard | None = None,
    full_screen: bool = False,
    color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
    mouse_support: FilterOrBool = False,
    enable_page_navigation_bindings: None
    | (FilterOrBool) = None,  # Can be None, True or False.
    paste_mode: FilterOrBool = False,
    editing_mode: EditingMode = EditingMode.EMACS,
    erase_when_done: bool = False,
    reverse_vi_search_direction: FilterOrBool = False,
    min_redraw_interval: float | int | None = None,
    max_render_postpone_time: float | int | None = 0.01,
    refresh_interval: float | None = None,
    terminal_size_polling_interval: float | None = 0.5,
    cursor: AnyCursorShapeConfig = None,
    on_reset: ApplicationEventHandler[_AppResult] | None = None,
    on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
    before_render: ApplicationEventHandler[_AppResult] | None = None,
    after_render: ApplicationEventHandler[_AppResult] | None = None,
    # I/O.
    input: Input | None = None,
    output: Output | None = None,
) -> None:
    """
    Store the configuration on the instance, create the merged style, the
    renderer and the key processor, and finish with a call to `reset()`.

    The parameters are described in the class docstring. Values that are
    left as `None` are replaced by defaults here: a dummy layout, a dummy
    style transformation, an in-memory clipboard, and the input/output of
    the current app session.
    """
    # If `enable_page_navigation_bindings` is not specified, enable it in
    # case of full screen applications only. This can be overridden by the user.
    if enable_page_navigation_bindings is None:
        # The condition reads `self.full_screen` lazily, so it follows later
        # changes of that attribute.
        enable_page_navigation_bindings = Condition(lambda: self.full_screen)

    # Normalise every "filter or bool" argument to a filter object.
    paste_mode = to_filter(paste_mode)
    mouse_support = to_filter(mouse_support)
    reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
    enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
    include_default_pygments_style = to_filter(include_default_pygments_style)

    if layout is None:
        layout = create_dummy_layout()

    if style_transformation is None:
        style_transformation = DummyStyleTransformation()

    self.style = style
    self.style_transformation = style_transformation

    # Key bindings.
    self.key_bindings = key_bindings
    self._default_bindings = load_key_bindings()
    self._page_navigation_bindings = load_page_navigation_bindings()

    self.layout = layout
    self.clipboard = clipboard or InMemoryClipboard()
    self.full_screen: bool = full_screen
    self._color_depth = color_depth
    self.mouse_support = mouse_support

    self.paste_mode = paste_mode
    self.editing_mode = editing_mode
    self.erase_when_done = erase_when_done
    self.reverse_vi_search_direction = reverse_vi_search_direction
    self.enable_page_navigation_bindings = enable_page_navigation_bindings
    self.min_redraw_interval = min_redraw_interval
    self.max_render_postpone_time = max_render_postpone_time
    self.refresh_interval = refresh_interval
    self.terminal_size_polling_interval = terminal_size_polling_interval

    self.cursor = to_cursor_shape_config(cursor)

    # Events.
    self.on_invalidate = Event(self, on_invalidate)
    self.on_reset = Event(self, on_reset)
    self.before_render = Event(self, before_render)
    self.after_render = Event(self, after_render)

    # I/O.
    # Explicitly passed input/output win; otherwise use the current session's.
    session = get_app_session()
    self.output = output or session.output
    self.input = input or session.input

    # List of 'extra' functions to execute before a Application.run.
    self.pre_run_callables: list[Callable[[], None]] = []

    # Run state. These are filled in while `run_async` is active.
    self._is_running = False
    self.future: Future[_AppResult] | None = None
    self.loop: AbstractEventLoop | None = None
    self._loop_thread: threading.Thread | None = None
    self.context: contextvars.Context | None = None

    #: Quoted insert. This flag is set if we go into quoted insert mode.
    self.quoted_insert = False

    #: Vi state. (For Vi key bindings.)
    self.vi_state = ViState()
    #: Emacs state. (For Emacs key bindings.)
    self.emacs_state = EmacsState()

    #: When to flush the input (For flushing escape keys.) This is important
    #: on terminals that use vt100 input. We can't distinguish the escape
    #: key from for instance the left-arrow key, if we don't know what follows
    #: after "\x1b". This little timer will consider "\x1b" to be escape if
    #: nothing did follow in this time span.
    #: This seems to work like the `ttimeoutlen` option in Vim.
    self.ttimeoutlen = 0.5  # Seconds.

    #: Like Vim's `timeoutlen` option. This can be `None` or a float. For
    #: instance, suppose that we have a key binding AB and a second key
    #: binding A. If the user presses A and then waits, we don't handle
    #: this binding yet (unless it was marked 'eager'), because we don't
    #: know what will follow. This timeout is the maximum amount of time
    #: that we wait until we call the handlers anyway. Pass `None` to
    #: disable this timeout.
    self.timeoutlen = 1.0

    # Default UI style, optional default pygments style and the user style,
    # merged into the single style object handed to the renderer.
    self._merged_style = self._create_merged_style(include_default_pygments_style)

    #: The `Renderer` instance.
    # It writes to `self.output`, i.e. the same output object this
    # application uses.
    self.renderer = Renderer(
        self._merged_style,
        self.output,
        full_screen=full_screen,
        mouse_support=mouse_support,
        cpr_not_supported_callback=self.cpr_not_supported_callback,
    )

    #: Render counter. This one is increased every time the UI is rendered.
    #: It can be used as a key for caching certain information during one
    #: rendering.
    self.render_counter = 0

    # Invalidate flag. When 'True', a repaint has been scheduled.
    self._invalidated = False
    self._invalidate_events: list[
        Event[object]
    ] = []  # Collection of 'invalidate' Event objects.
    self._last_redraw_time = 0.0  # Unix timestamp of last redraw. Used when
    # `min_redraw_interval` is given.

    #: The `InputProcessor` instance.
    self.key_processor = KeyProcessor(_CombinedRegistry(self))

    # If `run_in_terminal` was called. This will point to a `Future` that will be
    # set at the point when the previous run finishes.
    self._running_in_terminal = False
    self._running_in_terminal_f: Future[None] | None = None

    # Trigger initialize callback.
    self.reset()

333 

def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
    """
    Build the merged style for this application.

    The result combines, in this order: the default UI style, the default
    pygments style (replaced by a dummy style whenever the given filter
    evaluates to false), and the user style stored in `self.style`.

    :param include_default_pygments_style: Filter deciding whether the
        default pygments style takes part in the merge.
    """
    fallback = DummyStyle()
    pygments = default_pygments_style()

    def pick_pygments_style() -> BaseStyle:
        # The filter is consulted inside this callable, not once up front.
        return pygments if include_default_pygments_style() else fallback

    conditional_pygments_style = DynamicStyle(pick_pygments_style)

    layers = [
        default_ui_style(),
        conditional_pygments_style,
        # `self.style` is looked up through a callable as well, so a style
        # assigned later is the one that gets returned.
        DynamicStyle(lambda: self.style),
    ]
    return merge_styles(layers)

356 

@property
def color_depth(self) -> ColorDepth:
    """
    The :class:`.ColorDepth` that is currently in effect.

    Resolution order:

    - The color depth passed to this application, if any. When a callable
      was passed, it is called and its return value is used.
    - When that yields `None`, the default color depth reported by the
      :class:`.Output` implementation. If the :class:`.Output` class was
      created using `output.defaults.create_output`, then this value is
      coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.
    """
    configured = self._color_depth
    chosen = configured() if callable(configured) else configured

    if chosen is not None:
        return chosen

    return self.output.get_default_color_depth()

380 

@property
def current_buffer(self) -> Buffer:
    """
    The :class:`~.Buffer` of the layout's current focus.

    (When the layout reports no current buffer, a fresh dummy
    :class:`.Buffer` is returned instead, so that callers don't have to
    check for `None` or catch exceptions every time.)
    """
    focused = self.layout.current_buffer
    if focused:
        return focused

    # Dummy buffer.
    return Buffer(name="dummy-buffer")

393 

@property
def current_search_state(self) -> SearchState:
    """
    The :class:`.SearchState` of the focused control, when that control is
    a :class:`.BufferControl`; a new dummy :class:`.SearchState` otherwise.
    """
    focused = self.layout.current_control

    if not isinstance(focused, BufferControl):
        # Dummy search state. (Don't return None!)
        return SearchState()

    return focused.search_state

405 

def reset(self) -> None:
    """
    Bring the application back to a clean state for reading the next input.

    The buffers and the focus stack are deliberately left alone: with
    several buffers, the content of the other buffers has to survive
    between consecutive calls of `run`.
    """
    self.exit_style = ""
    self._background_tasks: set[Task[None]] = set()

    # Reset the stateful components, in this fixed order.
    for component in (
        self.renderer,
        self.key_processor,
        self.layout,
        self.vi_state,
        self.emacs_state,
    ):
        component.reset()

    # Trigger reset event.
    self.on_reset.fire()

    # Make sure that we have a 'focusable' widget focused.
    # (The `Layout` class can't determine this.)
    layout = self.layout
    if layout.current_control.is_focusable():
        return

    # Focus the first window whose content accepts the focus, if any.
    candidate = next(
        (
            window
            for window in layout.find_all_windows()
            if window.content.is_focusable()
        ),
        None,
    )
    if candidate is not None:
        layout.current_window = candidate

437 

def invalidate(self) -> None:
    """
    Thread safe way of sending a repaint trigger to the input event loop.

    Nothing happens when the application is not running, when there is no
    (open) event loop, or when a previously requested redraw has not been
    executed yet. Otherwise the `on_invalidate` event is fired on the loop
    and a redraw is scheduled; when `min_redraw_interval` is set and the
    last redraw was too recent, the redraw is delayed by the remaining time.
    """
    if not self._is_running:
        # Don't schedule a redraw if we're not running.
        # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail.
        # See: https://github.com/dbcli/mycli/issues/797
        return

    # Take a single snapshot of the loop. `run_async` sets `self.loop` back
    # to `None` when it finishes, and this method can be called from other
    # threads; reading the attribute again further down could then hit
    # `None` after the check below has passed.
    loop = self.loop

    # `invalidate()` called if we don't have a loop yet (not running?), or
    # after the event loop was closed.
    if loop is None or loop.is_closed():
        return

    # Never schedule a second redraw, when a previous one has not yet been
    # executed. (This should protect against other threads calling
    # 'invalidate' many times, resulting in 100% CPU.)
    if self._invalidated:
        return
    self._invalidated = True

    # Trigger event.
    loop.call_soon_threadsafe(self.on_invalidate.fire)

    def redraw() -> None:
        self._invalidated = False
        self._redraw()

    def schedule_redraw() -> None:
        call_soon_threadsafe(
            redraw, max_postpone_time=self.max_render_postpone_time, loop=loop
        )

    if self.min_redraw_interval:
        # When a minimum redraw interval is set, wait minimum this amount
        # of time between redraws.
        diff = time.time() - self._last_redraw_time
        if diff < self.min_redraw_interval:

            async def redraw_in_future() -> None:
                await sleep(cast(float, self.min_redraw_interval) - diff)
                schedule_redraw()

            # Background tasks have to be created from within the loop.
            loop.call_soon_threadsafe(
                lambda: self.create_background_task(redraw_in_future())
            )
        else:
            schedule_redraw()
    else:
        schedule_redraw()

490 

@property
def invalidated(self) -> bool:
    """
    True when a redraw operation has been scheduled.

    This exposes the `_invalidated` flag, which `invalidate` sets and which
    is cleared again right before the scheduled redraw runs.
    """
    return self._invalidated

495 

496 def _redraw(self, render_as_done: bool = False) -> None: 

497 """ 

498 Render the command line again. (Not thread safe!) (From other threads, 

499 or if unsure, use :meth:`.Application.invalidate`.) 

500 

501 :param render_as_done: make sure to put the cursor after the UI. 

502 """ 

503 

504 def run_in_context() -> None: 

505 # Only draw when no sub application was started. 

506 if self._is_running and not self._running_in_terminal: 

507 if self.min_redraw_interval: 

508 self._last_redraw_time = time.time() 

509 

510 # Render 

511 self.render_counter += 1 

512 self.before_render.fire() 

513 

514 if render_as_done: 

515 if self.erase_when_done: 

516 self.renderer.erase() 

517 else: 

518 # Draw in 'done' state and reset renderer. 

519 self.renderer.render(self, self.layout, is_done=render_as_done) 

520 else: 

521 self.renderer.render(self, self.layout) 

522 

523 self.layout.update_parents_relations() 

524 

525 # Fire render event. 

526 self.after_render.fire() 

527 

528 self._update_invalidate_events() 

529 

530 # NOTE: We want to make sure this Application is the active one. The 

531 # invalidate function is often called from a context where this 

532 # application is not the active one. (Like the 

533 # `PromptSession._auto_refresh_context`). 

534 # We copy the context in case the context was already active, to 

535 # prevent RuntimeErrors. (The rendering is not supposed to change 

536 # any context variables.) 

537 if self.context is not None: 

538 self.context.copy().run(run_in_context) 

539 

540 def _start_auto_refresh_task(self) -> None: 

541 """ 

542 Start a while/true loop in the background for automatic invalidation of 

543 the UI. 

544 """ 

545 if self.refresh_interval is not None and self.refresh_interval != 0: 

546 

547 async def auto_refresh(refresh_interval: float) -> None: 

548 while True: 

549 await sleep(refresh_interval) 

550 self.invalidate() 

551 

552 self.create_background_task(auto_refresh(self.refresh_interval)) 

553 

554 def _update_invalidate_events(self) -> None: 

555 """ 

556 Make sure to attach 'invalidate' handlers to all invalidate events in 

557 the UI. 

558 """ 

559 # Remove all the original event handlers. (Components can be removed 

560 # from the UI.) 

561 for ev in self._invalidate_events: 

562 ev -= self._invalidate_handler 

563 

564 # Gather all new events. 

565 # (All controls are able to invalidate themselves.) 

566 def gather_events() -> Iterable[Event[object]]: 

567 for c in self.layout.find_all_controls(): 

568 yield from c.get_invalidate_events() 

569 

570 self._invalidate_events = list(gather_events()) 

571 

572 for ev in self._invalidate_events: 

573 ev += self._invalidate_handler 

574 

def _invalidate_handler(self, sender: object) -> None:
    """
    Handler for invalidate events coming from UIControls.

    (This bridges the difference in signature between an event handler,
    which receives the sender, and `self.invalidate`, which takes no
    arguments. It also needs to be a method -not a nested function-, so
    that we can remove it from the events again.)

    :param sender: The object that fired the event. It is ignored.
    """
    self.invalidate()

584 

def _on_resize(self) -> None:
    """
    When the window size changes, we erase the current output and request
    again the cursor position. When the CPR answer arrives, the output is
    drawn again.

    `run_async` registers this method as the callback of the WINCH signal
    handler.
    """
    # Erase, request position (when cursor is at the start position)
    # and redraw again. -- The order is important.
    self.renderer.erase(leave_alternate_screen=False)
    self._request_absolute_cursor_position()
    self._redraw()

596 

597 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None: 

598 """ 

599 Called during `run`. 

600 

601 `self.future` should be set to the new future at the point where this 

602 is called in order to avoid data races. `pre_run` can be used to set a 

603 `threading.Event` to synchronize with UI termination code, running in 

604 another thread that would call `Application.exit`. (See the progress 

605 bar code for an example.) 

606 """ 

607 if pre_run: 

608 pre_run() 

609 

610 # Process registered "pre_run_callables" and clear list. 

611 for c in self.pre_run_callables: 

612 c() 

613 del self.pre_run_callables[:] 

614 

async def run_async(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    slow_callback_duration: float = 0.5,
) -> _AppResult:
    """
    Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
    until :meth:`~prompt_toolkit.application.Application.exit` has been
    called. Return the value that was passed to
    :meth:`~prompt_toolkit.application.Application.exit`.

    This is the main entry point for a prompt_toolkit
    :class:`~prompt_toolkit.application.Application` and usually the only
    place where the event loop is actually running.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal if possible. This will call
        the `<sigint>` key binding when a SIGINT is received. (This only
        works in the main thread.)
    :param slow_callback_duration: Display warnings if code scheduled in
        the asyncio event loop takes more time than this. The asyncio
        default of `0.1` is sometimes not sufficient on a slow system,
        because exceptionally, the drawing of the app, which happens in the
        event loop, can take a bit longer from time to time.
    :raises EOFError: When the input stream gets closed before the
        application has produced a result.
    """
    assert not self._is_running, "Application is already running."

    if not in_main_thread() or sys.platform == "win32":
        # Handling signals in other threads is not supported.
        # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises
        # `NotImplementedError`.
        # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553
        handle_sigint = False

    async def _run_async(f: asyncio.Future[_AppResult]) -> _AppResult:
        context = contextvars.copy_context()
        self.context = context

        # Counter for cancelling 'flush' timeouts. Every time when a key is
        # pressed, we start a 'flush' timer for flushing our escape key. But
        # when any subsequent input is received, a new timer is started and
        # the current timer will be ignored.
        flush_task: asyncio.Task[None] | None = None

        # Reset.
        # (`self.future` needs to be set when `pre_run` is called.)
        self.reset()
        self._pre_run(pre_run)

        # Feed type ahead input first.
        self.key_processor.feed_multiple(get_typeahead(self.input))
        self.key_processor.process_keys()

        def read_from_input() -> None:
            nonlocal flush_task

            # Ignore when we aren't running anymore. This callback will be
            # removed from the loop next time. (It could be that it was
            # still in the 'tasks' list of the loop.)
            # Except: if we need to process incoming CPRs.
            if not self._is_running and not self.renderer.waiting_for_cpr:
                return

            # Get keys from the input object.
            keys = self.input.read_keys()

            # Feed to key processor.
            self.key_processor.feed_multiple(keys)
            self.key_processor.process_keys()

            # Quit when the input stream was closed.
            if self.input.closed:
                if not f.done():
                    f.set_exception(EOFError)
            else:
                # Automatically flush keys.
                if flush_task:
                    flush_task.cancel()
                flush_task = self.create_background_task(auto_flush_input())

        def read_from_input_in_context() -> None:
            # Ensure that key bindings callbacks are always executed in the
            # current context. This is important when key bindings are
            # accessing contextvars. (These callbacks are currently being
            # called from a different context. Underneath,
            # `loop.add_reader` is used to register the stdin FD.)
            # (We copy the context to avoid a `RuntimeError` in case the
            # context is already active.)
            context.copy().run(read_from_input)

        async def auto_flush_input() -> None:
            # Flush input after timeout.
            # (Used for flushing the enter key.)
            # This sleep can be cancelled, in that case we won't flush yet.
            await sleep(self.ttimeoutlen)
            flush_input()

        def flush_input() -> None:
            if not self.is_done:
                # Get keys, and feed to key processor.
                keys = self.input.flush_keys()
                self.key_processor.feed_multiple(keys)
                self.key_processor.process_keys()

                # Processing the flushed keys can already have completed
                # the future (a key binding may finish the application).
                # Setting an exception on a completed future raises
                # `InvalidStateError`, so guard it like `read_from_input`.
                if self.input.closed and not f.done():
                    f.set_exception(EOFError)

        # Enter raw mode, attach input and attach WINCH event handler.
        with (
            self.input.raw_mode(),
            self.input.attach(read_from_input_in_context),
            attach_winch_signal_handler(self._on_resize),
        ):
            # Draw UI.
            self._request_absolute_cursor_position()
            self._redraw()
            self._start_auto_refresh_task()

            self.create_background_task(self._poll_output_size())

            # Wait for UI to finish.
            try:
                result = await f
            finally:
                # In any case, when the application finishes.
                # (Successful, or because of an error.)
                try:
                    self._redraw(render_as_done=True)
                finally:
                    # _redraw has a good chance to fail if it calls widgets
                    # with bad code. Make sure to reset the renderer
                    # anyway.
                    self.renderer.reset()

                    # Unset `is_running`, this ensures that possibly
                    # scheduled draws won't paint during the following
                    # yield.
                    self._is_running = False

                    # Detach event handlers for invalidate events.
                    # (Important when a UIControl is embedded in multiple
                    # applications, like ptterm in pymux. An invalidate
                    # should not trigger a repaint in terminated
                    # applications.)
                    for ev in self._invalidate_events:
                        ev -= self._invalidate_handler
                    self._invalidate_events = []

                    # Wait for CPR responses.
                    if self.output.responds_to_cpr:
                        await self.renderer.wait_for_cpr_responses()

                    # Wait for the run-in-terminals to terminate.
                    previous_run_in_terminal_f = self._running_in_terminal_f

                    if previous_run_in_terminal_f:
                        await previous_run_in_terminal_f

                    # Store unprocessed input as typeahead for next time.
                    store_typeahead(self.input, self.key_processor.empty_queue())

            return result

    @contextmanager
    def set_loop() -> Iterator[AbstractEventLoop]:
        # Remember the running loop and its thread for the duration of the run.
        loop = get_running_loop()
        self.loop = loop
        self._loop_thread = threading.current_thread()

        try:
            yield loop
        finally:
            self.loop = None
            self._loop_thread = None

    @contextmanager
    def set_is_running() -> Iterator[None]:
        self._is_running = True
        try:
            yield
        finally:
            self._is_running = False

    @contextmanager
    def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
        if handle_sigint:
            with _restore_sigint_from_ctypes():
                # save sigint handlers (python and os level)
                # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1576
                loop.add_signal_handler(
                    signal.SIGINT,
                    lambda *_: loop.call_soon_threadsafe(
                        self.key_processor.send_sigint
                    ),
                )
                try:
                    yield
                finally:
                    loop.remove_signal_handler(signal.SIGINT)
        else:
            yield

    @contextmanager
    def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
        if set_exception_handler:
            # Install our handler and put the previous one back afterwards.
            previous_exc_handler = loop.get_exception_handler()
            loop.set_exception_handler(self._handle_exception)
            try:
                yield
            finally:
                loop.set_exception_handler(previous_exc_handler)

        else:
            yield

    @contextmanager
    def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
        # Set slow_callback_duration.
        original_slow_callback_duration = loop.slow_callback_duration
        loop.slow_callback_duration = slow_callback_duration
        try:
            yield
        finally:
            # Reset slow_callback_duration.
            loop.slow_callback_duration = original_slow_callback_duration

    @contextmanager
    def create_future(
        loop: AbstractEventLoop,
    ) -> Iterator[asyncio.Future[_AppResult]]:
        f = loop.create_future()
        self.future = f  # XXX: make sure to set this before calling '_redraw'.

        try:
            yield f
        finally:
            # Also remove the Future again. (This brings the
            # application back to its initial state, where it also
            # doesn't have a Future.)
            self.future = None

    with ExitStack() as stack:
        stack.enter_context(set_is_running())

        # Make sure to set `_invalidated` to `False` to begin with,
        # otherwise we're not going to paint anything. This can happen if
        # this application had run before on a different event loop, and a
        # paint was scheduled using `call_soon_threadsafe` with
        # `max_postpone_time`.
        self._invalidated = False

        loop = stack.enter_context(set_loop())

        stack.enter_context(set_handle_sigint(loop))
        stack.enter_context(set_exception_handler_ctx(loop))
        stack.enter_context(set_callback_duration(loop))
        stack.enter_context(set_app(self))
        stack.enter_context(self._enable_breakpointhook())

        f = stack.enter_context(create_future(loop))

        try:
            return await _run_async(f)
        finally:
            # Wait for the background tasks to be done. This needs to
            # go in the finally! If `_run_async` raises
            # `KeyboardInterrupt`, we still want to wait for the
            # background tasks.
            await self.cancel_and_wait_for_background_tasks()

    # The `ExitStack` above is defined in typeshed in a way that it can
    # swallow exceptions. Without next line, mypy would think that there's
    # a possibility we don't return here. See:
    # https://github.com/python/mypy/issues/7726
    assert False, "unreachable"

896 

def run(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    in_thread: bool = False,
    inputhook: InputHook | None = None,
) -> _AppResult:
    """
    A blocking 'run' call that waits until the UI is finished.

    This will run the application in a fresh asyncio event loop.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param in_thread: When true, run the application in a background
        thread, and block the current thread until the application
        terminates. This is useful if we need to be sure the application
        won't use the current event loop (asyncio does not support nested
        event loops). A new event loop will be created in this background
        thread, and that loop will also be closed when the background
        thread terminates. When this is used, it's especially important to
        make sure that all asyncio background tasks are managed through
        `get_app().create_background_task()`, so that unfinished tasks are
        properly cancelled before the event loop is closed. This is used
        for instance in ptpython.
    :param handle_sigint: Handle SIGINT signal. Call the key binding for
        `Keys.SIGINT`. (This only works in the main thread.)
    :param inputhook: Optional input hook. When given, a new event loop is
        created with `new_eventloop_with_inputhook(inputhook)`, the
        application runs on that loop, and the loop is closed afterwards
        (also when the application raises).
    :return: The result the application exited with.
    """
    if in_thread:
        result: _AppResult
        exception: BaseException | None = None

        def run_in_thread() -> None:
            nonlocal result, exception
            try:
                result = self.run(
                    pre_run=pre_run,
                    set_exception_handler=set_exception_handler,
                    # Signal handling only works in the main thread.
                    handle_sigint=False,
                    inputhook=inputhook,
                )
            except BaseException as e:
                # Carry every failure (including `KeyboardInterrupt`) over
                # to the calling thread, where it is re-raised below.
                exception = e

        thread = threading.Thread(target=run_in_thread)
        thread.start()
        thread.join()

        if exception is not None:
            raise exception
        return result

    coro = self.run_async(
        pre_run=pre_run,
        set_exception_handler=set_exception_handler,
        handle_sigint=handle_sigint,
    )

    def _called_from_ipython() -> bool:
        # NOTE: `sys._getframe(3)` depends on this helper being called
        # directly from `run()`; don't move the call into another frame.
        try:
            return (
                sys.modules["IPython"].version_info < (8, 18, 0, "")
                and "IPython/terminal/interactiveshell.py"
                in sys._getframe(3).f_code.co_filename
            )
        except BaseException:
            return False

    if inputhook is not None:
        # Create new event loop with given input hook and run the app.
        # In Python 3.12, we can use asyncio.run(loop_factory=...)
        # For now, use `run_until_complete()`.
        loop = new_eventloop_with_inputhook(inputhook)
        try:
            return loop.run_until_complete(coro)
        finally:
            # Always tear the loop down, even if the application raised
            # (for a prompt, `KeyboardInterrupt`/`EOFError` are common).
            # Otherwise every failing run would leak an unclosed loop.
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    elif _called_from_ipython():
        # workaround to make input hooks work for IPython until
        # https://github.com/ipython/ipython/pull/14241 is merged.
        # IPython was setting the input hook by installing an event loop
        # previously.
        try:
            # See whether a loop was installed already. If so, use that.
            # That's required for the input hooks to work, they are
            # installed using `set_event_loop`.
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop installed. Run like usual.
            return asyncio.run(coro)
        else:
            # Use existing loop.
            return loop.run_until_complete(coro)

    else:
        # No loop installed. Run like usual.
        return asyncio.run(coro)

1000 

def _handle_exception(
    self, loop: AbstractEventLoop, context: dict[str, Any]
) -> None:
    """
    Event loop exception handler.

    Prints the traceback and the exception found in `context` while the
    application is temporarily hidden (through `in_terminal`), then waits
    for the user to press ENTER.

    :param loop: The event loop reporting the exception (unused here).
    :param context: The exception context dictionary passed by the loop.
    """
    # Format the traceback eagerly, while we are still inside the loop's
    # exception callback, instead of inside the coroutine below.
    traceback_text = "".join(format_tb(get_traceback_from_context(context)))

    async def report() -> None:
        async with in_terminal():
            # Comparable to `loop.default_exception_handler`, but printing
            # directly instead of going through a logger.
            print("\nUnhandled exception in event loop:")
            print(traceback_text)
            print("Exception {}".format(context.get("exception")))

            await _do_wait_for_enter("Press ENTER to continue...")

    ensure_future(report())

1026 

1027 @contextmanager 

1028 def _enable_breakpointhook(self) -> Generator[None, None, None]: 

1029 """ 

1030 Install our custom breakpointhook for the duration of this context 

1031 manager. (We will only install the hook if no other custom hook was 

1032 set.) 

1033 """ 

1034 if sys.breakpointhook == sys.__breakpointhook__: 

1035 sys.breakpointhook = self._breakpointhook 

1036 

1037 try: 

1038 yield 

1039 finally: 

1040 sys.breakpointhook = sys.__breakpointhook__ 

1041 else: 

1042 yield 

1043 

def _breakpointhook(self, *a: object, **kw: object) -> None:
    """
    Breakpointhook which uses PDB, but ensures that the application is
    hidden and input echoing is restored during each debugger dispatch.

    This can be called from any thread. In any case, the application's
    event loop will be blocked while the PDB input is displayed. The event
    loop will continue after leaving the debugger.

    The positional and keyword arguments are accepted but not used.
    """
    # Alias, because `self` is rebound inside `CustomPdb.trace_dispatch`.
    app = self
    # Inline import on purpose. We don't want to import pdb, if not needed.
    import pdb
    from types import FrameType

    TraceDispatch = Callable[[FrameType, str, Any], Any]

    @contextmanager
    def hide_app_from_eventloop_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from within
        the App's event loop."""
        # Hide application.
        app.renderer.erase()

        # Detach input and dispatch to debugger.
        with app.input.detach():
            with app.input.cooked_mode():
                yield

        # Note: we don't render the application again here, because
        #       there's a good chance that there's a breakpoint on the next
        #       line. This paint/erase cycle would move the PDB prompt back
        #       to the middle of the screen.

    @contextmanager
    def hide_app_from_other_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from a
        thread other than the App's event loop."""
        # `ready`: set by the event loop thread once the app is hidden and
        # the input is in cooked mode. `done`: set by the debugging thread
        # once the debugger dispatch is over.
        ready = threading.Event()
        done = threading.Event()

        async def in_loop() -> None:
            # from .run_in_terminal import in_terminal
            # async with in_terminal():
            #     ready.set()
            #     await asyncio.get_running_loop().run_in_executor(None, done.wait)
            #     return

            # Hide application.
            app.renderer.erase()

            # Detach input and dispatch to debugger.
            with app.input.detach():
                with app.input.cooked_mode():
                    ready.set()
                    # Here we block the App's event loop thread until the
                    # debugger resumes. We could have used `with
                    # run_in_terminal.in_terminal():` like the commented
                    # code above, but it seems to work better if we
                    # completely stop the main event loop while debugging.
                    done.wait()

        # NOTE(review): this runs on the calling (non event loop) thread,
        # while `create_background_task` is documented as not threadsafe --
        # confirm that this is intentional.
        self.create_background_task(in_loop())
        # Block the debugging thread until the application is hidden.
        ready.wait()
        try:
            yield
        finally:
            # Release the event loop thread again.
            done.set()

    class CustomPdb(pdb.Pdb):
        def trace_dispatch(
            self, frame: FrameType, event: str, arg: Any
        ) -> TraceDispatch:
            # No loop thread recorded: dispatch without hiding anything.
            if app._loop_thread is None:
                return super().trace_dispatch(frame, event, arg)

            # Called from the thread that runs the application's loop.
            if app._loop_thread == threading.current_thread():
                with hide_app_from_eventloop_thread():
                    return super().trace_dispatch(frame, event, arg)

            # Called from any other thread.
            with hide_app_from_other_thread():
                return super().trace_dispatch(frame, event, arg)

    # Start tracing in the frame of the caller of this hook.
    frame = sys._getframe().f_back
    CustomPdb(stdout=sys.__stdout__).set_trace(frame)

1128 

def create_background_task(
    self, coroutine: Coroutine[Any, Any, None]
) -> asyncio.Task[None]:
    """
    Schedule `coroutine` as a background task of this application.

    The task is created on `self.loop` (or on the running loop when no
    loop is set), remembered in `_background_tasks`, and reported to
    `_on_background_task_done` when it finishes. Unfinished background
    tasks get cancelled when the `Application` terminates.

    Background tasks are not supposed to raise: as long as Python versions
    without task groups / exception groups are supported, an exception in
    a background task ends up in the exception handler of the event loop.
    Once Python 3.11 is the minimum version, a `TaskGroup` with the
    lifetime of `Application.run_async()` could be used instead.

    This is not threadsafe.

    :param coroutine: The coroutine to run in the background.
    :return: The created `asyncio.Task`.
    """
    event_loop = self.loop or get_running_loop()
    background_task: asyncio.Task[None] = event_loop.create_task(coroutine)

    background_task.add_done_callback(self._on_background_task_done)
    self._background_tasks.add(background_task)
    return background_task

1154 

1155 def _on_background_task_done(self, task: asyncio.Task[None]) -> None: 

1156 """ 

1157 Called when a background task completes. Remove it from 

1158 `_background_tasks`, and handle exceptions if any. 

1159 """ 

1160 self._background_tasks.discard(task) 

1161 

1162 if task.cancelled(): 

1163 return 

1164 

1165 exc = task.exception() 

1166 if exc is not None: 

1167 get_running_loop().call_exception_handler( 

1168 { 

1169 "message": f"prompt_toolkit.Application background task {task!r} " 

1170 "raised an unexpected exception.", 

1171 "exception": exc, 

1172 "task": task, 

1173 } 

1174 ) 

1175 

async def cancel_and_wait_for_background_tasks(self) -> None:
    """
    Cancel every background task and wait until all of them have finished.

    Exceptions raised inside the background tasks are not propagated from
    here (`asyncio.wait()` doesn't raise them); they are reported by the
    done-callback that was attached to each task.

    (If we had nurseries like Trio, this would be the `__aexit__` of a
    nursery.)
    """
    pending = self._background_tasks

    for background_task in pending:
        background_task.cancel()

    if not pending:
        return

    # `asyncio.wait()` deliberately is used here because it doesn't raise
    # what the tasks raised. That way a `CancelledError` coming out of this
    # `await` can only mean that *we* got cancelled during the teardown of
    # the application -- and that one must propagate, not be suppressed.
    # NOTE: If we get cancelled at this point, we can't wait for the
    #       cancellation of the tasks to complete (anyio or Python 3.11's
    #       `TaskGroup` would solve this, and exception groups would allow
    #       propagating what went wrong in the tasks).
    await asyncio.wait(pending, timeout=None, return_when=asyncio.ALL_COMPLETED)

1207 

1208 async def _poll_output_size(self) -> None: 

1209 """ 

1210 Coroutine for polling the terminal dimensions. 

1211 

1212 Useful for situations where `attach_winch_signal_handler` is not sufficient: 

1213 - If we are not running in the main thread. 

1214 - On Windows. 

1215 """ 

1216 size: Size | None = None 

1217 interval = self.terminal_size_polling_interval 

1218 

1219 if interval is None: 

1220 return 

1221 

1222 while True: 

1223 await asyncio.sleep(interval) 

1224 new_size = self.output.get_size() 

1225 

1226 if size is not None and new_size != size: 

1227 self._on_resize() 

1228 size = new_size 

1229 

def cpr_not_supported_callback(self) -> None:
    """
    Called when the cursor position response was not received in time.

    Writes a warning to the output (through `run_in_terminal`), unless the
    output already reports that it doesn't respond to CPR.
    """
    if not self.output.responds_to_cpr:
        # We know about this already.
        return

    def print_warning() -> None:
        output = self.output
        output.write(
            "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
        )
        output.flush()

    run_in_terminal(print_warning)

1244 

@overload
def exit(self) -> None:
    """Exit without arguments."""


@overload
def exit(self, *, result: _AppResult, style: str = "") -> None:
    """Exit with `_AppResult`."""


@overload
def exit(
    self, *, exception: BaseException | type[BaseException], style: str = ""
) -> None:
    """Exit with exception."""


def exit(
    self,
    result: _AppResult | None = None,
    exception: BaseException | type[BaseException] | None = None,
    style: str = "",
) -> None:
    """
    Exit application, by resolving the application's future.

    .. note::

        If `Application.exit` is called before `Application.run()` is
        called, then the `Application` won't exit (because the
        `Application.future` doesn't correspond to the current run). Use a
        `pre_run` hook and an event to synchronize the closing if there's a
        chance this can happen.

    :param result: Set this result for the application.
    :param exception: Set this exception as the result for an application. For
        a prompt, this is often `EOFError` or `KeyboardInterrupt`.
    :param style: Apply this style on the whole content when quitting,
        often this is 'class:exiting' for a prompt. (Used when
        `erase_when_done` is not set.)
    :raises Exception: When the application is not running, or when a
        return value was set already.
    """
    # At most one of `result` and `exception` can be given.
    assert result is None or exception is None

    future = self.future

    if future is None:
        raise Exception("Application is not running. Application.exit() failed.")

    if future.done():
        raise Exception("Return value already set. Application.exit() failed.")

    self.exit_style = style

    if exception is None:
        future.set_result(cast(_AppResult, result))
    else:
        future.set_exception(exception)

1297 

1298 def _request_absolute_cursor_position(self) -> None: 

1299 """ 

1300 Send CPR request. 

1301 """ 

1302 # Note: only do this if the input queue is not empty, and a return 

1303 # value has not been set. Otherwise, we won't be able to read the 

1304 # response anyway. 

1305 if not self.key_processor.input_queue and not self.is_done: 

1306 self.renderer.request_absolute_cursor_position() 

1307 

async def run_system_command(
    self,
    command: str,
    wait_for_enter: bool = True,
    display_before_text: AnyFormattedText = "",
    wait_text: str = "Press ENTER to continue...",
) -> None:
    """
    Run system command (While hiding the prompt. When finished, all the
    output will scroll above the prompt.)

    :param command: Shell command to be executed.
    :param wait_for_enter: Wait for the user to press enter, when the
        command is finished.
    :param display_before_text: If given, text to be displayed before the
        command executes.
    :param wait_text: Text displayed while waiting for the enter key press.
    """
    async with in_terminal():
        # Prefer the file descriptors of the application's own input and
        # output; fall back to the process' stdin/stdout when the
        # input/output object has no `fileno`.
        try:
            stdin_fd = self.input.fileno()
        except AttributeError:
            stdin_fd = sys.stdin.fileno()
        try:
            stdout_fd = self.output.fileno()
        except AttributeError:
            stdout_fd = sys.stdout.fileno()

        def execute() -> None:
            # Runs in an executor: print the leading text, then block on
            # the sub process until it exits.
            self.print_text(display_before_text)
            process = Popen(command, shell=True, stdin=stdin_fd, stdout=stdout_fd)
            process.wait()

        await run_in_executor_with_context(execute)

        # Give the user a chance to read the output.
        if wait_for_enter:
            await _do_wait_for_enter(wait_text)

1349 

def suspend_to_background(self, suspend_group: bool = True) -> None:
    """
    (Not thread safe -- to be called from inside the key bindings.)
    Suspend process, by sending it `SIGTSTP` through `run_in_terminal`.

    Does nothing when `_SIGTSTP` is `None`, i.e. when the operating system
    doesn't support it (not on Windows).

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    if _SIGTSTP is None:
        return

    def send_stop_signal() -> None:
        stop_signal = cast(int, _SIGTSTP)

        # Usually we want the whole process group to be suspended (pid 0).
        # This handles the case when input is piped from another process.
        # Otherwise, only our own process is signalled.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, stop_signal)

    run_in_terminal(send_stop_signal)

1375 

def print_text(
    self, text: AnyFormattedText, style: BaseStyle | None = None
) -> None:
    """
    Print formatted text to the output of this application, using the
    application's color depth and style transformation.
    (When the UI is running, this method has to be called through
    `run_in_terminal`, otherwise it will destroy the UI.)

    :param text: List of ``(style_str, text)`` tuples.
    :param style: Style class to use. Defaults to the active style in the CLI.
    """
    # A falsy `style` selects the merged style of the application.
    active_style = style or self._merged_style

    print_formatted_text(
        output=self.output,
        formatted_text=text,
        style=active_style,
        color_depth=self.color_depth,
        style_transformation=self.style_transformation,
    )

1394 

@property
def is_running(self) -> bool:
    """
    `True` when the application is currently active/running.
    """
    running = self._is_running
    return running

1399 

@property
def is_done(self) -> bool:
    """
    `True` when the application has a future and that future is done.
    `False` when there is no future.
    """
    if not self.future:
        return False
    return self.future.done()

1405 

def get_used_style_strings(self) -> list[str]:
    """
    Return a sorted list of the style strings known to the renderer, with
    runs of whitespace collapsed into a single space and the ends stripped.
    This is helpful for debugging, and for writing a new `Style`.

    Returns an empty list when the renderer has no (or an empty) style
    mapping.
    """
    attrs_for_style = self.renderer._attrs_for_style

    if not attrs_for_style:
        return []

    normalized = [
        re.sub(r"\s+", " ", style_str).strip()
        for style_str in attrs_for_style.keys()
    ]
    normalized.sort()
    return normalized

1420 

1421 

class _CombinedRegistry(KeyBindingsBase):
    """
    The `KeyBindings` of key bindings for a `Application`.
    This merges the global key bindings with the one of the current user
    control.
    """

    def __init__(self, app: Application[_AppResult]) -> None:
        self.app = app
        # Merged key bindings, cached per (focused window, set of controls).
        self._cache: SimpleCache[
            tuple[Window, frozenset[UIControl]], KeyBindingsBase
        ] = SimpleCache()

    @property
    def _version(self) -> Hashable:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    @property
    def bindings(self) -> list[Binding]:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    def _create_key_bindings(
        self, current_window: Window, other_controls: list[UIControl]
    ) -> KeyBindingsBase:
        """
        Build the merged `KeyBindings` for the given focused window: the
        bindings of the window and its parent containers (up to the first
        modal container), the global bindings of the other containers, the
        application bindings, the page navigation bindings and the default
        bindings.

        :param current_window: The window that currently has the focus.
        :param other_controls: Not used here; it is part of the cache key
            computed in `_key_bindings`.
        """
        app = self.app
        collected = []
        visited_containers = set()

        # Walk up from the focused window, collecting key bindings until we
        # hit a modal container or run out of parents.
        container: Container = current_window
        while True:
            visited_containers.add(container)
            own_bindings = container.get_key_bindings()
            if own_bindings is not None:
                collected.append(own_bindings)

            if container.is_modal():
                break

            parent = app.layout.get_parent(container)
            if parent is None:
                break
            container = parent

        # Global bindings of everything below the top-modal container that
        # was not visited above.
        for child in walk(container):
            if child in visited_containers:
                continue
            child_bindings = child.get_key_bindings()
            if child_bindings is not None:
                collected.append(GlobalOnlyKeyBindings(child_bindings))

        # Key bindings of the application itself.
        if app.key_bindings:
            collected.append(app.key_bindings)

        # Page navigation bindings (only active when enabled), followed by
        # the default bindings.
        collected.append(
            ConditionalKeyBindings(
                app._page_navigation_bindings,
                app.enable_page_navigation_bindings,
            )
        )
        collected.append(app._default_bindings)

        # The bindings of the current control have to come last, because
        # they need priority.
        collected.reverse()

        return merge_key_bindings(collected)

    @property
    def _key_bindings(self) -> KeyBindingsBase:
        """The (cached) merged key bindings for the current layout state."""
        layout = self.app.layout
        current_window = layout.current_window
        other_controls = list(layout.find_all_controls())
        cache_key = (current_window, frozenset(other_controls))

        def build() -> KeyBindingsBase:
            return self._create_key_bindings(current_window, other_controls)

        return self._cache.get(cache_key, build)

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings."""
        merged = self._key_bindings
        return merged.get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings."""
        merged = self._key_bindings
        return merged.get_bindings_starting_with_keys(keys)

1516 

1517 

async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
    """
    Run a small sub application that waits until enter is pressed.
    Compared to 'input'/'raw_input':
    - This will share the same input/output I/O.
    - This doesn't block the event loop.

    :param wait_text: The message displayed while waiting.
    """
    from prompt_toolkit.shortcuts import PromptSession

    bindings = KeyBindings()

    @bindings.add("enter")
    def _accept(event: E) -> None:
        "Enter ends the sub application."
        event.app.exit()

    @bindings.add(Keys.Any)
    def _swallow(event: E) -> None:
        "Disallow typing."

    prompt_session: PromptSession[None] = PromptSession(
        message=wait_text, key_bindings=bindings
    )
    wait_app = prompt_session.app
    try:
        await wait_app.run_async()
    except KeyboardInterrupt:
        # Control-c pressed. Don't propagate this error.
        pass

1545 

1546 

@contextmanager
def attach_winch_signal_handler(
    handler: Callable[[], None],
) -> Generator[None, None, None]:
    """
    Context manager that installs `handler` as the SIGWINCH handler of the
    running event loop, and puts the previous handler back on exit.

    The `Application.run` method will register SIGWINCH, so that it will
    properly repaint when the terminal window resizes. However, using
    `run_in_terminal`, we can temporarily send an application to the
    background, and run an other app in between, which will then overwrite the
    SIGWINCH. This is why it's important to restore the handler when the app
    terminates.

    Nothing is installed when the platform has no SIGWINCH, or when we are
    not in the main thread.

    :param handler: Callable without arguments, called on SIGWINCH.
    """
    # The tricky part here is that signals are registered in the Unix event
    # loop with a wakeup fd, but another application could have registered
    # signals using signal.signal directly. For now, the implementation is
    # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`.
    sigwinch = getattr(signal, "SIGWINCH", None)
    can_attach = sigwinch is not None and in_main_thread()
    if not can_attach:
        yield
        return

    # Remember what was registered before us.
    # (Only UnixSelectorEventloop has `_signal_handlers`.)
    loop = get_running_loop()
    registered_handlers = getattr(loop, "_signal_handlers", {})
    previous = registered_handlers.get(sigwinch)

    try:
        loop.add_signal_handler(sigwinch, handler)
        yield
    finally:
        # Drop our handler, and re-register the previous one, if any.
        loop.remove_signal_handler(sigwinch)
        if previous is not None:
            loop.add_signal_handler(sigwinch, previous._callback, *previous._args)

1590 

1591 

1592@contextmanager 

1593def _restore_sigint_from_ctypes() -> Generator[None, None, None]: 

1594 # The following functions are part of the stable ABI since python 3.2 

1595 # See: https://docs.python.org/3/c-api/sys.html#c.PyOS_getsig 

1596 # Inline import: these are not available on Pypy. 

1597 try: 

1598 from ctypes import c_int, c_void_p, pythonapi 

1599 except ImportError: 

1600 have_ctypes_signal = False 

1601 else: 

1602 # GraalPy has the functions, but they don't work 

1603 have_ctypes_signal = sys.implementation.name != "graalpy" 

1604 

1605 if have_ctypes_signal: 

1606 # PyOS_sighandler_t PyOS_getsig(int i) 

1607 pythonapi.PyOS_getsig.restype = c_void_p 

1608 pythonapi.PyOS_getsig.argtypes = (c_int,) 

1609 

1610 # PyOS_sighandler_t PyOS_setsig(int i, PyOS_sighandler_t h) 

1611 pythonapi.PyOS_setsig.restype = c_void_p 

1612 pythonapi.PyOS_setsig.argtypes = ( 

1613 c_int, 

1614 c_void_p, 

1615 ) 

1616 

1617 sigint = signal.getsignal(signal.SIGINT) 

1618 if have_ctypes_signal: 

1619 sigint_os = pythonapi.PyOS_getsig(signal.SIGINT) 

1620 

1621 try: 

1622 yield 

1623 finally: 

1624 if sigint is not None: 

1625 signal.signal(signal.SIGINT, sigint) 

1626 if have_ctypes_signal: 

1627 pythonapi.PyOS_setsig(signal.SIGINT, sigint_os)