Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/application/application.py: 18%
612 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1from __future__ import annotations
3import asyncio
4import contextvars
5import os
6import re
7import signal
8import sys
9import threading
10import time
11from asyncio import (
12 AbstractEventLoop,
13 Future,
14 Task,
15 ensure_future,
16 get_running_loop,
17 sleep,
18)
19from contextlib import ExitStack, contextmanager
20from subprocess import Popen
21from traceback import format_tb
22from typing import (
23 Any,
24 Callable,
25 Coroutine,
26 Generator,
27 Generic,
28 Hashable,
29 Iterable,
30 Iterator,
31 TypeVar,
32 cast,
33 overload,
34)
36from prompt_toolkit.buffer import Buffer
37from prompt_toolkit.cache import SimpleCache
38from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard
39from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config
40from prompt_toolkit.data_structures import Size
41from prompt_toolkit.enums import EditingMode
42from prompt_toolkit.eventloop import (
43 InputHook,
44 get_traceback_from_context,
45 new_eventloop_with_inputhook,
46 run_in_executor_with_context,
47)
48from prompt_toolkit.eventloop.utils import call_soon_threadsafe
49from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter
50from prompt_toolkit.formatted_text import AnyFormattedText
51from prompt_toolkit.input.base import Input
52from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead
53from prompt_toolkit.key_binding.bindings.page_navigation import (
54 load_page_navigation_bindings,
55)
56from prompt_toolkit.key_binding.defaults import load_key_bindings
57from prompt_toolkit.key_binding.emacs_state import EmacsState
58from prompt_toolkit.key_binding.key_bindings import (
59 Binding,
60 ConditionalKeyBindings,
61 GlobalOnlyKeyBindings,
62 KeyBindings,
63 KeyBindingsBase,
64 KeysTuple,
65 merge_key_bindings,
66)
67from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor
68from prompt_toolkit.key_binding.vi_state import ViState
69from prompt_toolkit.keys import Keys
70from prompt_toolkit.layout.containers import Container, Window
71from prompt_toolkit.layout.controls import BufferControl, UIControl
72from prompt_toolkit.layout.dummy import create_dummy_layout
73from prompt_toolkit.layout.layout import Layout, walk
74from prompt_toolkit.output import ColorDepth, Output
75from prompt_toolkit.renderer import Renderer, print_formatted_text
76from prompt_toolkit.search import SearchState
77from prompt_toolkit.styles import (
78 BaseStyle,
79 DummyStyle,
80 DummyStyleTransformation,
81 DynamicStyle,
82 StyleTransformation,
83 default_pygments_style,
84 default_ui_style,
85 merge_styles,
86)
87from prompt_toolkit.utils import Event, in_main_thread
89from .current import get_app_session, set_app
90from .run_in_terminal import in_terminal, run_in_terminal
# Names exported by `from ... import *`.
__all__ = [
    "Application",
]

# Short alias for the key press event type.
E = KeyPressEvent
# Type variable for the result type that `Application` is generic over.
_AppResult = TypeVar("_AppResult")
# Signature of the application event callbacks: they take the `Application`
# and return nothing.
ApplicationEventHandler = Callable[["Application[_AppResult]"], None]

