Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/application/application.py: 18%

612 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import contextvars 

5import os 

6import re 

7import signal 

8import sys 

9import threading 

10import time 

11from asyncio import ( 

12 AbstractEventLoop, 

13 Future, 

14 Task, 

15 ensure_future, 

16 get_running_loop, 

17 sleep, 

18) 

19from contextlib import ExitStack, contextmanager 

20from subprocess import Popen 

21from traceback import format_tb 

22from typing import ( 

23 Any, 

24 Callable, 

25 Coroutine, 

26 Generator, 

27 Generic, 

28 Hashable, 

29 Iterable, 

30 Iterator, 

31 TypeVar, 

32 cast, 

33 overload, 

34) 

35 

36from prompt_toolkit.buffer import Buffer 

37from prompt_toolkit.cache import SimpleCache 

38from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard 

39from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config 

40from prompt_toolkit.data_structures import Size 

41from prompt_toolkit.enums import EditingMode 

42from prompt_toolkit.eventloop import ( 

43 InputHook, 

44 get_traceback_from_context, 

45 new_eventloop_with_inputhook, 

46 run_in_executor_with_context, 

47) 

48from prompt_toolkit.eventloop.utils import call_soon_threadsafe 

49from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter 

50from prompt_toolkit.formatted_text import AnyFormattedText 

51from prompt_toolkit.input.base import Input 

52from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead 

53from prompt_toolkit.key_binding.bindings.page_navigation import ( 

54 load_page_navigation_bindings, 

55) 

56from prompt_toolkit.key_binding.defaults import load_key_bindings 

57from prompt_toolkit.key_binding.emacs_state import EmacsState 

58from prompt_toolkit.key_binding.key_bindings import ( 

59 Binding, 

60 ConditionalKeyBindings, 

61 GlobalOnlyKeyBindings, 

62 KeyBindings, 

63 KeyBindingsBase, 

64 KeysTuple, 

65 merge_key_bindings, 

66) 

67from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor 

68from prompt_toolkit.key_binding.vi_state import ViState 

69from prompt_toolkit.keys import Keys 

70from prompt_toolkit.layout.containers import Container, Window 

71from prompt_toolkit.layout.controls import BufferControl, UIControl 

72from prompt_toolkit.layout.dummy import create_dummy_layout 

73from prompt_toolkit.layout.layout import Layout, walk 

74from prompt_toolkit.output import ColorDepth, Output 

75from prompt_toolkit.renderer import Renderer, print_formatted_text 

76from prompt_toolkit.search import SearchState 

77from prompt_toolkit.styles import ( 

78 BaseStyle, 

79 DummyStyle, 

80 DummyStyleTransformation, 

81 DynamicStyle, 

82 StyleTransformation, 

83 default_pygments_style, 

84 default_ui_style, 

85 merge_styles, 

86) 

87from prompt_toolkit.utils import Event, in_main_thread 

88 

89from .current import get_app_session, set_app 

90from .run_in_terminal import in_terminal, run_in_terminal 

91 

92__all__ = [ 

93 "Application", 

94] 

95 

96 

97E = KeyPressEvent 

98_AppResult = TypeVar("_AppResult") 

99ApplicationEventHandler = Callable[["Application[_AppResult]"], None] 

100 

101_SIGWINCH = getattr(signal, "SIGWINCH", None) 

102_SIGTSTP = getattr(signal, "SIGTSTP", None) 

103 

104 

105class Application(Generic[_AppResult]): 

106 """ 

107 The main Application class! 

108 This glues everything together. 

109 

110 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance. 

111 :param key_bindings: 

112 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for 

113 the key bindings. 

114 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use. 

115 :param full_screen: When True, run the application on the alternate screen buffer. 

116 :param color_depth: Any :class:`~.ColorDepth` value, a callable that 

117 returns a :class:`~.ColorDepth` or `None` for default. 

118 :param erase_when_done: (bool) Clear the application output when it finishes. 

119 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches 

120 forward and a '?' searches backward. In Readline mode, this is usually 

121 reversed. 

122 :param min_redraw_interval: Number of seconds to wait between redraws. Use 

123 this for applications where `invalidate` is called a lot. This could cause 

124 a lot of terminal output, which some terminals are not able to process. 

125 

126 `None` means that every `invalidate` will be scheduled right away 

127 (which is usually fine). 

128 

129 When one `invalidate` is called, but a scheduled redraw of a previous 

130 `invalidate` call has not been executed yet, nothing will happen in any 

131 case. 

132 

133 :param max_render_postpone_time: When there is high CPU (a lot of other 

134 scheduled calls), postpone the rendering max x seconds. '0' means: 

135 don't postpone. '.5' means: try to draw at least twice a second. 

136 

137 :param refresh_interval: Automatically invalidate the UI every so many 

138 seconds. When `None` (the default), only invalidate when `invalidate` 

139 has been called. 

140 

141 :param terminal_size_polling_interval: Poll the terminal size every so many 

142 seconds. Useful if the applications runs in a thread other then then 

143 main thread where SIGWINCH can't be handled, or on Windows. 

144 

145 Filters: 

146 

147 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or 

148 boolean). When True, enable mouse support. 

149 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean. 

150 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`. 

151 

152 :param enable_page_navigation_bindings: When `True`, enable the page 

153 navigation key bindings. These include both Emacs and Vi bindings like 

154 page-up, page-down and so on to scroll through pages. Mostly useful for 

155 creating an editor or other full screen applications. Probably, you 

156 don't want this for the implementation of a REPL. By default, this is 

157 enabled if `full_screen` is set. 

158 

159 Callbacks (all of these should accept an 

160 :class:`~prompt_toolkit.application.Application` object as input.) 

161 

162 :param on_reset: Called during reset. 

163 :param on_invalidate: Called when the UI has been invalidated. 

164 :param before_render: Called right before rendering. 

165 :param after_render: Called right after rendering. 

166 

167 I/O: 

168 (Note that the preferred way to change the input/output is by creating an 

169 `AppSession` with the required input/output objects. If you need multiple 

170 applications running at the same time, you have to create a separate 

171 `AppSession` using a `with create_app_session():` block. 

172 

173 :param input: :class:`~prompt_toolkit.input.Input` instance. 

174 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably 

175 Vt100_Output or Win32Output.) 

176 

177 Usage: 

178 

179 app = Application(...) 

180 app.run() 

181 

182 # Or 

183 await app.run_async() 

184 """ 

185 

186 def __init__( 

187 self, 

188 layout: Layout | None = None, 

189 style: BaseStyle | None = None, 

190 include_default_pygments_style: FilterOrBool = True, 

191 style_transformation: StyleTransformation | None = None, 

192 key_bindings: KeyBindingsBase | None = None, 

193 clipboard: Clipboard | None = None, 

194 full_screen: bool = False, 

195 color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None, 

196 mouse_support: FilterOrBool = False, 

197 enable_page_navigation_bindings: None 

198 | (FilterOrBool) = None, # Can be None, True or False. 

199 paste_mode: FilterOrBool = False, 

200 editing_mode: EditingMode = EditingMode.EMACS, 

201 erase_when_done: bool = False, 

202 reverse_vi_search_direction: FilterOrBool = False, 

203 min_redraw_interval: float | int | None = None, 

204 max_render_postpone_time: float | int | None = 0.01, 

205 refresh_interval: float | None = None, 

206 terminal_size_polling_interval: float | None = 0.5, 

207 cursor: AnyCursorShapeConfig = None, 

208 on_reset: ApplicationEventHandler[_AppResult] | None = None, 

209 on_invalidate: ApplicationEventHandler[_AppResult] | None = None, 

210 before_render: ApplicationEventHandler[_AppResult] | None = None, 

211 after_render: ApplicationEventHandler[_AppResult] | None = None, 

212 # I/O. 

213 input: Input | None = None, 

214 output: Output | None = None, 

215 ) -> None: 