# Signal numbers, or `None` when the `signal` module does not define them.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
_SIGTSTP = getattr(signal, "SIGTSTP", None)
105class Application(Generic[_AppResult]):
106 """
107 The main Application class!
108 This glues everything together.
110 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance.
111 :param key_bindings:
112 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for
113 the key bindings.
114 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use.
115 :param full_screen: When True, run the application on the alternate screen buffer.
116 :param color_depth: Any :class:`~.ColorDepth` value, a callable that
117 returns a :class:`~.ColorDepth` or `None` for default.
118 :param erase_when_done: (bool) Clear the application output when it finishes.
119 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches
120 forward and a '?' searches backward. In Readline mode, this is usually
121 reversed.
122 :param min_redraw_interval: Number of seconds to wait between redraws. Use
123 this for applications where `invalidate` is called a lot. This could cause
124 a lot of terminal output, which some terminals are not able to process.
126 `None` means that every `invalidate` will be scheduled right away
127 (which is usually fine).
129 When one `invalidate` is called, but a scheduled redraw of a previous
130 `invalidate` call has not been executed yet, nothing will happen in any
131 case.
133 :param max_render_postpone_time: When there is high CPU (a lot of other
134 scheduled calls), postpone the rendering max x seconds. '0' means:
135 don't postpone. '.5' means: try to draw at least twice a second.
137 :param refresh_interval: Automatically invalidate the UI every so many
138 seconds. When `None` (the default), only invalidate when `invalidate`
139 has been called.
141 :param terminal_size_polling_interval: Poll the terminal size every so many
142 seconds. Useful if the application runs in a thread other than the
143 main thread where SIGWINCH can't be handled, or on Windows.
145 Filters:
147 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or
148 boolean). When True, enable mouse support.
149 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean.
150 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`.
152 :param enable_page_navigation_bindings: When `True`, enable the page
153 navigation key bindings. These include both Emacs and Vi bindings like
154 page-up, page-down and so on to scroll through pages. Mostly useful for
155 creating an editor or other full screen applications. Probably, you
156 don't want this for the implementation of a REPL. By default, this is
157 enabled if `full_screen` is set.
159 Callbacks (all of these should accept an
160 :class:`~prompt_toolkit.application.Application` object as input.)
162 :param on_reset: Called during reset.
163 :param on_invalidate: Called when the UI has been invalidated.
164 :param before_render: Called right before rendering.
165 :param after_render: Called right after rendering.
167 I/O:
168 (Note that the preferred way to change the input/output is by creating an
169 `AppSession` with the required input/output objects. If you need multiple
170 applications running at the same time, you have to create a separate
171 `AppSession` using a `with create_app_session():` block.)
173 :param input: :class:`~prompt_toolkit.input.Input` instance.
174 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably
175 Vt100_Output or Win32Output.)
177 Usage:
179 app = Application(...)
180 app.run()
182 # Or
183 await app.run_async()
184 """
def __init__(
    self,
    layout: Layout | None = None,
    style: BaseStyle | None = None,
    include_default_pygments_style: FilterOrBool = True,
    style_transformation: StyleTransformation | None = None,
    key_bindings: KeyBindingsBase | None = None,
    clipboard: Clipboard | None = None,
    full_screen: bool = False,
    color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
    mouse_support: FilterOrBool = False,
    enable_page_navigation_bindings: None
    | (FilterOrBool) = None,  # Can be None, True or False.
    paste_mode: FilterOrBool = False,
    editing_mode: EditingMode = EditingMode.EMACS,
    erase_when_done: bool = False,
    reverse_vi_search_direction: FilterOrBool = False,
    min_redraw_interval: float | int | None = None,
    max_render_postpone_time: float | int | None = 0.01,
    refresh_interval: float | None = None,
    terminal_size_polling_interval: float | None = 0.5,
    cursor: AnyCursorShapeConfig = None,
    on_reset: ApplicationEventHandler[_AppResult] | None = None,
    on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
    before_render: ApplicationEventHandler[_AppResult] | None = None,
    after_render: ApplicationEventHandler[_AppResult] | None = None,
    # I/O.
    input: Input | None = None,
    output: Output | None = None,
) -> None:
    """
    Store the configuration, create the renderer and key processor, and
    finish with a call to :meth:`reset`.

    The parameters are described in the class docstring. Boolean-like
    options are converted to filters with `to_filter`; a missing layout,
    style transformation or clipboard is replaced by a dummy/in-memory
    default; `input` and `output` fall back to those of the current app
    session.
    """
    # If `enable_page_navigation_bindings` is not specified, enable it in
    # case of full screen applications only. This can be overridden by the user.
    if enable_page_navigation_bindings is None:
        # The lambda reads `self.full_screen` lazily, so the attribute only
        # has to exist by the time the filter is evaluated.
        enable_page_navigation_bindings = Condition(lambda: self.full_screen)

    # Normalize all "filter or bool" arguments to filter objects.
    paste_mode = to_filter(paste_mode)
    mouse_support = to_filter(mouse_support)
    reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
    enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
    include_default_pygments_style = to_filter(include_default_pygments_style)

    if layout is None:
        layout = create_dummy_layout()

    if style_transformation is None:
        style_transformation = DummyStyleTransformation()

    self.style = style
    self.style_transformation = style_transformation

    # Key bindings.
    self.key_bindings = key_bindings
    self._default_bindings = load_key_bindings()
    self._page_navigation_bindings = load_page_navigation_bindings()

    self.layout = layout
    self.clipboard = clipboard or InMemoryClipboard()
    self.full_screen: bool = full_screen
    self._color_depth = color_depth
    self.mouse_support = mouse_support

    self.paste_mode = paste_mode
    self.editing_mode = editing_mode
    self.erase_when_done = erase_when_done
    self.reverse_vi_search_direction = reverse_vi_search_direction
    self.enable_page_navigation_bindings = enable_page_navigation_bindings
    self.min_redraw_interval = min_redraw_interval
    self.max_render_postpone_time = max_render_postpone_time
    self.refresh_interval = refresh_interval
    self.terminal_size_polling_interval = terminal_size_polling_interval

    self.cursor = to_cursor_shape_config(cursor)

    # Events.
    self.on_invalidate = Event(self, on_invalidate)
    self.on_reset = Event(self, on_reset)
    self.before_render = Event(self, before_render)
    self.after_render = Event(self, after_render)

    # I/O.
    # Explicitly passed input/output win over those of the app session.
    session = get_app_session()
    self.output = output or session.output
    self.input = input or session.input

    # List of 'extra' functions to execute before an Application.run.
    self.pre_run_callables: list[Callable[[], None]] = []

    # Run state. The future, loop, loop thread and context start out as
    # `None`.
    self._is_running = False
    self.future: Future[_AppResult] | None = None
    self.loop: AbstractEventLoop | None = None
    self._loop_thread: threading.Thread | None = None
    self.context: contextvars.Context | None = None

    #: Quoted insert. This flag is set if we go into quoted insert mode.
    self.quoted_insert = False

    #: Vi state. (For Vi key bindings.)
    self.vi_state = ViState()
    self.emacs_state = EmacsState()

    #: When to flush the input (For flushing escape keys.) This is important
    #: on terminals that use vt100 input. We can't distinguish the escape
    #: key from for instance the left-arrow key, if we don't know what follows
    #: after "\x1b". This little timer will consider "\x1b" to be escape if
    #: nothing did follow in this time span.
    #: This seems to work like the `ttimeoutlen` option in Vim.
    self.ttimeoutlen = 0.5  # Seconds.

    #: Like Vim's `timeoutlen` option. This can be `None` or a float. For
    #: instance, suppose that we have a key binding AB and a second key
    #: binding A. If the user presses A and then waits, we don't handle
    #: this binding yet (unless it was marked 'eager'), because we don't
    #: know what will follow. This timeout is the maximum amount of time
    #: that we wait until we call the handlers anyway. Pass `None` to
    #: disable this timeout.
    self.timeoutlen = 1.0

    #: The `Renderer` instance.
    # Make sure that the same stdout is used, when a custom renderer has been passed.
    self._merged_style = self._create_merged_style(include_default_pygments_style)

    self.renderer = Renderer(
        self._merged_style,
        self.output,
        full_screen=full_screen,
        mouse_support=mouse_support,
        cpr_not_supported_callback=self.cpr_not_supported_callback,
    )

    #: Render counter. This one is increased every time the UI is rendered.
    #: It can be used as a key for caching certain information during one
    #: rendering.
    self.render_counter = 0

    # Invalidate flag. When 'True', a repaint has been scheduled.
    self._invalidated = False
    self._invalidate_events: list[
        Event[object]
    ] = []  # Collection of 'invalidate' Event objects.
    self._last_redraw_time = 0.0  # Unix timestamp of last redraw. Used when
    # `min_redraw_interval` is given.

    #: The `InputProcessor` instance.
    self.key_processor = KeyProcessor(_CombinedRegistry(self))

    # If `run_in_terminal` was called. This will point to a `Future` that will be
    # set at the point when the previous run finishes.
    self._running_in_terminal = False
    self._running_in_terminal_f: Future[None] | None = None

    # Trigger initialize callback.
    # (Must come last: `reset` uses the renderer, key processor, layout and
    # editing states created above.)
    self.reset()
def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
    """
    Build the style used for rendering.

    The result merges, in this order: the default UI style, the default
    pygments style (only while `include_default_pygments_style` evaluates
    to true, a dummy style otherwise) and the user's `self.style`.
    """
    fallback = DummyStyle()
    pygments = default_pygments_style()

    def pick_pygments_style() -> BaseStyle:
        # Called by `DynamicStyle`, so the filter is consulted lazily.
        return pygments if include_default_pygments_style() else fallback

    def pick_user_style() -> BaseStyle | None:
        # Read the attribute lazily, so that assigning a new `self.style`
        # later on is picked up.
        return self.style

    layers = [
        default_ui_style(),
        DynamicStyle(pick_pygments_style),
        DynamicStyle(pick_user_style),
    ]
    return merge_styles(layers)