216 # If `enable_page_navigation_bindings` is not specified, enable it in 

217 # case of full screen applications only. This can be overridden by the user. 

218 if enable_page_navigation_bindings is None: 

219 enable_page_navigation_bindings = Condition(lambda: self.full_screen) 

220 

221 paste_mode = to_filter(paste_mode) 

222 mouse_support = to_filter(mouse_support) 

223 reverse_vi_search_direction = to_filter(reverse_vi_search_direction) 

224 enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings) 

225 include_default_pygments_style = to_filter(include_default_pygments_style) 

226 

227 if layout is None: 

228 layout = create_dummy_layout() 

229 

230 if style_transformation is None: 

231 style_transformation = DummyStyleTransformation() 

232 

233 self.style = style 

234 self.style_transformation = style_transformation 

235 

236 # Key bindings. 

237 self.key_bindings = key_bindings 

238 self._default_bindings = load_key_bindings() 

239 self._page_navigation_bindings = load_page_navigation_bindings() 

240 

241 self.layout = layout 

242 self.clipboard = clipboard or InMemoryClipboard() 

243 self.full_screen: bool = full_screen 

244 self._color_depth = color_depth 

245 self.mouse_support = mouse_support 

246 

247 self.paste_mode = paste_mode 

248 self.editing_mode = editing_mode 

249 self.erase_when_done = erase_when_done 

250 self.reverse_vi_search_direction = reverse_vi_search_direction 

251 self.enable_page_navigation_bindings = enable_page_navigation_bindings 

252 self.min_redraw_interval = min_redraw_interval 

253 self.max_render_postpone_time = max_render_postpone_time 

254 self.refresh_interval = refresh_interval 

255 self.terminal_size_polling_interval = terminal_size_polling_interval 

256 

257 self.cursor = to_cursor_shape_config(cursor) 

258 

259 # Events. 

260 self.on_invalidate = Event(self, on_invalidate) 

261 self.on_reset = Event(self, on_reset) 

262 self.before_render = Event(self, before_render) 

263 self.after_render = Event(self, after_render) 

264 

265 # I/O. 

266 session = get_app_session() 

267 self.output = output or session.output 

268 self.input = input or session.input 

269 

270 # List of 'extra' functions to execute before a Application.run. 

271 self.pre_run_callables: list[Callable[[], None]] = [] 

272 

273 self._is_running = False 

274 self.future: Future[_AppResult] | None = None 

275 self.loop: AbstractEventLoop | None = None 

276 self._loop_thread: threading.Thread | None = None 

277 self.context: contextvars.Context | None = None 

278 

279 #: Quoted insert. This flag is set if we go into quoted insert mode. 

280 self.quoted_insert = False 

281 

282 #: Vi state. (For Vi key bindings.) 

283 self.vi_state = ViState() 

284 self.emacs_state = EmacsState() 

285 

286 #: When to flush the input (For flushing escape keys.) This is important 

287 #: on terminals that use vt100 input. We can't distinguish the escape 

288 #: key from for instance the left-arrow key, if we don't know what follows 

289 #: after "\x1b". This little timer will consider "\x1b" to be escape if 

290 #: nothing did follow in this time span. 

291 #: This seems to work like the `ttimeoutlen` option in Vim. 

292 self.ttimeoutlen = 0.5 # Seconds. 

293 

294 #: Like Vim's `timeoutlen` option. This can be `None` or a float. For 

295 #: instance, suppose that we have a key binding AB and a second key 

296 #: binding A. If the uses presses A and then waits, we don't handle 

297 #: this binding yet (unless it was marked 'eager'), because we don't 

298 #: know what will follow. This timeout is the maximum amount of time 

299 #: that we wait until we call the handlers anyway. Pass `None` to 

300 #: disable this timeout. 

301 self.timeoutlen = 1.0 

302 

303 #: The `Renderer` instance. 

304 # Make sure that the same stdout is used, when a custom renderer has been passed. 

305 self._merged_style = self._create_merged_style(include_default_pygments_style) 

306 

307 self.renderer = Renderer( 

308 self._merged_style, 

309 self.output, 

310 full_screen=full_screen, 

311 mouse_support=mouse_support, 

312 cpr_not_supported_callback=self.cpr_not_supported_callback, 

313 ) 

314 

315 #: Render counter. This one is increased every time the UI is rendered. 

316 #: It can be used as a key for caching certain information during one 

317 #: rendering. 

318 self.render_counter = 0 

319 

320 # Invalidate flag. When 'True', a repaint has been scheduled. 

321 self._invalidated = False 

322 self._invalidate_events: list[ 

323 Event[object] 

324 ] = [] # Collection of 'invalidate' Event objects. 

325 self._last_redraw_time = 0.0 # Unix timestamp of last redraw. Used when 

326 # `min_redraw_interval` is given. 

327 

328 #: The `InputProcessor` instance. 

329 self.key_processor = KeyProcessor(_CombinedRegistry(self)) 

330 

331 # If `run_in_terminal` was called. This will point to a `Future` what will be 

332 # set at the point when the previous run finishes. 

333 self._running_in_terminal = False 

334 self._running_in_terminal_f: Future[None] | None = None 

335 

336 # Trigger initialize callback. 

337 self.reset() 

338 

339 def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle: 

340 """ 

341 Create a `Style` object that merges the default UI style, the default 

342 pygments style, and the custom user style. 

343 """ 

344 dummy_style = DummyStyle() 

345 pygments_style = default_pygments_style() 

346 

347 @DynamicStyle 

348 def conditional_pygments_style() -> BaseStyle: 

349 if include_default_pygments_style(): 

350 return pygments_style 

351 else: 

352 return dummy_style 

353 

354 return merge_styles( 

355 [ 

356 default_ui_style(), 

357 conditional_pygments_style, 

358 DynamicStyle(lambda: self.style), 

359 ] 

360 ) 

361 

362 @property 

363 def color_depth(self) -> ColorDepth: 

364 """ 

365 The active :class:`.ColorDepth`. 

366 

367 The current value is determined as follows: 

368 

369 - If a color depth was given explicitly to this application, use that 

370 value. 

371 - Otherwise, fall back to the color depth that is reported by the 

372 :class:`.Output` implementation. If the :class:`.Output` class was 

373 created using `output.defaults.create_output`, then this value is 

374 coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable. 

375 """ 

376 depth = self._color_depth 

377 

378 if callable(depth): 

379 depth = depth() 

380 

381 if depth is None: 

382 depth = self.output.get_default_color_depth() 

383 

384 return depth 

385 

386 @property 

387 def current_buffer(self) -> Buffer: 

388 """ 

389 The currently focused :class:`~.Buffer`. 

390 

391 (This returns a dummy :class:`.Buffer` when none of the actual buffers 

392 has the focus. In this case, it's really not practical to check for 

393 `None` values or catch exceptions every time.) 

394 """ 

395 return self.layout.current_buffer or Buffer( 

396 name="dummy-buffer" 

397 ) # Dummy buffer. 

398 

399 @property 