@property
def color_depth(self) -> ColorDepth:
    """
    The :class:`.ColorDepth` that is currently in effect.

    Resolution order:

    - The color depth that was passed to this application. When a callable
      was passed, it is called and its return value is used.
    - When that yields `None`, the default color depth reported by the
      :class:`.Output` implementation. If the :class:`.Output` class was
      created using `output.defaults.create_output`, then this value is
      coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.
    """
    configured = self._color_depth
    explicit = configured() if callable(configured) else configured

    if explicit is not None:
        return explicit
    return self.output.get_default_color_depth()
@property
def current_buffer(self) -> Buffer:
    """
    The :class:`~.Buffer` that currently has the focus.

    (When the layout reports no focused buffer, a fresh dummy
    :class:`.Buffer` is returned instead, so that callers never have to
    check for `None` or catch exceptions.)
    """
    focused = self.layout.current_buffer
    if focused:
        return focused

    # Dummy buffer.
    return Buffer(name="dummy-buffer")
@property
def current_search_state(self) -> SearchState:
    """
    The :class:`.SearchState` of the focused :class:`.BufferControl`.

    (When the focused control is not a :class:`.BufferControl`, a new dummy
    :class:`.SearchState` is returned; this never returns `None`.)
    """
    control = self.layout.current_control
    if not isinstance(control, BufferControl):
        # Dummy search state. (Don't return None!)
        return SearchState()
    return control.search_state
def reset(self) -> None:
    """
    Bring the application back to a clean state for reading the next input.

    The buffers are deliberately left alone. (This happens just before
    returning, and with multiple buffers we want the content of the other
    buffers to remain unchanged between several calls of `run`. The same
    is true for the focus stack.)
    """
    self.exit_style = ""
    self._background_tasks: set[Task[None]] = set()

    # Reset the stateful components, in this fixed order.
    for component in (
        self.renderer,
        self.key_processor,
        self.layout,
        self.vi_state,
        self.emacs_state,
    ):
        component.reset()

    # Trigger reset event.
    self.on_reset.fire()

    # Make sure that we have a 'focusable' widget focused.
    # (The `Layout` class can't determine this.)
    layout = self.layout
    if layout.current_control.is_focusable():
        return

    # Focus the first window whose content is focusable, if there is one.
    first_focusable = next(
        (w for w in layout.find_all_windows() if w.content.is_focusable()),
        None,
    )
    if first_focusable is not None:
        layout.current_window = first_focusable
def invalidate(self) -> None:
    """
    Thread safe way of asking the input event loop for a repaint.

    Does nothing when the application is not running, when there is no
    (open) event loop, or when a redraw has already been scheduled.
    """
    # Don't schedule a redraw if we're not running.
    # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail.
    # See: https://github.com/dbcli/mycli/issues/797
    if not self._is_running:
        return

    # No loop yet (not running?), or the event loop was closed already.
    if self.loop is None or self.loop.is_closed():
        return

    # Never schedule a second redraw while a previous one is still pending.
    # (This should protect against other threads calling 'invalidate' many
    # times, resulting in 100% CPU.)
    if self._invalidated:
        return
    self._invalidated = True

    # Trigger event.
    self.loop.call_soon_threadsafe(self.on_invalidate.fire)

    def do_redraw() -> None:
        # Clear the flag first, so that new invalidations get scheduled.
        self._invalidated = False
        self._redraw()

    def schedule_redraw() -> None:
        call_soon_threadsafe(
            do_redraw, max_postpone_time=self.max_render_postpone_time, loop=self.loop
        )

    # Without a minimum redraw interval: schedule right away.
    if not self.min_redraw_interval:
        schedule_redraw()
        return

    # With a minimum redraw interval: wait at least this amount of time
    # between redraws.
    elapsed = time.time() - self._last_redraw_time
    if not elapsed < self.min_redraw_interval:
        schedule_redraw()
        return

    async def redraw_later() -> None:
        await sleep(cast(float, self.min_redraw_interval) - elapsed)
        schedule_redraw()

    self.loop.call_soon_threadsafe(
        lambda: self.create_background_task(redraw_later())
    )
@property
def invalidated(self) -> bool:
    """
    Whether a redraw has been scheduled that has not been executed yet.
    """
    return self._invalidated
def _redraw(self, render_as_done: bool = False) -> None:
    """
    Paint the UI again. (Not thread safe! From other threads, or if unsure,
    use :meth:`.Application.invalidate`.)

    :param render_as_done: make sure to put the cursor after the UI.
    """

    def render_once() -> None:
        # Only draw while running, and when no sub application was started.
        if not self._is_running or self._running_in_terminal:
            return

        if self.min_redraw_interval:
            self._last_redraw_time = time.time()

        # Render
        self.render_counter += 1
        self.before_render.fire()

        if not render_as_done:
            self.renderer.render(self, self.layout)
        elif self.erase_when_done:
            self.renderer.erase()
        else:
            # Draw in 'done' state and reset renderer.
            self.renderer.render(self, self.layout, is_done=render_as_done)

        self.layout.update_parents_relations()

        # Fire render event.
        self.after_render.fire()

        self._update_invalidate_events()

    # NOTE: We want to make sure this Application is the active one. The
    #       invalidate function is often called from a context where this
    #       application is not the active one. (Like the
    #       `PromptSession._auto_refresh_context`).
    #       The context is copied in case it was already active, to prevent
    #       RuntimeErrors. (The rendering is not supposed to change any
    #       context variables.)
    ctx = self.context
    if ctx is not None:
        ctx.copy().run(render_once)
def _start_auto_refresh_task(self) -> None:
    """
    Start a background task that invalidates the UI every
    `refresh_interval` seconds. Nothing is started when the interval is
    `None` or zero.
    """
    interval = self.refresh_interval
    if interval is None or interval == 0:
        return

    async def refresh_forever(period: float) -> None:
        # Endless loop; the task is created through
        # `create_background_task`.
        while True:
            await sleep(period)
            self.invalidate()

    self.create_background_task(refresh_forever(interval))
def _update_invalidate_events(self) -> None:
    """
    Re-attach the 'invalidate' handler to the invalidate events of all
    controls that are currently part of the layout.
    """
    # Detach from the events we attached to last time. (Components can be
    # removed from the UI.)
    for stale in self._invalidate_events:
        stale -= self._invalidate_handler

    # Collect the events of every control in the layout.
    # (All controls are able to invalidate themselves.)
    self._invalidate_events = [
        event
        for control in self.layout.find_all_controls()
        for event in control.get_invalidate_events()
    ]

    for fresh in self._invalidate_events:
        fresh += self._invalidate_handler
def _invalidate_handler(self, sender: object) -> None:
    """
    Event handler that forwards invalidate events from UIControls to
    `self.invalidate`.

    (It bridges the difference in signature between an event handler and
    `self.invalidate`: the `sender` is ignored. It has to be a method, not
    a nested function, so that it can be removed from the events again.)
    """
    del sender  # Unused; only present to satisfy the handler signature.
    self.invalidate()
def _on_resize(self) -> None:
    """
    Handle a change of the window size: erase the current output, request
    the cursor position again and redraw. When the CPR answer arrives, the
    output is drawn again.
    """
    renderer = self.renderer

    # The order of these three steps is important: erase first, then
    # request the position (when the cursor is at the start position), and
    # only then redraw.
    renderer.erase(leave_alternate_screen=False)
    self._request_absolute_cursor_position()
    self._redraw()
def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None:
    """
    Run the start-up hooks. Called during `run`.

    `self.future` should be set to the new future at the point where this
    is called in order to avoid data races. `pre_run` can be used to set a
    `threading.Event` to synchronize with UI termination code, running in
    another thread that would call `Application.exit`. (See the progress
    bar code for an example.)

    :param pre_run: Optional callable, invoked before the registered
        `pre_run_callables`.
    """
    if pre_run:
        pre_run()

    # Run the registered "pre_run_callables" once, then empty the list in
    # place.
    pending = self.pre_run_callables
    for hook in pending:
        hook()
    pending.clear()
async def run_async(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    slow_callback_duration: float = 0.5,
) -> _AppResult:
    """
    Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
    until :meth:`~prompt_toolkit.application.Application.exit` has been
    called. Return the value that was passed to
    :meth:`~prompt_toolkit.application.Application.exit`.

    This is the main entry point for a prompt_toolkit
    :class:`~prompt_toolkit.application.Application` and usually the only
    place where the event loop is actually running.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal if possible. This will call
        the `<sigint>` key binding when a SIGINT is received. (This only
        works in the main thread.)
    :param slow_callback_duration: Display warnings if code scheduled in
        the asyncio event loop takes more time than this. The asyncio
        default of `0.1` is sometimes not sufficient on a slow system,
        because exceptionally, the drawing of the app, which happens in the
        event loop, can take a bit longer from time to time.
    :return: The result of the application's future.
    :raises EOFError: When the input stream gets closed before the
        application's future was resolved.
    """
    assert not self._is_running, "Application is already running."

    if not in_main_thread() or sys.platform == "win32":
        # Handling signals in other threads is not supported.
        # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises
        # `NotImplementedError`.
        # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553
        handle_sigint = False

    async def _run_async(f: asyncio.Future[_AppResult]) -> _AppResult:
        context = contextvars.copy_context()
        self.context = context

        # Counter for cancelling 'flush' timeouts. Every time when a key is
        # pressed, we start a 'flush' timer for flushing our escape key. But
        # when any subsequent input is received, a new timer is started and
        # the current timer will be ignored.
        flush_task: asyncio.Task[None] | None = None

        # Reset.
        # (`self.future` needs to be set when `pre_run` is called.)
        self.reset()
        self._pre_run(pre_run)

        # Feed type ahead input first.
        self.key_processor.feed_multiple(get_typeahead(self.input))
        self.key_processor.process_keys()

        def read_from_input() -> None:
            nonlocal flush_task

            # Ignore when we aren't running anymore. This callback will be
            # removed from the loop next time. (It could be that it was
            # still in the 'tasks' list of the loop.)
            # Except: if we need to process incoming CPRs.
            if not self._is_running and not self.renderer.waiting_for_cpr:
                return

            # Get keys from the input object.
            keys = self.input.read_keys()

            # Feed to key processor.
            self.key_processor.feed_multiple(keys)
            self.key_processor.process_keys()

            # Quit when the input stream was closed.
            if self.input.closed:
                if not f.done():
                    f.set_exception(EOFError)
            else:
                # Automatically flush keys.
                if flush_task:
                    flush_task.cancel()
                flush_task = self.create_background_task(auto_flush_input())

        def read_from_input_in_context() -> None:
            # Ensure that key bindings callbacks are always executed in the
            # current context. This is important when key bindings are
            # accessing contextvars. (These callbacks are currently being
            # called from a different context. Underneath,
            # `loop.add_reader` is used to register the stdin FD.)
            # (We copy the context to avoid a `RuntimeError` in case the
            # context is already active.)
            context.copy().run(read_from_input)

        async def auto_flush_input() -> None:
            # Flush input after timeout.
            # (Used for flushing the escape key; see `ttimeoutlen`.)
            # This sleep can be cancelled, in that case we won't flush yet.
            await sleep(self.ttimeoutlen)
            flush_input()

        def flush_input() -> None:
            if not self.is_done:
                # Get keys, and feed to key processor.
                keys = self.input.flush_keys()
                self.key_processor.feed_multiple(keys)
                self.key_processor.process_keys()

                # Same guard as in `read_from_input`: the future may have
                # been resolved while the keys above were processed, and
                # `set_exception` on a done future raises
                # `InvalidStateError`.
                if self.input.closed and not f.done():
                    f.set_exception(EOFError)

        # Enter raw mode, attach input and attach WINCH event handler.
        with self.input.raw_mode(), self.input.attach(
            read_from_input_in_context
        ), attach_winch_signal_handler(self._on_resize):
            # Draw UI.
            self._request_absolute_cursor_position()
            self._redraw()
            self._start_auto_refresh_task()

            self.create_background_task(self._poll_output_size())

            # Wait for UI to finish.
            try:
                result = await f
            finally:
                # In any case, when the application finishes.
                # (Successful, or because of an error.)
                try:
                    self._redraw(render_as_done=True)
                finally:
                    # _redraw has a good chance to fail if it calls widgets
                    # with bad code. Make sure to reset the renderer
                    # anyway.
                    self.renderer.reset()

                    # Unset `is_running`, this ensures that possibly
                    # scheduled draws won't paint during the following
                    # yield.
                    self._is_running = False

                    # Detach event handlers for invalidate events.
                    # (Important when a UIControl is embedded in multiple
                    # applications, like ptterm in pymux. An invalidate
                    # should not trigger a repaint in terminated
                    # applications.)
                    for ev in self._invalidate_events:
                        ev -= self._invalidate_handler
                    self._invalidate_events = []

                    # Wait for CPR responses.
                    if self.output.responds_to_cpr:
                        await self.renderer.wait_for_cpr_responses()

                    # Wait for the run-in-terminals to terminate.
                    previous_run_in_terminal_f = self._running_in_terminal_f

                    if previous_run_in_terminal_f:
                        await previous_run_in_terminal_f

                    # Store unprocessed input as typeahead for next time.
                    store_typeahead(self.input, self.key_processor.empty_queue())

            return result

    # The context managers below each set one piece of run-time state and
    # restore it on exit. They are entered through the `ExitStack` further
    # down.

    @contextmanager
    def set_loop() -> Iterator[AbstractEventLoop]:
        loop = get_running_loop()
        self.loop = loop
        self._loop_thread = threading.current_thread()

        try:
            yield loop
        finally:
            self.loop = None
            self._loop_thread = None

    @contextmanager
    def set_is_running() -> Iterator[None]:
        self._is_running = True
        try:
            yield
        finally:
            self._is_running = False

    @contextmanager
    def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
        if handle_sigint:
            with _restore_sigint_from_ctypes():
                # save sigint handlers (python and os level)
                # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1576
                loop.add_signal_handler(
                    signal.SIGINT,
                    lambda *_: loop.call_soon_threadsafe(
                        self.key_processor.send_sigint
                    ),
                )
                try:
                    yield
                finally:
                    loop.remove_signal_handler(signal.SIGINT)
        else:
            yield

    @contextmanager
    def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
        if set_exception_handler:
            previous_exc_handler = loop.get_exception_handler()
            loop.set_exception_handler(self._handle_exception)
            try:
                yield
            finally:
                loop.set_exception_handler(previous_exc_handler)

        else:
            yield

    @contextmanager
    def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
        # Set slow_callback_duration.
        original_slow_callback_duration = loop.slow_callback_duration
        loop.slow_callback_duration = slow_callback_duration
        try:
            yield
        finally:
            # Reset slow_callback_duration.
            loop.slow_callback_duration = original_slow_callback_duration

    @contextmanager
    def create_future(
        loop: AbstractEventLoop,
    ) -> Iterator[asyncio.Future[_AppResult]]:
        f = loop.create_future()
        self.future = f  # XXX: make sure to set this before calling '_redraw'.

        try:
            yield f
        finally:
            # Also remove the Future again. (This brings the
            # application back to its initial state, where it also
            # doesn't have a Future.)
            self.future = None

    with ExitStack() as stack:
        stack.enter_context(set_is_running())

        # Make sure to set `_invalidated` to `False` to begin with,
        # otherwise we're not going to paint anything. This can happen if
        # this application had run before on a different event loop, and a
        # paint was scheduled using `call_soon_threadsafe` with
        # `max_postpone_time`.
        self._invalidated = False

        loop = stack.enter_context(set_loop())

        stack.enter_context(set_handle_sigint(loop))
        stack.enter_context(set_exception_handler_ctx(loop))
        stack.enter_context(set_callback_duration(loop))
        stack.enter_context(set_app(self))
        stack.enter_context(self._enable_breakpointhook())

        f = stack.enter_context(create_future(loop))

        try:
            return await _run_async(f)
        finally:
            # Wait for the background tasks to be done. This needs to
            # go in the finally! If `_run_async` raises
            # `KeyboardInterrupt`, we still want to wait for the
            # background tasks.
            await self.cancel_and_wait_for_background_tasks()

    # The `ExitStack` above is defined in typeshed in a way that it can
    # swallow exceptions. Without next line, mypy would think that there's
    # a possibility we don't return here. See:
    # https://github.com/python/mypy/issues/7726
    assert False, "unreachable"