400 def current_search_state(self) -> SearchState: 

401 """ 

402 Return the current :class:`.SearchState`. (The one for the focused 

403 :class:`.BufferControl`.) 

404 """ 

405 ui_control = self.layout.current_control 

406 if isinstance(ui_control, BufferControl): 

407 return ui_control.search_state 

408 else: 

409 return SearchState() # Dummy search state. (Don't return None!) 

410 

411 def reset(self) -> None: 

412 """ 

413 Reset everything, for reading the next input. 

414 """ 

415 # Notice that we don't reset the buffers. (This happens just before 

416 # returning, and when we have multiple buffers, we clearly want the 

417 # content in the other buffers to remain unchanged between several 

418 # calls of `run`. (And the same is true for the focus stack.) 

419 

420 self.exit_style = "" 

421 

422 self._background_tasks: set[Task[None]] = set() 

423 

424 self.renderer.reset() 

425 self.key_processor.reset() 

426 self.layout.reset() 

427 self.vi_state.reset() 

428 self.emacs_state.reset() 

429 

430 # Trigger reset event. 

431 self.on_reset.fire() 

432 

433 # Make sure that we have a 'focusable' widget focused. 

434 # (The `Layout` class can't determine this.) 

435 layout = self.layout 

436 

437 if not layout.current_control.is_focusable(): 

438 for w in layout.find_all_windows(): 

439 if w.content.is_focusable(): 

440 layout.current_window = w 

441 break 

442 

443 def invalidate(self) -> None: 

444 """ 

445 Thread safe way of sending a repaint trigger to the input event loop. 

446 """ 

447 if not self._is_running: 

448 # Don't schedule a redraw if we're not running. 

449 # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail. 

450 # See: https://github.com/dbcli/mycli/issues/797 

451 return 

452 

453 # `invalidate()` called if we don't have a loop yet (not running?), or 

454 # after the event loop was closed. 

455 if self.loop is None or self.loop.is_closed(): 

456 return 

457 

458 # Never schedule a second redraw, when a previous one has not yet been 

459 # executed. (This should protect against other threads calling 

460 # 'invalidate' many times, resulting in 100% CPU.) 

461 if self._invalidated: 

462 return 

463 else: 

464 self._invalidated = True 

465 

466 # Trigger event. 

467 self.loop.call_soon_threadsafe(self.on_invalidate.fire) 

468 

469 def redraw() -> None: 

470 self._invalidated = False 

471 self._redraw() 

472 

473 def schedule_redraw() -> None: 

474 call_soon_threadsafe( 

475 redraw, max_postpone_time=self.max_render_postpone_time, loop=self.loop 

476 ) 

477 

478 if self.min_redraw_interval: 

479 # When a minimum redraw interval is set, wait minimum this amount 

480 # of time between redraws. 

481 diff = time.time() - self._last_redraw_time 

482 if diff < self.min_redraw_interval: 

483 

484 async def redraw_in_future() -> None: 

485 await sleep(cast(float, self.min_redraw_interval) - diff) 

486 schedule_redraw() 

487 

488 self.loop.call_soon_threadsafe( 

489 lambda: self.create_background_task(redraw_in_future()) 

490 ) 

491 else: 

492 schedule_redraw() 

493 else: 

494 schedule_redraw() 

495 

496 @property 

497 def invalidated(self) -> bool: 

498 "True when a redraw operation has been scheduled." 

499 return self._invalidated 

500 

501 def _redraw(self, render_as_done: bool = False) -> None: 

502 """ 

503 Render the command line again. (Not thread safe!) (From other threads, 

504 or if unsure, use :meth:`.Application.invalidate`.) 

505 

506 :param render_as_done: make sure to put the cursor after the UI. 

507 """ 

508 

509 def run_in_context() -> None: 

510 # Only draw when no sub application was started. 

511 if self._is_running and not self._running_in_terminal: 

512 if self.min_redraw_interval: 

513 self._last_redraw_time = time.time() 

514 

515 # Render 

516 self.render_counter += 1 

517 self.before_render.fire() 

518 

519 if render_as_done: 

520 if self.erase_when_done: 

521 self.renderer.erase() 

522 else: 

523 # Draw in 'done' state and reset renderer. 

524 self.renderer.render(self, self.layout, is_done=render_as_done) 

525 else: 

526 self.renderer.render(self, self.layout) 

527 

528 self.layout.update_parents_relations() 

529 

530 # Fire render event. 

531 self.after_render.fire() 

532 

533 self._update_invalidate_events() 

534 

535 # NOTE: We want to make sure this Application is the active one. The 

536 # invalidate function is often called from a context where this 

537 # application is not the active one. (Like the 

538 # `PromptSession._auto_refresh_context`). 

539 # We copy the context in case the context was already active, to 

540 # prevent RuntimeErrors. (The rendering is not supposed to change 

541 # any context variables.) 

542 if self.context is not None: 

543 self.context.copy().run(run_in_context) 

544 

545 def _start_auto_refresh_task(self) -> None: 

546 """ 

547 Start a while/true loop in the background for automatic invalidation of 

548 the UI. 

549 """ 

550 if self.refresh_interval is not None and self.refresh_interval != 0: 

551 

552 async def auto_refresh(refresh_interval: float) -> None: 

553 while True: 

554 await sleep(refresh_interval) 

555 self.invalidate() 

556 

557 self.create_background_task(auto_refresh(self.refresh_interval)) 

558 

559 def _update_invalidate_events(self) -> None: 

560 """ 

561 Make sure to attach 'invalidate' handlers to all invalidate events in 

562 the UI. 

563 """ 

564 # Remove all the original event handlers. (Components can be removed 

565 # from the UI.) 

566 for ev in self._invalidate_events: 

567 ev -= self._invalidate_handler 

568 

569 # Gather all new events. 

570 # (All controls are able to invalidate themselves.) 

571 def gather_events() -> Iterable[Event[object]]: 

572 for c in self.layout.find_all_controls(): 

573 yield from c.get_invalidate_events() 

574 

575 self._invalidate_events = list(gather_events()) 

576 

577 for ev in self._invalidate_events: 

578 ev += self._invalidate_handler 

579 

580 def _invalidate_handler(self, sender: object) -> None: 

581 """ 

582 Handler for invalidate events coming from UIControls. 

583 

584 (This handles the difference in signature between event handler and 

585 `self.invalidate`. It also needs to be a method -not a nested 

586 function-, so that we can remove it again .) 

587 """ 

588 self.invalidate() 

589 

590 def _on_resize(self) -> None: 

591 """ 

592 When the window size changes, we erase the current output and request 

593 again the cursor position. When the CPR answer arrives, the output is 

594 drawn again. 

595 """ 

596 # Erase, request position (when cursor is at the start position) 

597 # and redraw again. -- The order is important. 

598 self.renderer.erase(leave_alternate_screen=False) 

599 self._request_absolute_cursor_position() 

600 self._redraw() 

601 

602 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None: 

603 """ 

604 Called during `run`. 

605 

606 `self.future` should be set to the new future at the point where this 

607 is called in order to avoid data races. `pre_run` can be used to set a 

608 `threading.Event` to synchronize with UI termination code, running in 

609 another thread that would call `Application.exit`. (See the progress 

610 bar code for an example.) 

611 """ 

612 if pre_run: 

613 pre_run() 

614 

615 # Process registered "pre_run_callables" and clear list. 

616 for c in self.pre_run_callables: 

617 c() 