def run(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    in_thread: bool = False,
    inputhook: InputHook | None = None,
) -> _AppResult:
    """
    A blocking 'run' call that waits until the UI is finished.

    This will run the application in a fresh asyncio event loop.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal. Call the key binding for
        `Keys.SIGINT`. (This only works in the main thread.)
    :param in_thread: When true, run the application in a background
        thread, and block the current thread until the application
        terminates. This is useful if we need to be sure the application
        won't use the current event loop (asyncio does not support nested
        event loops). A new event loop will be created in this background
        thread, and that loop will also be closed when the background
        thread terminates. When this is used, it's especially important to
        make sure that all asyncio background tasks are managed through
        `get_app().create_background_task()`, so that unfinished tasks are
        properly cancelled before the event loop is closed. This is used
        for instance in ptpython.
    :param inputhook: Optional input hook. When given, the application runs
        in a new event loop created with `new_eventloop_with_inputhook`;
        that loop is closed before this method returns or raises.
    :return: The result of the application.
    """
    if in_thread:
        result: _AppResult
        exception: BaseException | None = None

        def run_in_thread() -> None:
            nonlocal result, exception
            try:
                result = self.run(
                    pre_run=pre_run,
                    set_exception_handler=set_exception_handler,
                    # Signal handling only works in the main thread.
                    handle_sigint=False,
                    inputhook=inputhook,
                )
            except BaseException as e:
                # Carry the failure over to the calling thread.
                exception = e

        thread = threading.Thread(target=run_in_thread)
        thread.start()
        thread.join()

        if exception is not None:
            raise exception
        return result

    coro = self.run_async(
        pre_run=pre_run,
        set_exception_handler=set_exception_handler,
        handle_sigint=handle_sigint,
    )

    def _called_from_ipython() -> bool:
        # NOTE: the frame depth (3) is relative to this nested function
        #       being called directly from `run`. Don't move this call.
        try:
            return (
                sys.modules["IPython"].version_info < (8, 18, 0, "")
                and "IPython/terminal/interactiveshell.py"
                in sys._getframe(3).f_code.co_filename
            )
        except BaseException:
            return False

    if inputhook is not None:
        # Create new event loop with given input hook and run the app.
        # In Python 3.12, we can use asyncio.run(loop_factory=...)
        # For now, use `run_until_complete()`.
        loop = new_eventloop_with_inputhook(inputhook)
        try:
            return loop.run_until_complete(coro)
        finally:
            # Always clean up the loop we created ourselves, also when the
            # application raised (e.g. `KeyboardInterrupt`). Otherwise the
            # loop and its resources would leak.
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    elif _called_from_ipython():
        # workaround to make input hooks work for IPython until
        # https://github.com/ipython/ipython/pull/14241 is merged.
        # IPython was setting the input hook by installing an event loop
        # previously.
        try:
            # See whether a loop was installed already. If so, use that.
            # That's required for the input hooks to work, they are
            # installed using `set_event_loop`.
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # No loop installed. Run like usual.
            return asyncio.run(coro)
        else:
            # Use existing loop.
            return loop.run_until_complete(coro)

    else:
        # Default case: run in a fresh event loop.
        return asyncio.run(coro)
def _handle_exception(
    self, loop: AbstractEventLoop, context: dict[str, Any]
) -> None:
    """
    Exception handler for the event loop.

    Formats the traceback found in `context` and schedules a coroutine
    that prints it inside `in_terminal()` and then waits for ENTER.

    :param loop: The event loop that reported the exception (not used).
    :param context: The exception context dictionary of the event loop.
    """
    # Format the traceback right away; the coroutine below only runs later.
    formatted_tb = "".join(format_tb(get_traceback_from_context(context)))

    async def in_term() -> None:
        async with in_terminal():
            # Comparable to 'loop.default_exception_handler', except that
            # we print directly instead of going through a logger.
            print("\nUnhandled exception in event loop:")
            print(formatted_tb)
            exception_line = "Exception {}".format(context.get("exception"))
            print(exception_line)

            await _do_wait_for_enter("Press ENTER to continue...")

    ensure_future(in_term())
1030 @contextmanager
1031 def _enable_breakpointhook(self) -> Generator[None, None, None]:
1032 """
1033 Install our custom breakpointhook for the duration of this context
1034 manager. (We will only install the hook if no other custom hook was
1035 set.)
1036 """
1037 if sys.breakpointhook == sys.__breakpointhook__:
1038 sys.breakpointhook = self._breakpointhook
1040 try:
1041 yield
1042 finally:
1043 sys.breakpointhook = sys.__breakpointhook__
1044 else:
1045 yield
def _breakpointhook(self, *a: object, **kw: object) -> None:
    """
    Breakpointhook which uses PDB, but ensures that the application is
    hidden and input echoing is restored during each debugger dispatch.

    This can be called from any thread. In any case, the application's
    event loop will be blocked while the PDB input is displayed. The event
    loop will continue after leaving the debugger.

    :param a: Positional arguments of the hook (ignored).
    :param kw: Keyword arguments of the hook (ignored).
    """
    # Alias, so that the nested helpers and `CustomPdb` (where `self` means
    # the debugger instance) can refer to the application.
    app = self
    # Inline import on purpose. We don't want to import pdb, if not needed.
    import pdb
    from types import FrameType

    TraceDispatch = Callable[[FrameType, str, Any], Any]

    @contextmanager
    def hide_app_from_eventloop_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from within
        the App's event loop."""
        # Hide application.
        app.renderer.erase()

        # Detach input and dispatch to debugger.
        with app.input.detach():
            with app.input.cooked_mode():
                yield

        # Note: we don't render the application again here, because
        #       there's a good chance that there's a breakpoint on the next
        #       line. This paint/erase cycle would move the PDB prompt back
        #       to the middle of the screen.

    @contextmanager
    def hide_app_from_other_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from a
        thread other than the App's event loop."""
        # `ready`: set by the event loop thread once the app is hidden.
        # `done`: set by the debugging thread once the dispatch finished.
        ready = threading.Event()
        done = threading.Event()

        async def in_loop() -> None:
            # from .run_in_terminal import in_terminal
            # async with in_terminal():
            #     ready.set()
            #     await asyncio.get_running_loop().run_in_executor(None, done.wait)
            #     return

            # Hide application.
            app.renderer.erase()

            # Detach input and dispatch to debugger.
            with app.input.detach():
                with app.input.cooked_mode():
                    ready.set()
                    # Here we block the App's event loop thread until the
                    # debugger resumes. We could have used `with
                    # run_in_terminal.in_terminal():` like the commented
                    # code above, but it seems to work better if we
                    # completely stop the main event loop while debugging.
                    done.wait()

        # NOTE(review): `create_background_task` is documented as not
        #   threadsafe, yet this helper is meant for threads other than the
        #   event loop thread -- confirm that scheduling from here is safe.
        self.create_background_task(in_loop())
        # Block this (debugging) thread until the app is hidden.
        ready.wait()
        try:
            yield
        finally:
            # Release the event loop thread again.
            done.set()

    class CustomPdb(pdb.Pdb):
        def trace_dispatch(
            self, frame: FrameType, event: str, arg: Any
        ) -> TraceDispatch:
            # No event loop thread known: dispatch without hiding anything.
            if app._loop_thread is None:
                return super().trace_dispatch(frame, event, arg)

            if app._loop_thread == threading.current_thread():
                with hide_app_from_eventloop_thread():
                    return super().trace_dispatch(frame, event, arg)

            with hide_app_from_other_thread():
                return super().trace_dispatch(frame, event, arg)

    # Start the debugger in the frame that called this hook.
    frame = sys._getframe().f_back
    CustomPdb(stdout=sys.__stdout__).set_trace(frame)
def create_background_task(
    self, coroutine: Coroutine[Any, Any, None]
) -> asyncio.Task[None]:
    """
    Schedule `coroutine` as a background task of this application.

    The task is tracked in `_background_tasks`, and
    `_on_background_task_done` is attached as its done-callback. Tasks
    that are still unfinished when the `Application` terminates get
    cancelled.

    Because Python versions before 3.11 are still supported, task groups
    (and exception groups) can't be used here. Background tasks are
    therefore not supposed to raise; if one does, the exception handler of
    the event loop is called. Once Python 3.11 is the minimum supported
    version, a `TaskGroup` (with the lifetime of
    `Application.run_async()`) could run the background tasks instead.

    This is not threadsafe.

    :param coroutine: The coroutine to run in the background.
    :return: The created task.
    """
    # Prefer the application's own loop; otherwise use the running one.
    event_loop = self.loop or get_running_loop()

    background_task: asyncio.Task[None] = event_loop.create_task(coroutine)
    self._background_tasks.add(background_task)
    background_task.add_done_callback(self._on_background_task_done)

    return background_task
1158 def _on_background_task_done(self, task: asyncio.Task[None]) -> None:
1159 """
1160 Called when a background task completes. Remove it from
1161 `_background_tasks`, and handle exceptions if any.
1162 """
1163 self._background_tasks.discard(task)
1165 if task.cancelled():
1166 return
1168 exc = task.exception()
1169 if exc is not None:
1170 get_running_loop().call_exception_handler(
1171 {
1172 "message": f"prompt_toolkit.Application background task {task!r} "
1173 "raised an unexpected exception.",
1174 "exception": exc,
1175 "task": task,
1176 }
1177 )
async def cancel_and_wait_for_background_tasks(self) -> None:
    """
    Cancel every tracked background task and wait until all of them have
    finished.

    Exceptions raised inside the background tasks are not propagated from
    here; they are reported through `_on_background_task_done`.

    (If we had nurseries like Trio, this would be the `__aexit__` of a
    nursery.)
    """
    tracked_tasks = self._background_tasks

    for background_task in tracked_tasks:
        background_task.cancel()

    if not tracked_tasks:
        return

    # `asyncio.wait()` is used on purpose: it doesn't re-raise exceptions
    # from the tasks it waits for. That way a `CancelledError` at this
    # `await` can only mean that *we* got cancelled during the teardown of
    # the application, and in that case it has to propagate instead of
    # being suppressed.
    # NOTE: If we do get cancelled here, we currently can't wait for the
    #       cancellation of the background tasks to complete. (In the
    #       future, anyio or Python 3.11's `TaskGroup` should be used.)
    #       With exception groups, an `ExceptionGroup` could be propagated
    #       if something went wrong in the tasks.
    await asyncio.wait(
        tracked_tasks, timeout=None, return_when=asyncio.ALL_COMPLETED
    )