618 del self.pre_run_callables[:] 

619 

620 async def run_async( 

621 self, 

622 pre_run: Callable[[], None] | None = None, 

623 set_exception_handler: bool = True, 

624 handle_sigint: bool = True, 

625 slow_callback_duration: float = 0.5, 

626 ) -> _AppResult: 

627 """ 

628 Run the prompt_toolkit :class:`~prompt_toolkit.application.Application` 

629 until :meth:`~prompt_toolkit.application.Application.exit` has been 

630 called. Return the value that was passed to 

631 :meth:`~prompt_toolkit.application.Application.exit`. 

632 

633 This is the main entry point for a prompt_toolkit 

634 :class:`~prompt_toolkit.application.Application` and usually the only 

635 place where the event loop is actually running. 

636 

637 :param pre_run: Optional callable, which is called right after the 

638 "reset" of the application. 

639 :param set_exception_handler: When set, in case of an exception, go out 

640 of the alternate screen and hide the application, display the 

641 exception, and wait for the user to press ENTER. 

642 :param handle_sigint: Handle SIGINT signal if possible. This will call 

643 the `<sigint>` key binding when a SIGINT is received. (This only 

644 works in the main thread.) 

645 :param slow_callback_duration: Display warnings if code scheduled in 

646 the asyncio event loop takes more time than this. The asyncio 

647 default of `0.1` is sometimes not sufficient on a slow system, 

648 because exceptionally, the drawing of the app, which happens in the 

649 event loop, can take a bit longer from time to time. 

650 """ 

651 assert not self._is_running, "Application is already running." 

652 

653 if not in_main_thread() or sys.platform == "win32": 

654 # Handling signals in other threads is not supported. 

655 # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises 

656 # `NotImplementedError`. 

657 # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553 

658 handle_sigint = False 

659 

660 async def _run_async(f: asyncio.Future[_AppResult]) -> _AppResult: 

661 context = contextvars.copy_context() 

662 self.context = context 

663 

664 # Counter for cancelling 'flush' timeouts. Every time when a key is 

665 # pressed, we start a 'flush' timer for flushing our escape key. But 

666 # when any subsequent input is received, a new timer is started and 

667 # the current timer will be ignored. 

668 flush_task: asyncio.Task[None] | None = None 

669 

670 # Reset. 

671 # (`self.future` needs to be set when `pre_run` is called.) 

672 self.reset() 

673 self._pre_run(pre_run) 

674 

675 # Feed type ahead input first. 

676 self.key_processor.feed_multiple(get_typeahead(self.input)) 

677 self.key_processor.process_keys() 

678 

679 def read_from_input() -> None: 

680 nonlocal flush_task 

681 

682 # Ignore when we aren't running anymore. This callback will 

683 # removed from the loop next time. (It could be that it was 

684 # still in the 'tasks' list of the loop.) 

685 # Except: if we need to process incoming CPRs. 

686 if not self._is_running and not self.renderer.waiting_for_cpr: 

687 return 

688 

689 # Get keys from the input object. 

690 keys = self.input.read_keys() 

691 

692 # Feed to key processor. 

693 self.key_processor.feed_multiple(keys) 

694 self.key_processor.process_keys() 

695 

696 # Quit when the input stream was closed. 

697 if self.input.closed: 

698 if not f.done(): 

699 f.set_exception(EOFError) 

700 else: 

701 # Automatically flush keys. 

702 if flush_task: 

703 flush_task.cancel() 

704 flush_task = self.create_background_task(auto_flush_input()) 

705 

706 def read_from_input_in_context() -> None: 

707 # Ensure that key bindings callbacks are always executed in the 

708 # current context. This is important when key bindings are 

709 # accessing contextvars. (These callbacks are currently being 

710 # called from a different context. Underneath, 

711 # `loop.add_reader` is used to register the stdin FD.) 

712 # (We copy the context to avoid a `RuntimeError` in case the 

713 # context is already active.) 

714 context.copy().run(read_from_input) 

715 

716 async def auto_flush_input() -> None: 

717 # Flush input after timeout. 

718 # (Used for flushing the enter key.) 

719 # This sleep can be cancelled, in that case we won't flush yet. 

720 await sleep(self.ttimeoutlen) 

721 flush_input() 

722 

723 def flush_input() -> None: 

724 if not self.is_done: 

725 # Get keys, and feed to key processor. 

726 keys = self.input.flush_keys() 

727 self.key_processor.feed_multiple(keys) 

728 self.key_processor.process_keys() 

729 

730 if self.input.closed: 

731 f.set_exception(EOFError) 

732 

733 # Enter raw mode, attach input and attach WINCH event handler. 

734 with self.input.raw_mode(), self.input.attach( 

735 read_from_input_in_context 

736 ), attach_winch_signal_handler(self._on_resize): 

737 # Draw UI. 

738 self._request_absolute_cursor_position() 

739 self._redraw() 

740 self._start_auto_refresh_task() 

741 

742 self.create_background_task(self._poll_output_size()) 

743 

744 # Wait for UI to finish. 

745 try: 

746 result = await f 

747 finally: 

748 # In any case, when the application finishes. 

749 # (Successful, or because of an error.) 

750 try: 

751 self._redraw(render_as_done=True) 

752 finally: 

753 # _redraw has a good chance to fail if it calls widgets 

754 # with bad code. Make sure to reset the renderer 

755 # anyway. 

756 self.renderer.reset() 

757 

758 # Unset `is_running`, this ensures that possibly 

759 # scheduled draws won't paint during the following 

760 # yield. 

761 self._is_running = False 

762 

763 # Detach event handlers for invalidate events. 

764 # (Important when a UIControl is embedded in multiple 

765 # applications, like ptterm in pymux. An invalidate 

766 # should not trigger a repaint in terminated 

767 # applications.) 

768 for ev in self._invalidate_events: 

769 ev -= self._invalidate_handler 

770 self._invalidate_events = [] 

771 

772 # Wait for CPR responses. 

773 if self.output.responds_to_cpr: 

774 await self.renderer.wait_for_cpr_responses() 

775 

776 # Wait for the run-in-terminals to terminate. 

777 previous_run_in_terminal_f = self._running_in_terminal_f 

778 

779 if previous_run_in_terminal_f: 

780 await previous_run_in_terminal_f 

781 

782 # Store unprocessed input as typeahead for next time. 

783 store_typeahead(self.input, self.key_processor.empty_queue()) 

784 

785 return result 

786 

787 @contextmanager 

788 def set_loop() -> Iterator[AbstractEventLoop]: 

789 loop = get_running_loop() 

790 self.loop = loop 

791 self._loop_thread = threading.current_thread() 

792 

793 try: 

794 yield loop 

795 finally: 

796 self.loop = None 

797 self._loop_thread = None 

798 

799 @contextmanager 

800 def set_is_running() -> Iterator[None]: 

801 self._is_running = True 

802 try: 

803 yield 

804 finally: 

805 self._is_running = False 

806 

807 @contextmanager 

808 def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]: 

809 if handle_sigint: 

810 with _restore_sigint_from_ctypes(): 

811 # save sigint handlers (python and os level) 

812 # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1576 

813 loop.add_signal_handler( 

814 signal.SIGINT, 

815 lambda *_: loop.call_soon_threadsafe( 

816 self.key_processor.send_sigint 

817 ), 

818 ) 

819 try: 

820 yield 

821 finally: 

822 loop.remove_signal_handler(signal.SIGINT) 

823 else: 

824 yield 

825 

826 @contextmanager 

827 def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]: 

828 if set_exception_handler: 

829 previous_exc_handler = loop.get_exception_handler() 

830 loop.set_exception_handler(self._handle_exception) 

831 try: 

832 yield 

833 finally: 

834 loop.set_exception_handler(previous_exc_handler) 

835 

836 else: 

837 yield 

838 

839 @contextmanager 

840 def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]: 

841 # Set slow_callback_duration. 

842 original_slow_callback_duration = loop.slow_callback_duration 

843 loop.slow_callback_duration = slow_callback_duration 

844 try: 

845 yield 

846 finally: 

847 # Reset slow_callback_duration. 

848 loop.slow_callback_duration = original_slow_callback_duration 

849 

850 @contextmanager 

851 def create_future( 

852 loop: AbstractEventLoop, 

853 ) -> Iterator[asyncio.Future[_AppResult]]: 

854 f = loop.create_future() 

855 self.future = f # XXX: make sure to set this before calling '_redraw'. 

856 

857 try: 

858 yield f 

859 finally: 

860 # Also remove the Future again. (This brings the 

861 # application back to its initial state, where it also 

862 # doesn't have a Future.) 

863 self.future = None 

864 

865 with ExitStack() as stack: 

866 stack.enter_context(set_is_running()) 

867 

868 # Make sure to set `_invalidated` to `False` to begin with, 

869 # otherwise we're not going to paint anything. This can happen if 

870 # this application had run before on a different event loop, and a 

871 # paint was scheduled using `call_soon_threadsafe` with 

872 # `max_postpone_time`. 

873 self._invalidated = False 

874 

875 loop = stack.enter_context(set_loop()) 

876 

877 stack.enter_context(set_handle_sigint(loop)) 

878 stack.enter_context(set_exception_handler_ctx(loop)) 

879 stack.enter_context(set_callback_duration(loop)) 

880 stack.enter_context(set_app(self)) 

881 stack.enter_context(self._enable_breakpointhook()) 

882 

883 f = stack.enter_context(create_future(loop)) 

884 

885 try: 

886 return await _run_async(f) 

887 finally: 

888 # Wait for the background tasks to be done. This needs to 

889 # go in the finally! If `_run_async` raises 

890 # `KeyboardInterrupt`, we still want to wait for the 

891 # background tasks. 

892 await self.cancel_and_wait_for_background_tasks() 

893 

894 # The `ExitStack` above is defined in typeshed in a way that it can 

895 # swallow exceptions. Without next line, mypy would think that there's 

896 # a possibility we don't return here. See: 

897 # https://github.com/python/mypy/issues/7726 

898 assert False, "unreachable" 

899 

900 def run( 

901 self, 

902 pre_run: Callable[[], None] | None = None, 

903 set_exception_handler: bool = True, 

904 handle_sigint: bool = True, 

905 in_thread: bool = False, 

906 inputhook: InputHook | None = None, 

907 ) -> _AppResult: 

908 """ 

909 A blocking 'run' call that waits until the UI is finished. 

910 

911 This will run the application in a fresh asyncio event loop. 

912 

913 :param pre_run: Optional callable, which is called right after the 

914 "reset" of the application. 

915 :param set_exception_handler: When set, in case of an exception, go out 

916 of the alternate screen and hide the application, display the 

917 exception, and wait for the user to press ENTER. 

918 :param in_thread: When true, run the application in a background 

919 thread, and block the current thread until the application 

920 terminates. This is useful if we need to be sure the application 

921 won't use the current event loop (asyncio does not support nested 

922 event loops). A new event loop will be created in this background 

923 thread, and that loop will also be closed when the background 

924 thread terminates. When this is used, it's especially important to 

925 make sure that all asyncio background tasks are managed through 

926 `get_appp().create_background_task()`, so that unfinished tasks are 

927 properly cancelled before the event loop is closed. This is used 

928 for instance in ptpython. 

929 :param handle_sigint: Handle SIGINT signal. Call the key binding for 

930 `Keys.SIGINT`. (This only works in the main thread.) 

931 """ 

932 if in_thread: 

933 result: _AppResult 

934 exception: BaseException | None = None 

935 

936 def run_in_thread() -> None: 

937 nonlocal result, exception 

938 try: 

939 result = self.run( 

940 pre_run=pre_run, 

941 set_exception_handler=set_exception_handler, 

942 # Signal handling only works in the main thread. 

943 handle_sigint=False, 

944 inputhook=inputhook, 

945 ) 

946 except BaseException as e: 

947 exception = e 

948 

949 thread = threading.Thread(target=run_in_thread) 

950 thread.start() 

951 thread.join() 

952 

953 if exception is not None: 

954 raise exception 

955 return result 

956 

957 coro = self.run_async( 

958 pre_run=pre_run, 

959 set_exception_handler=set_exception_handler, 

960 handle_sigint=handle_sigint, 

961 ) 

962 

963 def _called_from_ipython() -> bool: 

964 try: 

965 return ( 

966 sys.modules["IPython"].version_info < (8, 18, 0, "") 

967 and "IPython/terminal/interactiveshell.py" 

968 in sys._getframe(3).f_code.co_filename 

969 ) 

970 except BaseException: 

971 return False 

972 

973 if inputhook is not None: 

974 # Create new event loop with given input hook and run the app. 

975 # In Python 3.12, we can use asyncio.run(loop_factory=...) 

976 # For now, use `run_until_complete()`. 

977 loop = new_eventloop_with_inputhook(inputhook) 

978 result = loop.run_until_complete(coro) 

979 loop.run_until_complete(loop.shutdown_asyncgens()) 

980 loop.close() 

981 return result 

982 

983 elif _called_from_ipython(): 

984 # workaround to make input hooks work for IPython until 

985 # https://github.com/ipython/ipython/pull/14241 is merged. 

986 # IPython was setting the input hook by installing an event loop 

987 # previously. 

988 try: 

989 # See whether a loop was installed already. If so, use that. 

990 # That's required for the input hooks to work, they are 

991 # installed using `set_event_loop`. 

992 loop = asyncio.get_event_loop() 

993 except RuntimeError: 

994 # No loop installed. Run like usual. 

995 return asyncio.run(coro) 

996 else: 

997 # Use existing loop. 

998 return loop.run_until_complete(coro) 

999 

1000 else: 

1001 # No loop installed. Run like usual. 

1002 return asyncio.run(coro) 

1003 

1004 def _handle_exception( 

1005 self, loop: AbstractEventLoop, context: dict[str, Any] 

1006 ) -> None: 

1007 """ 

1008 Handler for event loop exceptions. 

1009 This will print the exception, using run_in_terminal. 

1010 """ 

1011 # For Python 2: we have to get traceback at this point, because 

1012 # we're still in the 'except:' block of the event loop where the 

1013 # traceback is still available. Moving this code in the 

1014 # 'print_exception' coroutine will loose the exception. 

1015 tb = get_traceback_from_context(context) 

1016 formatted_tb = "".join(format_tb(tb)) 

1017 

1018 async def in_term() -> None: 

1019 async with in_terminal(): 

1020 # Print output. Similar to 'loop.default_exception_handler', 

1021 # but don't use logger. (This works better on Python 2.) 

1022 print("\nUnhandled exception in event loop:") 

1023 print(formatted_tb) 

1024 print("Exception {}".format(context.get("exception"))) 

1025 

1026 await _do_wait_for_enter("Press ENTER to continue...") 