1211 async def _poll_output_size(self) -> None:
1212 """
1213 Coroutine for polling the terminal dimensions.
1215 Useful for situations where `attach_winch_signal_handler` is not sufficient:
1216 - If we are not running in the main thread.
1217 - On Windows.
1218 """
1219 size: Size | None = None
1220 interval = self.terminal_size_polling_interval
1222 if interval is None:
1223 return
1225 while True:
1226 await asyncio.sleep(interval)
1227 new_size = self.output.get_size()
1229 if size is not None and new_size != size:
1230 self._on_resize()
1231 size = new_size
def cpr_not_supported_callback(self) -> None:
    """
    Callback for when the cursor position response did not arrive in time.

    Writes a warning to the output through `run_in_terminal`, unless the
    output already reports that it doesn't respond to CPR.
    """
    if self.output.responds_to_cpr:

        def write_warning() -> None:
            self.output.write(
                "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
            )
            self.output.flush()

        run_in_terminal(write_warning)

    # Otherwise: we know about this already, so there's nothing to report.
@overload
def exit(self) -> None:
    "Exit without arguments."

@overload
def exit(self, *, result: _AppResult, style: str = "") -> None:
    "Exit with `_AppResult`."

@overload
def exit(
    self, *, exception: BaseException | type[BaseException], style: str = ""
) -> None:
    "Exit with exception."

def exit(
    self,
    result: _AppResult | None = None,
    exception: BaseException | type[BaseException] | None = None,
    style: str = "",
) -> None:
    """
    Exit application, by completing `Application.future`.

    .. note::

        Calling `Application.exit` before `Application.run()` does not
        make the `Application` exit (the `Application.future` doesn't
        belong to the current run at that point). If that can happen, use
        a `pre_run` hook and an event to synchronize the closing.

    :param result: Set this result for the application.
    :param exception: Set this exception as the result for an application. For
        a prompt, this is often `EOFError` or `KeyboardInterrupt`.
    :param style: Apply this style on the whole content when quitting,
        often this is 'class:exiting' for a prompt. (Used when
        `erase_when_done` is not set.)
    :raises Exception: When the application is not running, or when a
        return value was set already.
    """
    # At most one of `result` and `exception` can be given.
    assert result is None or exception is None

    future = self.future

    if future is None:
        raise Exception("Application is not running. Application.exit() failed.")

    if future.done():
        raise Exception("Return value already set. Application.exit() failed.")

    self.exit_style = style

    if exception is None:
        future.set_result(cast(_AppResult, result))
    else:
        future.set_exception(exception)
1301 def _request_absolute_cursor_position(self) -> None:
1302 """
1303 Send CPR request.
1304 """
1305 # Note: only do this if the input queue is not empty, and a return
1306 # value has not been set. Otherwise, we won't be able to read the
1307 # response anyway.
1308 if not self.key_processor.input_queue and not self.is_done:
1309 self.renderer.request_absolute_cursor_position()
async def run_system_command(
    self,
    command: str,
    wait_for_enter: bool = True,
    display_before_text: AnyFormattedText = "",
    wait_text: str = "Press ENTER to continue...",
) -> None:
    """
    Run a system command inside `in_terminal()`, and optionally wait for
    the user to press ENTER afterwards.

    :param command: Shell command to be executed.
    :param wait_for_enter: Wait for the user to press enter, when the
        command is finished.
    :param display_before_text: If given, text to be displayed before the
        command executes.
    :param wait_text: Text to display while waiting for the enter key.
    """
    async with in_terminal():
        # Let the sub process use the file descriptors of this
        # application's input/output where available; fall back to the
        # process' own stdin/stdout otherwise.
        try:
            stdin_fd = self.input.fileno()
        except AttributeError:
            stdin_fd = sys.stdin.fileno()

        try:
            stdout_fd = self.output.fileno()
        except AttributeError:
            stdout_fd = sys.stdout.fileno()

        def run_command() -> None:
            # Print the leading text, then run the command to completion.
            self.print_text(display_before_text)
            process = Popen(command, shell=True, stdin=stdin_fd, stdout=stdout_fd)
            process.wait()

        await run_in_executor_with_context(run_command)

        if wait_for_enter:
            await _do_wait_for_enter(wait_text)