1027 

1028 ensure_future(in_term()) 

1029 

1030 @contextmanager 

1031 def _enable_breakpointhook(self) -> Generator[None, None, None]: 

1032 """ 

1033 Install our custom breakpointhook for the duration of this context 

1034 manager. (We will only install the hook if no other custom hook was 

1035 set.) 

1036 """ 

1037 if sys.breakpointhook == sys.__breakpointhook__: 

1038 sys.breakpointhook = self._breakpointhook 

1039 

1040 try: 

1041 yield 

1042 finally: 

1043 sys.breakpointhook = sys.__breakpointhook__ 

1044 else: 

1045 yield 

1046 

1047 def _breakpointhook(self, *a: object, **kw: object) -> None: 

1048 """ 

1049 Breakpointhook which uses PDB, but ensures that the application is 

1050 hidden and input echoing is restored during each debugger dispatch. 

1051 

1052 This can be called from any thread. In any case, the application's 

1053 event loop will be blocked while the PDB input is displayed. The event 

1054 will continue after leaving the debugger. 

1055 """ 

1056 app = self 

1057 # Inline import on purpose. We don't want to import pdb, if not needed. 

1058 import pdb 

1059 from types import FrameType 

1060 

1061 TraceDispatch = Callable[[FrameType, str, Any], Any] 

1062 

1063 @contextmanager 

1064 def hide_app_from_eventloop_thread() -> Generator[None, None, None]: 

1065 """Stop application if `__breakpointhook__` is called from within 

1066 the App's event loop.""" 

1067 # Hide application. 

1068 app.renderer.erase() 

1069 

1070 # Detach input and dispatch to debugger. 

1071 with app.input.detach(): 

1072 with app.input.cooked_mode(): 

1073 yield 

1074 

1075 # Note: we don't render the application again here, because 

1076 # there's a good chance that there's a breakpoint on the next 

1077 # line. This paint/erase cycle would move the PDB prompt back 

1078 # to the middle of the screen. 

1079 

1080 @contextmanager 

1081 def hide_app_from_other_thread() -> Generator[None, None, None]: 

1082 """Stop application if `__breakpointhook__` is called from a 

1083 thread other than the App's event loop.""" 

1084 ready = threading.Event() 

1085 done = threading.Event() 

1086 

1087 async def in_loop() -> None: 

1088 # from .run_in_terminal import in_terminal 

1089 # async with in_terminal(): 

1090 # ready.set() 

1091 # await asyncio.get_running_loop().run_in_executor(None, done.wait) 

1092 # return 

1093 

1094 # Hide application. 

1095 app.renderer.erase() 

1096 

1097 # Detach input and dispatch to debugger. 

1098 with app.input.detach(): 

1099 with app.input.cooked_mode(): 

1100 ready.set() 

1101 # Here we block the App's event loop thread until the 

1102 # debugger resumes. We could have used `with 

1103 # run_in_terminal.in_terminal():` like the commented 

1104 # code above, but it seems to work better if we 

1105 # completely stop the main event loop while debugging. 

1106 done.wait() 

1107 

1108 self.create_background_task(in_loop()) 

1109 ready.wait() 

1110 try: 

1111 yield 

1112 finally: 

1113 done.set() 

1114 

1115 class CustomPdb(pdb.Pdb): 

1116 def trace_dispatch( 

1117 self, frame: FrameType, event: str, arg: Any 

1118 ) -> TraceDispatch: 

1119 if app._loop_thread is None: 

1120 return super().trace_dispatch(frame, event, arg) 

1121 

1122 if app._loop_thread == threading.current_thread(): 

1123 with hide_app_from_eventloop_thread(): 

1124 return super().trace_dispatch(frame, event, arg) 

1125 

1126 with hide_app_from_other_thread(): 

1127 return super().trace_dispatch(frame, event, arg) 

1128 

1129 frame = sys._getframe().f_back 

1130 CustomPdb(stdout=sys.__stdout__).set_trace(frame) 

1131 

1132 def create_background_task( 

1133 self, coroutine: Coroutine[Any, Any, None] 

1134 ) -> asyncio.Task[None]: 

1135 """ 

1136 Start a background task (coroutine) for the running application. When 

1137 the `Application` terminates, unfinished background tasks will be 

1138 cancelled. 

1139 

1140 Given that we still support Python versions before 3.11, we can't use 

1141 task groups (and exception groups), because of that, these background 

1142 tasks are not allowed to raise exceptions. If they do, we'll call the 

1143 default exception handler from the event loop. 

1144 

1145 If at some point, we have Python 3.11 as the minimum supported Python 

1146 version, then we can use a `TaskGroup` (with the lifetime of 

1147 `Application.run_async()`, and run run the background tasks in there. 

1148 

1149 This is not threadsafe. 

1150 """ 

1151 loop = self.loop or get_running_loop() 

1152 task: asyncio.Task[None] = loop.create_task(coroutine) 

1153 self._background_tasks.add(task) 

1154 

1155 task.add_done_callback(self._on_background_task_done) 

1156 return task 

1157 

1158 def _on_background_task_done(self, task: asyncio.Task[None]) -> None: 

1159 """ 

1160 Called when a background task completes. Remove it from 

1161 `_background_tasks`, and handle exceptions if any. 

1162 """ 

1163 self._background_tasks.discard(task) 

1164 

1165 if task.cancelled(): 

1166 return 

1167 

1168 exc = task.exception() 

1169 if exc is not None: 

1170 get_running_loop().call_exception_handler( 

1171 { 

1172 "message": f"prompt_toolkit.Application background task {task!r} " 

1173 "raised an unexpected exception.", 

1174 "exception": exc, 

1175 "task": task, 

1176 } 

1177 ) 

1178 

1179 async def cancel_and_wait_for_background_tasks(self) -> None: 

1180 """ 

1181 Cancel all background tasks, and wait for the cancellation to complete. 

1182 If any of the background tasks raised an exception, this will also 

1183 propagate the exception. 

1184 

1185 (If we had nurseries like Trio, this would be the `__aexit__` of a 

1186 nursery.) 

1187 """ 

1188 for task in self._background_tasks: 

1189 task.cancel() 

1190 

1191 # Wait until the cancellation of the background tasks completes. 

1192 # `asyncio.wait()` does not propagate exceptions raised within any of 

1193 # these tasks, which is what we want. Otherwise, we can't distinguish 

1194 # between a `CancelledError` raised in this task because it got 

1195 # cancelled, and a `CancelledError` raised on this `await` checkpoint, 

1196 # because *we* got cancelled during the teardown of the application. 

1197 # (If we get cancelled here, then it's important to not suppress the 

1198 # `CancelledError`, and have it propagate.) 

1199 # NOTE: Currently, if we get cancelled at this point then we can't wait 

1200 # for the cancellation to complete (in the future, we should be 

1201 # using anyio or Python's 3.11 TaskGroup.) 

1202 # Also, if we had exception groups, we could propagate an 

1203 # `ExceptionGroup` if something went wrong here. Right now, we 

1204 # don't propagate exceptions, but have them printed in 

1205 # `_on_background_task_done`. 

1206 if len(self._background_tasks) > 0: 

1207 await asyncio.wait( 

1208 self._background_tasks, timeout=None, return_when=asyncio.ALL_COMPLETED 

1209 ) 

1210 

1211 async def _poll_output_size(self) -> None: 