def suspend_to_background(self, suspend_group: bool = True) -> None:
    """
    (Not thread safe -- to be called from inside the key bindings.)
    Suspend the process by sending it `SIGTSTP`, through `run_in_terminal`.

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Nothing to do when the operating system has no `SIGTSTP`.
    # (Not on Windows.)
    if _SIGTSTP is None:
        return

    def send_sigtstp() -> None:
        signum = cast(int, _SIGTSTP)
        # Pid 0 addresses our whole process group. Suspending the group is
        # usually what we want; it handles the case when input is piped
        # from another process. Otherwise, only signal our own process.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, signum)

    run_in_terminal(send_sigtstp)
def print_text(
    self, text: AnyFormattedText, style: BaseStyle | None = None
) -> None:
    """
    Print formatted text to the output of this application.
    (While the UI is running, call this through `run_in_terminal`;
    otherwise it will destroy the UI.)

    :param text: List of ``(style_str, text)`` tuples.
    :param style: Style class to use. Defaults to the merged style of this
        application.
    """
    target_output = self.output
    effective_style = style or self._merged_style
    depth = self.color_depth
    transformation = self.style_transformation

    print_formatted_text(
        output=target_output,
        formatted_text=text,
        style=effective_style,
        color_depth=depth,
        style_transformation=transformation,
    )
@property
def is_running(self) -> bool:
    """
    Whether the application is currently active/running.
    """
    return self._is_running
@property
def is_done(self) -> bool:
    """
    Whether the application's future is done. `False` when there is no
    future.
    """
    return self.future.done() if self.future else False
def get_used_style_strings(self) -> list[str]:
    """
    Return the sorted style strings known to the renderer, with runs of
    whitespace collapsed to a single space. This is helpful for debugging,
    and for writing a new `Style`.

    :return: Sorted list of style strings; empty when the renderer has
        none.
    """
    style_cache = self.renderer._attrs_for_style
    if not style_cache:
        return []

    normalized = [
        re.sub(r"\s+", " ", raw_style).strip() for raw_style in style_cache
    ]
    normalized.sort()
    return normalized
class _CombinedRegistry(KeyBindingsBase):
    """
    The key bindings registry of an `Application`.

    Combines the key bindings of the current window (and its parent
    containers) with the global, application-level and default bindings.
    """

    def __init__(self, app: Application[_AppResult]) -> None:
        self.app = app
        # Merged bindings, keyed by (current window, set of all controls).
        self._cache: SimpleCache[
            tuple[Window, frozenset[UIControl]], KeyBindingsBase
        ] = SimpleCache()

    @property
    def _version(self) -> Hashable:
        """Not implemented: this object is not going to be wrapped in
        another KeyBindings object."""
        raise NotImplementedError

    @property
    def bindings(self) -> list[Binding]:
        """Not implemented: this object is not going to be wrapped in
        another KeyBindings object."""
        raise NotImplementedError

    def _create_key_bindings(
        self, current_window: Window, other_controls: list[UIControl]
    ) -> KeyBindingsBase:
        """
        Build the merged key bindings for the given window.

        :param current_window: The window to start collecting from.
        :param other_controls: Not used while building; callers use it as
            part of the cache key.
        """
        merged = []
        visited = set()

        # Walk up from the current window, collecting the key bindings of
        # every container on the way. Stop at the first modal container,
        # or at the root.
        container: Container = current_window
        while True:
            visited.add(container)

            own_bindings = container.get_key_bindings()
            if own_bindings is not None:
                merged.append(own_bindings)

            if container.is_modal():
                break

            parent = self.app.layout.get_parent(container)
            if parent is None:
                break
            container = parent

        # Below the container where the walk ended: add the bindings of
        # everything not visited yet, wrapped in `GlobalOnlyKeyBindings`.
        for descendant in walk(container):
            if descendant in visited:
                continue

            descendant_bindings = descendant.get_key_bindings()
            if descendant_bindings is not None:
                merged.append(GlobalOnlyKeyBindings(descendant_bindings))

        # Add App key bindings.
        if self.app.key_bindings:
            merged.append(self.app.key_bindings)

        # Add the (conditional) page navigation bindings, followed by the
        # default bindings.
        page_navigation = ConditionalKeyBindings(
            self.app._page_navigation_bindings,
            self.app.enable_page_navigation_bindings,
        )
        merged.append(page_navigation)
        merged.append(self.app._default_bindings)

        # Reverse the order: the current control's key bindings should come
        # last, because they need priority.
        merged.reverse()

        return merge_key_bindings(merged)

    @property
    def _key_bindings(self) -> KeyBindingsBase:
        # Merged bindings for the current layout state, created on demand
        # and cached.
        layout = self.app.layout
        focused_window = layout.current_window
        all_controls = list(layout.find_all_controls())
        cache_key = focused_window, frozenset(all_controls)

        return self._cache.get(
            cache_key,
            lambda: self._create_key_bindings(focused_window, all_controls),
        )

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        # Delegate to the merged bindings.
        merged = self._key_bindings
        return merged.get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        # Delegate to the merged bindings.
        merged = self._key_bindings
        return merged.get_bindings_starting_with_keys(keys)
async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
    """
    Run a small sub application that waits until the enter key is pressed.
    Compared to using 'input'/'raw_input', this has two advantages:
    - This will share the same input/output I/O.
    - This doesn't block the event loop.

    :param wait_text: The text to display while waiting.
    """
    from prompt_toolkit.shortcuts import PromptSession

    bindings = KeyBindings()

    def _ok(event: E) -> None:
        event.app.exit()

    def _ignore(event: E) -> None:
        "Disallow typing."
        pass

    # Enter ends the sub application; every other key is swallowed.
    bindings.add("enter")(_ok)
    bindings.add(Keys.Any)(_ignore)

    session: PromptSession[None] = PromptSession(
        message=wait_text, key_bindings=bindings
    )

    try:
        await session.app.run_async()
    except KeyboardInterrupt:
        # Control-c pressed. Don't propagate this error.
        pass
@contextmanager
def attach_winch_signal_handler(
    handler: Callable[[], None],
) -> Generator[None, None, None]:
    """
    Context manager that installs `handler` as the SIGWINCH handler of the
    running event loop, and puts the previous handler back on exit.

    The `Application.run` method will register SIGWINCH, so that it will
    properly repaint when the terminal window resizes. However, using
    `run_in_terminal`, we can temporarily send an application to the
    background, and run an other app in between, which will then overwrite the
    SIGWINCH. This is why it's important to restore the handler when the app
    terminates.

    Does nothing when there is no SIGWINCH, or when not called from the
    main thread.

    :param handler: Callable without arguments, invoked on SIGWINCH.
    """
    # The tricky part here is that signals are registered in the Unix event
    # loop with a wakeup fd, but another application could have registered
    # signals using signal.signal directly. For now, the implementation is
    # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`.

    winch = getattr(signal, "SIGWINCH", None)
    can_attach = winch is not None and in_main_thread()
    if not can_attach:
        yield
        return

    loop = get_running_loop()

    # Remember what was registered before us. Only loops that have a
    # `_signal_handlers` mapping can tell us (UnixSelectorEventloop does).
    registered_handlers = getattr(loop, "_signal_handlers", {})
    previous = registered_handlers.get(winch)

    try:
        loop.add_signal_handler(winch, handler)
        yield
    finally:
        # Remove our handler, and re-register the previous one, if any.
        loop.remove_signal_handler(winch)
        if previous is not None:
            loop.add_signal_handler(winch, previous._callback, *previous._args)
1595@contextmanager
1596def _restore_sigint_from_ctypes() -> Generator[None, None, None]:
1597 # The following functions are part of the stable ABI since python 3.2
1598 # See: https://docs.python.org/3/c-api/sys.html#c.PyOS_getsig
1599 # Inline import: these are not available on Pypy.
1600 try:
1601 from ctypes import c_int, c_void_p, pythonapi
1602 except ImportError:
1603 # Any of the above imports don't exist? Don't do anything here.
1604 yield
1605 return
1607 # PyOS_sighandler_t PyOS_getsig(int i)
1608 pythonapi.PyOS_getsig.restype = c_void_p
1609 pythonapi.PyOS_getsig.argtypes = (c_int,)
1611 # PyOS_sighandler_t PyOS_setsig(int i, PyOS_sighandler_t h)
1612 pythonapi.PyOS_setsig.restype = c_void_p
1613 pythonapi.PyOS_setsig.argtypes = (
1614 c_int,
1615 c_void_p,
1616 )
1618 sigint = signal.getsignal(signal.SIGINT)
1619 sigint_os = pythonapi.PyOS_getsig(signal.SIGINT)
1621 try:
1622 yield
1623 finally:
1624 signal.signal(signal.SIGINT, sigint)
1625 pythonapi.PyOS_setsig(signal.SIGINT, sigint_os)