1212 """ 

1213 Coroutine for polling the terminal dimensions. 

1214 

1215 Useful for situations where `attach_winch_signal_handler` is not sufficient: 

1216 - If we are not running in the main thread. 

1217 - On Windows. 

1218 """ 

1219 size: Size | None = None 

1220 interval = self.terminal_size_polling_interval 

1221 

1222 if interval is None: 

1223 return 

1224 

1225 while True: 

1226 await asyncio.sleep(interval) 

1227 new_size = self.output.get_size() 

1228 

1229 if size is not None and new_size != size: 

1230 self._on_resize() 

1231 size = new_size 

1232 

1233 def cpr_not_supported_callback(self) -> None: 

1234 """ 

1235 Called when we don't receive the cursor position response in time. 

1236 """ 

1237 if not self.output.responds_to_cpr: 

1238 return # We know about this already. 

1239 

1240 def in_terminal() -> None: 

1241 self.output.write( 

1242 "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n" 

1243 ) 

1244 self.output.flush() 

1245 

1246 run_in_terminal(in_terminal) 

1247 

1248 @overload 

1249 def exit(self) -> None: 

1250 "Exit without arguments." 

1251 

1252 @overload 

1253 def exit(self, *, result: _AppResult, style: str = "") -> None: 

1254 "Exit with `_AppResult`." 

1255 

1256 @overload 

1257 def exit( 

1258 self, *, exception: BaseException | type[BaseException], style: str = "" 

1259 ) -> None: 

1260 "Exit with exception." 

1261 

1262 def exit( 

1263 self, 

1264 result: _AppResult | None = None, 

1265 exception: BaseException | type[BaseException] | None = None, 

1266 style: str = "", 

1267 ) -> None: 

1268 """ 

1269 Exit application. 

1270 

1271 .. note:: 

1272 

1273 If `Application.exit` is called before `Application.run()` is 

1274 called, then the `Application` won't exit (because the 

1275 `Application.future` doesn't correspond to the current run). Use a 

1276 `pre_run` hook and an event to synchronize the closing if there's a 

1277 chance this can happen. 

1278 

1279 :param result: Set this result for the application. 

1280 :param exception: Set this exception as the result for an application. For 

1281 a prompt, this is often `EOFError` or `KeyboardInterrupt`. 

1282 :param style: Apply this style on the whole content when quitting, 

1283 often this is 'class:exiting' for a prompt. (Used when 

1284 `erase_when_done` is not set.) 

1285 """ 

1286 assert result is None or exception is None 

1287 

1288 if self.future is None: 

1289 raise Exception("Application is not running. Application.exit() failed.") 

1290 

1291 if self.future.done(): 

1292 raise Exception("Return value already set. Application.exit() failed.") 

1293 

1294 self.exit_style = style 

1295 

1296 if exception is not None: 

1297 self.future.set_exception(exception) 

1298 else: 

1299 self.future.set_result(cast(_AppResult, result)) 

1300 

1301 def _request_absolute_cursor_position(self) -> None: 

1302 """ 

1303 Send CPR request. 

1304 """ 

1305 # Note: only do this if the input queue is not empty, and a return 

1306 # value has not been set. Otherwise, we won't be able to read the 

1307 # response anyway. 

1308 if not self.key_processor.input_queue and not self.is_done: 

1309 self.renderer.request_absolute_cursor_position() 

1310 

1311 async def run_system_command( 

1312 self, 

1313 command: str, 

1314 wait_for_enter: bool = True, 

1315 display_before_text: AnyFormattedText = "", 

1316 wait_text: str = "Press ENTER to continue...", 

1317 ) -> None: 

1318 """ 

1319 Run system command (While hiding the prompt. When finished, all the 

1320 output will scroll above the prompt.) 

1321 

1322 :param command: Shell command to be executed. 

1323 :param wait_for_enter: FWait for the user to press enter, when the 

1324 command is finished. 

1325 :param display_before_text: If given, text to be displayed before the 

1326 command executes. 

1327 :return: A `Future` object. 

1328 """ 

1329 async with in_terminal(): 

1330 # Try to use the same input/output file descriptors as the one, 

1331 # used to run this application. 

1332 try: 

1333 input_fd = self.input.fileno() 

1334 except AttributeError: 

1335 input_fd = sys.stdin.fileno() 

1336 try: 

1337 output_fd = self.output.fileno() 

1338 except AttributeError: 

1339 output_fd = sys.stdout.fileno() 

1340 

1341 # Run sub process. 

1342 def run_command() -> None: 

1343 self.print_text(display_before_text) 

1344 p = Popen(command, shell=True, stdin=input_fd, stdout=output_fd) 

1345 p.wait() 

1346 

1347 await run_in_executor_with_context(run_command) 

1348 

1349 # Wait for the user to press enter. 

1350 if wait_for_enter: 

1351 await _do_wait_for_enter(wait_text) 

1352 

1353 def suspend_to_background(self, suspend_group: bool = True) -> None: 

1354 """ 

1355 (Not thread safe -- to be called from inside the key bindings.) 

1356 Suspend process. 

1357 

1358 :param suspend_group: When true, suspend the whole process group. 

1359 (This is the default, and probably what you want.) 

1360 """ 

1361 # Only suspend when the operating system supports it. 

1362 # (Not on Windows.) 

1363 if _SIGTSTP is not None: 

1364 

1365 def run() -> None: 

1366 signal = cast(int, _SIGTSTP) 

1367 # Send `SIGTSTP` to own process. 

1368 # This will cause it to suspend. 

1369 

1370 # Usually we want the whole process group to be suspended. This 

1371 # handles the case when input is piped from another process. 

1372 if suspend_group: 

1373 os.kill(0, signal) 

1374 else: 

1375 os.kill(os.getpid(), signal) 

1376 

1377 run_in_terminal(run) 

1378 

1379 def print_text( 

1380 self, text: AnyFormattedText, style: BaseStyle | None = None 

1381 ) -> None: 

1382 """ 

1383 Print a list of (style_str, text) tuples to the output. 

1384 (When the UI is running, this method has to be called through 

1385 `run_in_terminal`, otherwise it will destroy the UI.) 

1386 

1387 :param text: List of ``(style_str, text)`` tuples. 

1388 :param style: Style class to use. Defaults to the active style in the CLI. 

1389 """ 

1390 print_formatted_text( 

1391 output=self.output, 

1392 formatted_text=text, 

1393 style=style or self._merged_style, 

1394 color_depth=self.color_depth, 

1395 style_transformation=self.style_transformation, 

1396 ) 

1397 

1398 @property 

1399 def is_running(self) -> bool: 

1400 "`True` when the application is currently active/running." 

1401 return self._is_running 

1402 

1403 @property 

1404 def is_done(self) -> bool: 

1405 if self.future: 

1406 return self.future.done() 

1407 return False 

1408 

1409 def get_used_style_strings(self) -> list[str]: 

1410 """ 

1411 Return a list of used style strings. This is helpful for debugging, and 

1412 for writing a new `Style`. 

1413 """ 

1414 attrs_for_style = self.renderer._attrs_for_style 

1415 

1416 if attrs_for_style: 

1417 return sorted( 

1418 re.sub(r"\s+", " ", style_str).strip() 

1419 for style_str in attrs_for_style.keys() 

1420 ) 

1421 

1422 return [] 

1423 

1424 

1425class _CombinedRegistry(KeyBindingsBase): 

1426 """ 

1427 The `KeyBindings` of key bindings for a `Application`. 

1428 This merges the global key bindings with the one of the current user 

1429 control. 

1430 """ 

1431 

1432 def __init__(self, app: Application[_AppResult]) -> None: 

1433 self.app = app 

1434 self._cache: SimpleCache[ 

1435 tuple[Window, frozenset[UIControl]], KeyBindingsBase 

1436 ] = SimpleCache() 

1437 

1438 @property 

1439 def _version(self) -> Hashable: 

1440 """Not needed - this object is not going to be wrapped in another 

1441 KeyBindings object.""" 

1442 raise NotImplementedError 

1443 

1444 @property 

1445 def bindings(self) -> list[Binding]: 

1446 """Not needed - this object is not going to be wrapped in another 

1447 KeyBindings object.""" 

1448 raise NotImplementedError 

1449 

1450 def _create_key_bindings( 

1451 self, current_window: Window, other_controls: list[UIControl] 

1452 ) -> KeyBindingsBase: 

1453 """ 

1454 Create a `KeyBindings` object that merges the `KeyBindings` from the 

1455 `UIControl` with all the parent controls and the global key bindings. 

1456 """ 

1457 key_bindings = [] 

1458 collected_containers = set() 

1459 

1460 # Collect key bindings from currently focused control and all parent 

1461 # controls. Don't include key bindings of container parent controls. 

1462 container: Container = current_window 

1463 while True: 

1464 collected_containers.add(container) 

1465 kb = container.get_key_bindings() 

1466 if kb is not None: 

1467 key_bindings.append(kb) 

1468 

1469 if container.is_modal(): 

1470 break 

1471 

1472 parent = self.app.layout.get_parent(container) 

1473 if parent is None: 

1474 break 

1475 else: 

1476 container = parent 

1477 

1478 # Include global bindings (starting at the top-model container). 

1479 for c in walk(container): 

1480 if c not in collected_containers: 

1481 kb = c.get_key_bindings() 

1482 if kb is not None: 

1483 key_bindings.append(GlobalOnlyKeyBindings(kb)) 

1484 

1485 # Add App key bindings 

1486 if self.app.key_bindings: 

1487 key_bindings.append(self.app.key_bindings) 

1488 

1489 # Add mouse bindings. 

1490 key_bindings.append( 

1491 ConditionalKeyBindings( 

1492 self.app._page_navigation_bindings, 

1493 self.app.enable_page_navigation_bindings, 

1494 ) 

1495 ) 

1496 key_bindings.append(self.app._default_bindings) 

1497 

1498 # Reverse this list. The current control's key bindings should come 

1499 # last. They need priority. 

1500 key_bindings = key_bindings[::-1] 

1501 

1502 return merge_key_bindings(key_bindings) 

1503 

1504 @property 

1505 def _key_bindings(self) -> KeyBindingsBase: 

1506 current_window = self.app.layout.current_window 

1507 other_controls = list(self.app.layout.find_all_controls()) 

1508 key = current_window, frozenset(other_controls) 

1509 

1510 return self._cache.get( 

1511 key, lambda: self._create_key_bindings(current_window, other_controls) 

1512 ) 

1513 

1514 def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]: 

1515 return self._key_bindings.get_bindings_for_keys(keys) 

1516 

1517 def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]: 

1518 return self._key_bindings.get_bindings_starting_with_keys(keys) 

1519 

1520 

1521async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None: 

1522 """ 

1523 Create a sub application to wait for the enter key press. 

1524 This has two advantages over using 'input'/'raw_input': 

1525 - This will share the same input/output I/O. 

1526 - This doesn't block the event loop. 

1527 """ 

1528 from prompt_toolkit.shortcuts import PromptSession 

1529 

1530 key_bindings = KeyBindings() 

1531 

1532 @key_bindings.add("enter") 

1533 def _ok(event: E) -> None: 

1534 event.app.exit() 

1535 

1536 @key_bindings.add(Keys.Any) 

1537 def _ignore(event: E) -> None: 

1538 "Disallow typing." 

1539 pass 

1540 

1541 session: PromptSession[None] = PromptSession( 

1542 message=wait_text, key_bindings=key_bindings 

1543 ) 

1544 try: 

1545 await session.app.run_async() 

1546 except KeyboardInterrupt: 

1547 pass # Control-c pressed. Don't propagate this error. 

1548 

1549 

1550@contextmanager 

1551def attach_winch_signal_handler( 

1552 handler: Callable[[], None], 

1553) -> Generator[None, None, None]: 

1554 """ 

1555 Attach the given callback as a WINCH signal handler within the context 

1556 manager. Restore the original signal handler when done. 

1557 

1558 The `Application.run` method will register SIGWINCH, so that it will 

1559 properly repaint when the terminal window resizes. However, using 

1560 `run_in_terminal`, we can temporarily send an application to the 

1561 background, and run an other app in between, which will then overwrite the 

1562 SIGWINCH. This is why it's important to restore the handler when the app 

1563 terminates. 

1564 """ 

1565 # The tricky part here is that signals are registered in the Unix event 

1566 # loop with a wakeup fd, but another application could have registered 

1567 # signals using signal.signal directly. For now, the implementation is 

1568 # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`. 

1569 

1570 # No WINCH? Then don't do anything. 

1571 sigwinch = getattr(signal, "SIGWINCH", None) 

1572 if sigwinch is None or not in_main_thread(): 

1573 yield 

1574 return 

1575 

1576 # Keep track of the previous handler. 

1577 # (Only UnixSelectorEventloop has `_signal_handlers`.) 

1578 loop = get_running_loop() 

1579 previous_winch_handler = getattr(loop, "_signal_handlers", {}).get(sigwinch) 

1580 

1581 try: 

1582 loop.add_signal_handler(sigwinch, handler) 

1583 yield 

1584 finally: 

1585 # Restore the previous signal handler. 

1586 loop.remove_signal_handler(sigwinch) 

1587 if previous_winch_handler is not None: 

1588 loop.add_signal_handler( 

1589 sigwinch, 

1590 previous_winch_handler._callback, 

1591 *previous_winch_handler._args, 

1592 ) 

1593 

1594 

1595@contextmanager 

1596def _restore_sigint_from_ctypes() -> Generator[None, None, None]: 

1597 # The following functions are part of the stable ABI since python 3.2 

1598 # See: https://docs.python.org/3/c-api/sys.html#c.PyOS_getsig 

1599 # Inline import: these are not available on Pypy. 

1600 try: 

1601 from ctypes import c_int, c_void_p, pythonapi 

1602 except ImportError: 

1603 # Any of the above imports don't exist? Don't do anything here. 

1604 yield 

1605 return 

1606 

1607 # PyOS_sighandler_t PyOS_getsig(int i) 

1608 pythonapi.PyOS_getsig.restype = c_void_p 

1609 pythonapi.PyOS_getsig.argtypes = (c_int,) 

1610 

1611 # PyOS_sighandler_t PyOS_setsig(int i, PyOS_sighandler_t h) 

1612 pythonapi.PyOS_setsig.restype = c_void_p 

1613 pythonapi.PyOS_setsig.argtypes = ( 

1614 c_int, 

1615 c_void_p, 

1616 ) 

1617 

1618 sigint = signal.getsignal(signal.SIGINT) 

1619 sigint_os = pythonapi.PyOS_getsig(signal.SIGINT) 

1620 

1621 try: 

1622 yield 

1623 finally: 

1624 signal.signal(signal.SIGINT, sigint) 

1625 pythonapi.PyOS_setsig(signal.SIGINT, sigint_os)