1from __future__ import annotations
2
3import asyncio
4import contextvars
5import os
6import re
7import signal
8import sys
9import threading
10import time
11from asyncio import (
12 AbstractEventLoop,
13 Future,
14 Task,
15 ensure_future,
16 get_running_loop,
17 sleep,
18)
19from collections.abc import Callable, Coroutine, Generator, Hashable, Iterable, Iterator
20from contextlib import ExitStack, contextmanager
21from subprocess import Popen
22from traceback import format_tb
23from typing import (
24 Any,
25 Generic,
26 TypeVar,
27 cast,
28 overload,
29)
30
31from prompt_toolkit.buffer import Buffer
32from prompt_toolkit.cache import SimpleCache
33from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard
34from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config
35from prompt_toolkit.data_structures import Size
36from prompt_toolkit.enums import EditingMode
37from prompt_toolkit.eventloop import (
38 InputHook,
39 get_traceback_from_context,
40 new_eventloop_with_inputhook,
41 run_in_executor_with_context,
42)
43from prompt_toolkit.eventloop.utils import call_soon_threadsafe
44from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter
45from prompt_toolkit.formatted_text import AnyFormattedText
46from prompt_toolkit.input.base import Input
47from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead
48from prompt_toolkit.key_binding.bindings.page_navigation import (
49 load_page_navigation_bindings,
50)
51from prompt_toolkit.key_binding.defaults import load_key_bindings
52from prompt_toolkit.key_binding.emacs_state import EmacsState
53from prompt_toolkit.key_binding.key_bindings import (
54 Binding,
55 ConditionalKeyBindings,
56 GlobalOnlyKeyBindings,
57 KeyBindings,
58 KeyBindingsBase,
59 KeysTuple,
60 merge_key_bindings,
61)
62from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor
63from prompt_toolkit.key_binding.vi_state import ViState
64from prompt_toolkit.keys import Keys
65from prompt_toolkit.layout.containers import Container, Window
66from prompt_toolkit.layout.controls import BufferControl, UIControl
67from prompt_toolkit.layout.dummy import create_dummy_layout
68from prompt_toolkit.layout.layout import Layout, walk
69from prompt_toolkit.output import ColorDepth, Output
70from prompt_toolkit.renderer import Renderer, print_formatted_text
71from prompt_toolkit.search import SearchState
72from prompt_toolkit.styles import (
73 BaseStyle,
74 DummyStyle,
75 DummyStyleTransformation,
76 DynamicStyle,
77 StyleTransformation,
78 default_pygments_style,
79 default_ui_style,
80 merge_styles,
81)
82from prompt_toolkit.utils import Event, in_main_thread
83
84from .current import get_app_session, set_app
85from .run_in_terminal import in_terminal, run_in_terminal
86
# Names exported by `from <this module> import *`.
__all__ = [
    "Application",
]


# Short alias for `KeyPressEvent`.
E = KeyPressEvent
# Type variable for the value an application run produces; it parametrizes
# `Application` and is the return type of `Application.run_async`/`run`.
_AppResult = TypeVar("_AppResult")
# Signature of the application callbacks (`on_reset`, `on_invalidate`,
# `before_render`, `after_render`): they take the `Application` and return
# nothing.
ApplicationEventHandler = Callable[["Application[_AppResult]"], None]

# Signal numbers, or `None` when the `signal` module does not define the
# attribute on this platform.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
_SIGTSTP = getattr(signal, "SIGTSTP", None)
98
99
100class Application(Generic[_AppResult]):
101 """
102 The main Application class!
103 This glues everything together.
104
105 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance.
106 :param key_bindings:
107 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for
108 the key bindings.
109 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use.
110 :param full_screen: When True, run the application on the alternate screen buffer.
111 :param color_depth: Any :class:`~.ColorDepth` value, a callable that
112 returns a :class:`~.ColorDepth` or `None` for default.
113 :param erase_when_done: (bool) Clear the application output when it finishes.
114 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches
115 forward and a '?' searches backward. In Readline mode, this is usually
116 reversed.
117 :param min_redraw_interval: Number of seconds to wait between redraws. Use
118 this for applications where `invalidate` is called a lot. This could cause
119 a lot of terminal output, which some terminals are not able to process.
120
121 `None` means that every `invalidate` will be scheduled right away
122 (which is usually fine).
123
124 When one `invalidate` is called, but a scheduled redraw of a previous
125 `invalidate` call has not been executed yet, nothing will happen in any
126 case.
127
128 :param max_render_postpone_time: When there is high CPU (a lot of other
129 scheduled calls), postpone the rendering max x seconds. '0' means:
130 don't postpone. '.5' means: try to draw at least twice a second.
131
132 :param refresh_interval: Automatically invalidate the UI every so many
133 seconds. When `None` (the default), only invalidate when `invalidate`
134 has been called.
135
:param terminal_size_polling_interval: Poll the terminal size every so many
    seconds. Useful if the application runs in a thread other than the
    main thread, where SIGWINCH can't be handled, or on Windows.
139
140 Filters:
141
142 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or
143 boolean). When True, enable mouse support.
144 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean.
145 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`.
146
147 :param enable_page_navigation_bindings: When `True`, enable the page
148 navigation key bindings. These include both Emacs and Vi bindings like
149 page-up, page-down and so on to scroll through pages. Mostly useful for
150 creating an editor or other full screen applications. Probably, you
151 don't want this for the implementation of a REPL. By default, this is
152 enabled if `full_screen` is set.
153
154 Callbacks (all of these should accept an
155 :class:`~prompt_toolkit.application.Application` object as input.)
156
157 :param on_reset: Called during reset.
158 :param on_invalidate: Called when the UI has been invalidated.
159 :param before_render: Called right before rendering.
160 :param after_render: Called right after rendering.
161
162 I/O:
(Note that the preferred way to change the input/output is by creating an
`AppSession` with the required input/output objects. If you need multiple
applications running at the same time, you have to create a separate
`AppSession` using a `with create_app_session():` block.)
167
168 :param input: :class:`~prompt_toolkit.input.Input` instance.
169 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably
170 Vt100_Output or Win32Output.)
171
172 Usage:
173
174 app = Application(...)
175 app.run()
176
177 # Or
178 await app.run_async()
179 """
180
def __init__(
    self,
    layout: Layout | None = None,
    style: BaseStyle | None = None,
    include_default_pygments_style: FilterOrBool = True,
    style_transformation: StyleTransformation | None = None,
    key_bindings: KeyBindingsBase | None = None,
    clipboard: Clipboard | None = None,
    full_screen: bool = False,
    color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
    mouse_support: FilterOrBool = False,
    enable_page_navigation_bindings: None
    | (FilterOrBool) = None,  # Can be None, True or False.
    paste_mode: FilterOrBool = False,
    editing_mode: EditingMode = EditingMode.EMACS,
    erase_when_done: bool = False,
    reverse_vi_search_direction: FilterOrBool = False,
    min_redraw_interval: float | int | None = None,
    max_render_postpone_time: float | int | None = 0.01,
    refresh_interval: float | None = None,
    terminal_size_polling_interval: float | None = 0.5,
    cursor: AnyCursorShapeConfig = None,
    on_reset: ApplicationEventHandler[_AppResult] | None = None,
    on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
    before_render: ApplicationEventHandler[_AppResult] | None = None,
    after_render: ApplicationEventHandler[_AppResult] | None = None,
    # I/O.
    input: Input | None = None,
    output: Output | None = None,
) -> None:
    """
    Store the configuration, create the renderer and the key processor, and
    finish with a call to `reset()`.

    The parameters are described in the class docstring.
    """
    # If `enable_page_navigation_bindings` is not specified, enable it in
    # case of full screen applications only. This can be overridden by the user.
    # (The condition reads `self.full_screen` lazily, each time the filter
    # is evaluated.)
    if enable_page_navigation_bindings is None:
        enable_page_navigation_bindings = Condition(lambda: self.full_screen)

    # Normalize every "filter or bool" argument into a `Filter` object.
    paste_mode = to_filter(paste_mode)
    mouse_support = to_filter(mouse_support)
    reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
    enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
    include_default_pygments_style = to_filter(include_default_pygments_style)

    # Fall back to placeholder objects for the optional layout and style
    # transformation.
    if layout is None:
        layout = create_dummy_layout()

    if style_transformation is None:
        style_transformation = DummyStyleTransformation()

    self.style = style
    self.style_transformation = style_transformation

    # Key bindings.
    self.key_bindings = key_bindings
    self._default_bindings = load_key_bindings()
    self._page_navigation_bindings = load_page_navigation_bindings()

    self.layout = layout
    self.clipboard = clipboard or InMemoryClipboard()
    self.full_screen: bool = full_screen
    # Stored unresolved (value, callable or `None`); the `color_depth`
    # property resolves it on every access.
    self._color_depth = color_depth
    self.mouse_support = mouse_support

    self.paste_mode = paste_mode
    self.editing_mode = editing_mode
    self.erase_when_done = erase_when_done
    self.reverse_vi_search_direction = reverse_vi_search_direction
    self.enable_page_navigation_bindings = enable_page_navigation_bindings
    self.min_redraw_interval = min_redraw_interval
    self.max_render_postpone_time = max_render_postpone_time
    self.refresh_interval = refresh_interval
    self.terminal_size_polling_interval = terminal_size_polling_interval

    self.cursor = to_cursor_shape_config(cursor)

    # Events.
    self.on_invalidate = Event(self, on_invalidate)
    self.on_reset = Event(self, on_reset)
    self.before_render = Event(self, before_render)
    self.after_render = Event(self, after_render)

    # I/O.
    # Explicitly passed input/output objects win; otherwise the ones from
    # the current app session are used.
    session = get_app_session()
    self.output = output or session.output
    self.input = input or session.input

    # List of 'extra' functions to execute before a Application.run.
    self.pre_run_callables: list[Callable[[], None]] = []

    # Run state. The future, loop, loop thread and context are only set
    # while `run_async` is executing.
    self._is_running = False
    self.future: Future[_AppResult] | None = None
    self.loop: AbstractEventLoop | None = None
    self._loop_thread: threading.Thread | None = None
    self.context: contextvars.Context | None = None

    #: Quoted insert. This flag is set if we go into quoted insert mode.
    self.quoted_insert = False

    #: Vi state. (For Vi key bindings.)
    self.vi_state = ViState()
    #: Emacs state. (For Emacs key bindings.)
    self.emacs_state = EmacsState()

    #: When to flush the input (For flushing escape keys.) This is important
    #: on terminals that use vt100 input. We can't distinguish the escape
    #: key from for instance the left-arrow key, if we don't know what follows
    #: after "\x1b". This little timer will consider "\x1b" to be escape if
    #: nothing did follow in this time span.
    #: This seems to work like the `ttimeoutlen` option in Vim.
    self.ttimeoutlen = 0.5  # Seconds.

    #: Like Vim's `timeoutlen` option. This can be `None` or a float. For
    #: instance, suppose that we have a key binding AB and a second key
    #: binding A. If the user presses A and then waits, we don't handle
    #: this binding yet (unless it was marked 'eager'), because we don't
    #: know what will follow. This timeout is the maximum amount of time
    #: that we wait until we call the handlers anyway. Pass `None` to
    #: disable this timeout.
    self.timeoutlen = 1.0

    #: The `Renderer` instance.
    # It is created with `self.output`, so it writes to the same output
    # object that was selected above.
    self._merged_style = self._create_merged_style(include_default_pygments_style)

    self.renderer = Renderer(
        self._merged_style,
        self.output,
        full_screen=full_screen,
        mouse_support=mouse_support,
        cpr_not_supported_callback=self.cpr_not_supported_callback,
    )

    #: Render counter. This one is increased every time the UI is rendered.
    #: It can be used as a key for caching certain information during one
    #: rendering.
    self.render_counter = 0

    # Invalidate flag. When 'True', a repaint has been scheduled.
    self._invalidated = False
    self._invalidate_events: list[
        Event[object]
    ] = []  # Collection of 'invalidate' Event objects.
    self._last_redraw_time = 0.0  # Unix timestamp of last redraw. Used when
    # `min_redraw_interval` is given.

    #: The `KeyProcessor` instance.
    self.key_processor = KeyProcessor(_CombinedRegistry(self))

    # If `run_in_terminal` was called. This will point to a `Future` that will be
    # set at the point when the previous run finishes.
    self._running_in_terminal = False
    self._running_in_terminal_f: Future[None] | None = None

    # Trigger initialize callback.
    self.reset()
333
def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
    """
    Build the style object that is handed to the renderer.

    Three styles are merged, in this order: the default UI style, the
    default pygments style (replaced by a `DummyStyle` whenever the given
    filter evaluates to false), and whatever is stored in `self.style` at
    the time the style is looked up.

    :param include_default_pygments_style: Filter deciding whether the
        default pygments style takes part in the merge.
    """
    placeholder = DummyStyle()
    pygments = default_pygments_style()

    def pick_pygments_style() -> BaseStyle:
        # The filter is consulted on every call, not once up front.
        return pygments if include_default_pygments_style() else placeholder

    conditional_pygments_style = DynamicStyle(pick_pygments_style)

    layers = [
        default_ui_style(),
        conditional_pygments_style,
        DynamicStyle(lambda: self.style),
    ]
    return merge_styles(layers)
356
@property
def color_depth(self) -> ColorDepth:
    """
    The :class:`.ColorDepth` that is currently in effect.

    Resolution order:

    - The color depth given to this application. When that is a callable,
      it is called and its return value is used.
    - When that yields `None`, the default color depth reported by the
      :class:`.Output` implementation. If the :class:`.Output` class was
      created using `output.defaults.create_output`, then this value is
      coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.
    """
    configured = self._color_depth
    resolved = configured() if callable(configured) else configured

    if resolved is not None:
        return resolved

    return self.output.get_default_color_depth()
380
@property
def current_buffer(self) -> Buffer:
    """
    The :class:`~.Buffer` that currently has the focus.

    When the layout reports no focused buffer, a fresh dummy
    :class:`.Buffer` is returned instead, so that callers never have to
    deal with `None` or with an exception.
    """
    focused = self.layout.current_buffer
    if focused:
        return focused

    # Nothing focused: hand out a throw-away buffer.
    return Buffer(name="dummy-buffer")
393
@property
def current_search_state(self) -> SearchState:
    """
    The :class:`.SearchState` of the focused :class:`.BufferControl`.

    When the focused control is not a :class:`.BufferControl`, a new dummy
    :class:`.SearchState` is returned (never `None`).
    """
    focused_control = self.layout.current_control

    if not isinstance(focused_control, BufferControl):
        return SearchState()  # Dummy search state.

    return focused_control.search_state
405
def reset(self) -> None:
    """
    Bring the application back to a clean state, ready to read new input.

    The buffers are intentionally left untouched here: with several
    buffers, the content of the other buffers has to survive between
    consecutive calls of `run`. The same goes for the focus stack.
    """
    self.exit_style = ""
    self._background_tasks: set[Task[None]] = set()

    # Reset the individual components.
    self.renderer.reset()
    self.key_processor.reset()
    self.layout.reset()
    self.vi_state.reset()
    self.emacs_state.reset()

    # Notify listeners.
    self.on_reset.fire()

    # The `Layout` class can't decide this on its own: when the current
    # control is not focusable, move the focus to the first window whose
    # content is.
    current_layout = self.layout
    if current_layout.current_control.is_focusable():
        return

    first_focusable = next(
        (
            window
            for window in current_layout.find_all_windows()
            if window.content.is_focusable()
        ),
        None,
    )
    if first_focusable is not None:
        current_layout.current_window = first_focusable
437
def invalidate(self) -> None:
    """
    Request a repaint; this may be called from any thread.

    Nothing is scheduled when the application is not running, when there
    is no (open) event loop, or when a previously requested repaint is
    still pending.
    """
    # Not running: don't schedule anything. Otherwise,
    # `get_running_loop()` in `call_soon_threadsafe` can fail.
    # See: https://github.com/dbcli/mycli/issues/797
    if not self._is_running:
        return

    # No loop yet, or the loop has been closed already.
    if self.loop is None or self.loop.is_closed():
        return

    # One pending redraw is enough. (This protects against other threads
    # calling 'invalidate' many times, resulting in 100% CPU.)
    if self._invalidated:
        return
    self._invalidated = True

    # Trigger event.
    self.loop.call_soon_threadsafe(self.on_invalidate.fire)

    def perform_redraw() -> None:
        self._invalidated = False
        self._redraw()

    def queue_redraw() -> None:
        call_soon_threadsafe(
            perform_redraw,
            max_postpone_time=self.max_render_postpone_time,
            loop=self.loop,
        )

    if self.min_redraw_interval:
        # A minimum interval between redraws was requested; check how long
        # ago the last redraw happened.
        elapsed = time.time() - self._last_redraw_time

        if elapsed < self.min_redraw_interval:
            # Too soon: sleep for the remainder of the interval first.
            async def delayed_redraw() -> None:
                await sleep(cast(float, self.min_redraw_interval) - elapsed)
                queue_redraw()

            self.loop.call_soon_threadsafe(
                lambda: self.create_background_task(delayed_redraw())
            )
            return

    queue_redraw()
490
@property
def invalidated(self) -> bool:
    """
    Whether a redraw has been scheduled and not executed yet.
    """
    return self._invalidated
495
496 def _redraw(self, render_as_done: bool = False) -> None:
497 """
498 Render the command line again. (Not thread safe!) (From other threads,
499 or if unsure, use :meth:`.Application.invalidate`.)
500
501 :param render_as_done: make sure to put the cursor after the UI.
502 """
503
504 def run_in_context() -> None:
505 # Only draw when no sub application was started.
506 if self._is_running and not self._running_in_terminal:
507 if self.min_redraw_interval:
508 self._last_redraw_time = time.time()
509
510 # Render
511 self.render_counter += 1
512 self.before_render.fire()
513
514 if render_as_done:
515 if self.erase_when_done:
516 self.renderer.erase()
517 else:
518 # Draw in 'done' state and reset renderer.
519 self.renderer.render(self, self.layout, is_done=render_as_done)
520 else:
521 self.renderer.render(self, self.layout)
522
523 self.layout.update_parents_relations()
524
525 # Fire render event.
526 self.after_render.fire()
527
528 self._update_invalidate_events()
529
530 # NOTE: We want to make sure this Application is the active one. The
531 # invalidate function is often called from a context where this
532 # application is not the active one. (Like the
533 # `PromptSession._auto_refresh_context`).
534 # We copy the context in case the context was already active, to
535 # prevent RuntimeErrors. (The rendering is not supposed to change
536 # any context variables.)
537 if self.context is not None:
538 self.context.copy().run(run_in_context)
539
540 def _start_auto_refresh_task(self) -> None:
541 """
542 Start a while/true loop in the background for automatic invalidation of
543 the UI.
544 """
545 if self.refresh_interval is not None and self.refresh_interval != 0:
546
547 async def auto_refresh(refresh_interval: float) -> None:
548 while True:
549 await sleep(refresh_interval)
550 self.invalidate()
551
552 self.create_background_task(auto_refresh(self.refresh_interval))
553
554 def _update_invalidate_events(self) -> None:
555 """
556 Make sure to attach 'invalidate' handlers to all invalidate events in
557 the UI.
558 """
559 # Remove all the original event handlers. (Components can be removed
560 # from the UI.)
561 for ev in self._invalidate_events:
562 ev -= self._invalidate_handler
563
564 # Gather all new events.
565 # (All controls are able to invalidate themselves.)
566 def gather_events() -> Iterable[Event[object]]:
567 for c in self.layout.find_all_controls():
568 yield from c.get_invalidate_events()
569
570 self._invalidate_events = list(gather_events())
571
572 for ev in self._invalidate_events:
573 ev += self._invalidate_handler
574
575 def _invalidate_handler(self, sender: object) -> None:
576 """
577 Handler for invalidate events coming from UIControls.
578
579 (This handles the difference in signature between event handler and
580 `self.invalidate`. It also needs to be a method -not a nested
581 function-, so that we can remove it again .)
582 """
583 self.invalidate()
584
585 def _on_resize(self) -> None:
586 """
587 When the window size changes, we erase the current output and request
588 again the cursor position. When the CPR answer arrives, the output is
589 drawn again.
590 """
591 # Erase, request position (when cursor is at the start position)
592 # and redraw again. -- The order is important.
593 self.renderer.erase(leave_alternate_screen=False)
594 self._request_absolute_cursor_position()
595 self._redraw()
596
597 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None:
598 """
599 Called during `run`.
600
601 `self.future` should be set to the new future at the point where this
602 is called in order to avoid data races. `pre_run` can be used to set a
603 `threading.Event` to synchronize with UI termination code, running in
604 another thread that would call `Application.exit`. (See the progress
605 bar code for an example.)
606 """
607 if pre_run:
608 pre_run()
609
610 # Process registered "pre_run_callables" and clear list.
611 for c in self.pre_run_callables:
612 c()
613 del self.pre_run_callables[:]
614
async def run_async(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    slow_callback_duration: float = 0.5,
) -> _AppResult:
    """
    Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
    until :meth:`~prompt_toolkit.application.Application.exit` has been
    called. Return the value that was passed to
    :meth:`~prompt_toolkit.application.Application.exit`.

    This is the main entry point for a prompt_toolkit
    :class:`~prompt_toolkit.application.Application` and usually the only
    place where the event loop is actually running.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal if possible. This will call
        the `<sigint>` key binding when a SIGINT is received. (This only
        works in the main thread.)
    :param slow_callback_duration: Display warnings if code scheduled in
        the asyncio event loop takes more time than this. The asyncio
        default of `0.1` is sometimes not sufficient on a slow system,
        because exceptionally, the drawing of the app, which happens in the
        event loop, can take a bit longer from time to time.

    :raises EOFError: When the input stream gets closed before the
        application has produced a result.
    """
    assert not self._is_running, "Application is already running."

    if not in_main_thread() or sys.platform == "win32":
        # Handling signals in other threads is not supported.
        # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises
        # `NotImplementedError`.
        # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553
        handle_sigint = False

    async def _run_async(f: asyncio.Future[_AppResult]) -> _AppResult:
        # Remember the context we are started in; input handling and
        # rendering run inside (copies of) this context.
        context = contextvars.copy_context()
        self.context = context

        # Handle of the pending 'flush' task. Every time when a key is
        # pressed, we start a 'flush' timer for flushing our escape key. But
        # when any subsequent input is received, the pending timer is
        # cancelled and a new one is started.
        flush_task: asyncio.Task[None] | None = None

        # Reset.
        # (`self.future` needs to be set when `pre_run` is called.)
        self.reset()
        self._pre_run(pre_run)

        # Feed type ahead input first.
        self.key_processor.feed_multiple(get_typeahead(self.input))
        self.key_processor.process_keys()

        def read_from_input() -> None:
            nonlocal flush_task

            # Ignore when we aren't running anymore. This callback will be
            # removed from the loop next time. (It could be that it was
            # still in the 'tasks' list of the loop.)
            # Except: if we need to process incoming CPRs.
            if not self._is_running and not self.renderer.waiting_for_cpr:
                return

            # Get keys from the input object.
            keys = self.input.read_keys()

            # Feed to key processor.
            self.key_processor.feed_multiple(keys)
            self.key_processor.process_keys()

            # Quit when the input stream was closed.
            if self.input.closed:
                if not f.done():
                    f.set_exception(EOFError)
            else:
                # Automatically flush keys.
                if flush_task:
                    flush_task.cancel()
                flush_task = self.create_background_task(auto_flush_input())

        def read_from_input_in_context() -> None:
            # Ensure that key bindings callbacks are always executed in the
            # current context. This is important when key bindings are
            # accessing contextvars. (These callbacks are currently being
            # called from a different context. Underneath,
            # `loop.add_reader` is used to register the stdin FD.)
            # (We copy the context to avoid a `RuntimeError` in case the
            # context is already active.)
            context.copy().run(read_from_input)

        async def auto_flush_input() -> None:
            # Flush input after `ttimeoutlen` seconds.
            # (Used for flushing the escape key.)
            # This sleep can be cancelled, in that case we won't flush yet.
            await sleep(self.ttimeoutlen)
            flush_input()

        def flush_input() -> None:
            if not self.is_done:
                # Get keys, and feed to key processor.
                keys = self.input.flush_keys()
                self.key_processor.feed_multiple(keys)
                self.key_processor.process_keys()

                # Quit when the input stream was closed. The key bindings
                # that just ran may have completed the future already
                # (setting an exception on a finished future raises
                # `InvalidStateError`), so check first, like
                # `read_from_input` does.
                if self.input.closed:
                    if not f.done():
                        f.set_exception(EOFError)

        # Enter raw mode, attach input and attach WINCH event handler.
        with (
            self.input.raw_mode(),
            self.input.attach(read_from_input_in_context),
            attach_winch_signal_handler(self._on_resize),
        ):
            # Draw UI.
            self._request_absolute_cursor_position()
            self._redraw()
            self._start_auto_refresh_task()

            self.create_background_task(self._poll_output_size())

            # Wait for UI to finish.
            try:
                result = await f
            finally:
                # In any case, when the application finishes.
                # (Successful, or because of an error.)
                try:
                    self._redraw(render_as_done=True)
                finally:
                    # _redraw has a good chance to fail if it calls widgets
                    # with bad code. Make sure to reset the renderer
                    # anyway.
                    self.renderer.reset()

                    # Unset `is_running`, this ensures that possibly
                    # scheduled draws won't paint during the following
                    # yield.
                    self._is_running = False

                    # Detach event handlers for invalidate events.
                    # (Important when a UIControl is embedded in multiple
                    # applications, like ptterm in pymux. An invalidate
                    # should not trigger a repaint in terminated
                    # applications.)
                    for ev in self._invalidate_events:
                        ev -= self._invalidate_handler
                    self._invalidate_events = []

                    # Wait for CPR responses.
                    if self.output.responds_to_cpr:
                        await self.renderer.wait_for_cpr_responses()

                    # Wait for the run-in-terminals to terminate.
                    previous_run_in_terminal_f = self._running_in_terminal_f

                    if previous_run_in_terminal_f:
                        await previous_run_in_terminal_f

                    # Store unprocessed input as typeahead for next time.
                    store_typeahead(self.input, self.key_processor.empty_queue())

            return result

    # The context managers below each set up one piece of run state and
    # restore it again when the run ends.

    @contextmanager
    def set_loop() -> Iterator[AbstractEventLoop]:
        # Remember the running loop and the thread it runs in.
        loop = get_running_loop()
        self.loop = loop
        self._loop_thread = threading.current_thread()

        try:
            yield loop
        finally:
            self.loop = None
            self._loop_thread = None

    @contextmanager
    def set_is_running() -> Iterator[None]:
        self._is_running = True
        try:
            yield
        finally:
            self._is_running = False

    @contextmanager
    def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
        if handle_sigint:
            with _restore_sigint_from_ctypes():
                # save sigint handlers (python and os level)
                # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1576
                loop.add_signal_handler(
                    signal.SIGINT,
                    lambda *_: loop.call_soon_threadsafe(
                        self.key_processor.send_sigint
                    ),
                )
                try:
                    yield
                finally:
                    loop.remove_signal_handler(signal.SIGINT)
        else:
            yield

    @contextmanager
    def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
        if set_exception_handler:
            # Install our own handler and put the previous one back
            # afterwards.
            previous_exc_handler = loop.get_exception_handler()
            loop.set_exception_handler(self._handle_exception)
            try:
                yield
            finally:
                loop.set_exception_handler(previous_exc_handler)

        else:
            yield

    @contextmanager
    def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
        # Set slow_callback_duration.
        original_slow_callback_duration = loop.slow_callback_duration
        loop.slow_callback_duration = slow_callback_duration
        try:
            yield
        finally:
            # Reset slow_callback_duration.
            loop.slow_callback_duration = original_slow_callback_duration

    @contextmanager
    def create_future(
        loop: AbstractEventLoop,
    ) -> Iterator[asyncio.Future[_AppResult]]:
        f = loop.create_future()
        self.future = f  # XXX: make sure to set this before calling '_redraw'.

        try:
            yield f
        finally:
            # Also remove the Future again. (This brings the
            # application back to its initial state, where it also
            # doesn't have a Future.)
            self.future = None

    with ExitStack() as stack:
        stack.enter_context(set_is_running())

        # Make sure to set `_invalidated` to `False` to begin with,
        # otherwise we're not going to paint anything. This can happen if
        # this application had run before on a different event loop, and a
        # paint was scheduled using `call_soon_threadsafe` with
        # `max_postpone_time`.
        self._invalidated = False

        loop = stack.enter_context(set_loop())

        stack.enter_context(set_handle_sigint(loop))
        stack.enter_context(set_exception_handler_ctx(loop))
        stack.enter_context(set_callback_duration(loop))
        stack.enter_context(set_app(self))
        stack.enter_context(self._enable_breakpointhook())

        f = stack.enter_context(create_future(loop))

        try:
            return await _run_async(f)
        finally:
            # Wait for the background tasks to be done. This needs to
            # go in the finally! If `_run_async` raises
            # `KeyboardInterrupt`, we still want to wait for the
            # background tasks.
            await self.cancel_and_wait_for_background_tasks()

    # The `ExitStack` above is defined in typeshed in a way that it can
    # swallow exceptions. Without next line, mypy would think that there's
    # a possibility we don't return here. See:
    # https://github.com/python/mypy/issues/7726
    assert False, "unreachable"
896
897 def run(
898 self,
899 pre_run: Callable[[], None] | None = None,
900 set_exception_handler: bool = True,
901 handle_sigint: bool = True,
902 in_thread: bool = False,
903 inputhook: InputHook | None = None,
904 ) -> _AppResult:
905 """
906 A blocking 'run' call that waits until the UI is finished.
907
908 This will run the application in a fresh asyncio event loop.
909
910 :param pre_run: Optional callable, which is called right after the
911 "reset" of the application.
912 :param set_exception_handler: When set, in case of an exception, go out
913 of the alternate screen and hide the application, display the
914 exception, and wait for the user to press ENTER.
915 :param in_thread: When true, run the application in a background
916 thread, and block the current thread until the application
917 terminates. This is useful if we need to be sure the application
918 won't use the current event loop (asyncio does not support nested
919 event loops). A new event loop will be created in this background
920 thread, and that loop will also be closed when the background
921 thread terminates. When this is used, it's especially important to
922 make sure that all asyncio background tasks are managed through
923 `get_app().create_background_task()`, so that unfinished tasks are
924 properly cancelled before the event loop is closed. This is used
925 for instance in ptpython.
926 :param handle_sigint: Handle SIGINT signal. Call the key binding for
927 `Keys.SIGINT`. (This only works in the main thread.)
928 """
929 if in_thread:
930 result: _AppResult
931 exception: BaseException | None = None
932
933 def run_in_thread() -> None:
934 nonlocal result, exception
935 try:
936 result = self.run(
937 pre_run=pre_run,
938 set_exception_handler=set_exception_handler,
939 # Signal handling only works in the main thread.
940 handle_sigint=False,
941 inputhook=inputhook,
942 )
943 except BaseException as e:
944 exception = e
945
946 thread = threading.Thread(target=run_in_thread)
947 thread.start()
948 thread.join()
949
950 if exception is not None:
951 raise exception
952 return result
953
954 coro = self.run_async(
955 pre_run=pre_run,
956 set_exception_handler=set_exception_handler,
957 handle_sigint=handle_sigint,
958 )
959
960 def _called_from_ipython() -> bool:
961 try:
962 return (
963 sys.modules["IPython"].version_info < (8, 18, 0, "")
964 and "IPython/terminal/interactiveshell.py"
965 in sys._getframe(3).f_code.co_filename
966 )
967 except BaseException:
968 return False
969
970 if inputhook is not None:
971 # Create new event loop with given input hook and run the app.
972 # In Python 3.12, we can use asyncio.run(loop_factory=...)
973 # For now, use `run_until_complete()`.
974 loop = new_eventloop_with_inputhook(inputhook)
975 result = loop.run_until_complete(coro)
976 loop.run_until_complete(loop.shutdown_asyncgens())
977 loop.close()
978 return result
979
980 elif _called_from_ipython():
981 # workaround to make input hooks work for IPython until
982 # https://github.com/ipython/ipython/pull/14241 is merged.
983 # IPython was setting the input hook by installing an event loop
984 # previously.
985 try:
986 # See whether a loop was installed already. If so, use that.
987 # That's required for the input hooks to work, they are
988 # installed using `set_event_loop`.
989 loop = asyncio.get_event_loop()
990 except RuntimeError:
991 # No loop installed. Run like usual.
992 return asyncio.run(coro)
993 else:
994 # Use existing loop.
995 return loop.run_until_complete(coro)
996
997 else:
998 # No loop installed. Run like usual.
999 return asyncio.run(coro)
1000
1001 def _handle_exception(
1002 self, loop: AbstractEventLoop, context: dict[str, Any]
1003 ) -> None:
1004 """
1005 Handler for event loop exceptions.
1006 This will print the exception, using run_in_terminal.
1007 """
1008 # For Python 2: we have to get traceback at this point, because
1009 # we're still in the 'except:' block of the event loop where the
1010 # traceback is still available. Moving this code in the
1011 # 'print_exception' coroutine will loose the exception.
1012 tb = get_traceback_from_context(context)
1013 formatted_tb = "".join(format_tb(tb))
1014
1015 async def in_term() -> None:
1016 async with in_terminal():
1017 # Print output. Similar to 'loop.default_exception_handler',
1018 # but don't use logger. (This works better on Python 2.)
1019 print("\nUnhandled exception in event loop:")
1020 print(formatted_tb)
1021 print("Exception {}".format(context.get("exception")))
1022
1023 await _do_wait_for_enter("Press ENTER to continue...")
1024
1025 ensure_future(in_term())
1026
1027 @contextmanager
1028 def _enable_breakpointhook(self) -> Generator[None, None, None]:
1029 """
1030 Install our custom breakpointhook for the duration of this context
1031 manager. (We will only install the hook if no other custom hook was
1032 set.)
1033 """
1034 if sys.breakpointhook == sys.__breakpointhook__:
1035 sys.breakpointhook = self._breakpointhook
1036
1037 try:
1038 yield
1039 finally:
1040 sys.breakpointhook = sys.__breakpointhook__
1041 else:
1042 yield
1043
    def _breakpointhook(self, *a: object, **kw: object) -> None:
        """
        Breakpointhook which uses PDB, but ensures that the application is
        hidden and input echoing is restored during each debugger dispatch.

        This can be called from any thread. In any case, the application's
        event loop will be blocked while the PDB input is displayed. The event
        loop will continue after leaving the debugger.

        :param a: Positional arguments given to `breakpoint()` (ignored).
        :param kw: Keyword arguments given to `breakpoint()` (ignored).
        """
        # The nested helpers below refer to the application as `app`, because
        # `self` is rebound inside `CustomPdb.trace_dispatch`.
        app = self
        # Inline import on purpose. We don't want to import pdb, if not needed.
        import pdb
        from types import FrameType

        TraceDispatch = Callable[[FrameType, str, Any], Any]

        @contextmanager
        def hide_app_from_eventloop_thread() -> Generator[None, None, None]:
            """Stop application if `__breakpointhook__` is called from within
            the App's event loop."""
            # Hide application.
            app.renderer.erase()

            # Detach input and dispatch to debugger.
            with app.input.detach():
                with app.input.cooked_mode():
                    yield

            # Note: we don't render the application again here, because
            #       there's a good chance that there's a breakpoint on the next
            #       line. This paint/erase cycle would move the PDB prompt back
            #       to the middle of the screen.

        @contextmanager
        def hide_app_from_other_thread() -> Generator[None, None, None]:
            """Stop application if `__breakpointhook__` is called from a
            thread other than the App's event loop."""
            # `ready`: set by the event loop thread once the app is hidden.
            # `done`: set by the debugging thread once the dispatch finished.
            ready = threading.Event()
            done = threading.Event()

            async def in_loop() -> None:
                # from .run_in_terminal import in_terminal
                # async with in_terminal():
                #     ready.set()
                #     await asyncio.get_running_loop().run_in_executor(None, done.wait)
                #     return

                # Hide application.
                app.renderer.erase()

                # Detach input and dispatch to debugger.
                with app.input.detach():
                    with app.input.cooked_mode():
                        ready.set()
                        # Here we block the App's event loop thread until the
                        # debugger resumes. We could have used `with
                        # run_in_terminal.in_terminal():` like the commented
                        # code above, but it seems to work better if we
                        # completely stop the main event loop while debugging.
                        done.wait()

            # NOTE(review): this helper is used when the current thread is not
            # the loop thread, while `create_background_task` is documented as
            # not threadsafe -- confirm that scheduling from here is safe.
            self.create_background_task(in_loop())
            # Block this (debugging) thread until the app has been hidden.
            ready.wait()
            try:
                yield
            finally:
                # Release the event loop thread, also if the dispatch raised.
                done.set()

        class CustomPdb(pdb.Pdb):
            # Wrap every trace dispatch so that the application is hidden
            # while the debugger is in control.
            def trace_dispatch(
                self, frame: FrameType, event: str, arg: Any
            ) -> TraceDispatch:
                # No loop thread recorded: dispatch without hiding anything.
                if app._loop_thread is None:
                    return super().trace_dispatch(frame, event, arg)

                if app._loop_thread == threading.current_thread():
                    with hide_app_from_eventloop_thread():
                        return super().trace_dispatch(frame, event, arg)

                with hide_app_from_other_thread():
                    return super().trace_dispatch(frame, event, arg)

        # Start tracing in the frame that called this hook, not in the hook
        # itself.
        frame = sys._getframe().f_back
        CustomPdb(stdout=sys.__stdout__).set_trace(frame)
1128
1129 def create_background_task(
1130 self, coroutine: Coroutine[Any, Any, None]
1131 ) -> asyncio.Task[None]:
1132 """
1133 Start a background task (coroutine) for the running application. When
1134 the `Application` terminates, unfinished background tasks will be
1135 cancelled.
1136
1137 Given that we still support Python versions before 3.11, we can't use
1138 task groups (and exception groups), because of that, these background
1139 tasks are not allowed to raise exceptions. If they do, we'll call the
1140 default exception handler from the event loop.
1141
1142 If at some point, we have Python 3.11 as the minimum supported Python
1143 version, then we can use a `TaskGroup` (with the lifetime of
1144 `Application.run_async()`, and run run the background tasks in there.
1145
1146 This is not threadsafe.
1147 """
1148 loop = self.loop or get_running_loop()
1149 task: asyncio.Task[None] = loop.create_task(coroutine)
1150 self._background_tasks.add(task)
1151
1152 task.add_done_callback(self._on_background_task_done)
1153 return task
1154
1155 def _on_background_task_done(self, task: asyncio.Task[None]) -> None:
1156 """
1157 Called when a background task completes. Remove it from
1158 `_background_tasks`, and handle exceptions if any.
1159 """
1160 self._background_tasks.discard(task)
1161
1162 if task.cancelled():
1163 return
1164
1165 exc = task.exception()
1166 if exc is not None:
1167 get_running_loop().call_exception_handler(
1168 {
1169 "message": f"prompt_toolkit.Application background task {task!r} "
1170 "raised an unexpected exception.",
1171 "exception": exc,
1172 "task": task,
1173 }
1174 )
1175
1176 async def cancel_and_wait_for_background_tasks(self) -> None:
1177 """
1178 Cancel all background tasks, and wait for the cancellation to complete.
1179 If any of the background tasks raised an exception, this will also
1180 propagate the exception.
1181
1182 (If we had nurseries like Trio, this would be the `__aexit__` of a
1183 nursery.)
1184 """
1185 for task in self._background_tasks:
1186 task.cancel()
1187
1188 # Wait until the cancellation of the background tasks completes.
1189 # `asyncio.wait()` does not propagate exceptions raised within any of
1190 # these tasks, which is what we want. Otherwise, we can't distinguish
1191 # between a `CancelledError` raised in this task because it got
1192 # cancelled, and a `CancelledError` raised on this `await` checkpoint,
1193 # because *we* got cancelled during the teardown of the application.
1194 # (If we get cancelled here, then it's important to not suppress the
1195 # `CancelledError`, and have it propagate.)
1196 # NOTE: Currently, if we get cancelled at this point then we can't wait
1197 # for the cancellation to complete (in the future, we should be
1198 # using anyio or Python's 3.11 TaskGroup.)
1199 # Also, if we had exception groups, we could propagate an
1200 # `ExceptionGroup` if something went wrong here. Right now, we
1201 # don't propagate exceptions, but have them printed in
1202 # `_on_background_task_done`.
1203 if len(self._background_tasks) > 0:
1204 await asyncio.wait(
1205 self._background_tasks, timeout=None, return_when=asyncio.ALL_COMPLETED
1206 )
1207
1208 async def _poll_output_size(self) -> None:
1209 """
1210 Coroutine for polling the terminal dimensions.
1211
1212 Useful for situations where `attach_winch_signal_handler` is not sufficient:
1213 - If we are not running in the main thread.
1214 - On Windows.
1215 """
1216 size: Size | None = None
1217 interval = self.terminal_size_polling_interval
1218
1219 if interval is None:
1220 return
1221
1222 while True:
1223 await asyncio.sleep(interval)
1224 new_size = self.output.get_size()
1225
1226 if size is not None and new_size != size:
1227 self._on_resize()
1228 size = new_size
1229
1230 def cpr_not_supported_callback(self) -> None:
1231 """
1232 Called when we don't receive the cursor position response in time.
1233 """
1234 if not self.output.responds_to_cpr:
1235 return # We know about this already.
1236
1237 def in_terminal() -> None:
1238 self.output.write(
1239 "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
1240 )
1241 self.output.flush()
1242
1243 run_in_terminal(in_terminal)
1244
1245 @overload
1246 def exit(self) -> None:
1247 "Exit without arguments."
1248
1249 @overload
1250 def exit(self, *, result: _AppResult, style: str = "") -> None:
1251 "Exit with `_AppResult`."
1252
1253 @overload
1254 def exit(
1255 self, *, exception: BaseException | type[BaseException], style: str = ""
1256 ) -> None:
1257 "Exit with exception."
1258
1259 def exit(
1260 self,
1261 result: _AppResult | None = None,
1262 exception: BaseException | type[BaseException] | None = None,
1263 style: str = "",
1264 ) -> None:
1265 """
1266 Exit application.
1267
1268 .. note::
1269
1270 If `Application.exit` is called before `Application.run()` is
1271 called, then the `Application` won't exit (because the
1272 `Application.future` doesn't correspond to the current run). Use a
1273 `pre_run` hook and an event to synchronize the closing if there's a
1274 chance this can happen.
1275
1276 :param result: Set this result for the application.
1277 :param exception: Set this exception as the result for an application. For
1278 a prompt, this is often `EOFError` or `KeyboardInterrupt`.
1279 :param style: Apply this style on the whole content when quitting,
1280 often this is 'class:exiting' for a prompt. (Used when
1281 `erase_when_done` is not set.)
1282 """
1283 assert result is None or exception is None
1284
1285 if self.future is None:
1286 raise Exception("Application is not running. Application.exit() failed.")
1287
1288 if self.future.done():
1289 raise Exception("Return value already set. Application.exit() failed.")
1290
1291 self.exit_style = style
1292
1293 if exception is not None:
1294 self.future.set_exception(exception)
1295 else:
1296 self.future.set_result(cast(_AppResult, result))
1297
1298 def _request_absolute_cursor_position(self) -> None:
1299 """
1300 Send CPR request.
1301 """
1302 # Note: only do this if the input queue is not empty, and a return
1303 # value has not been set. Otherwise, we won't be able to read the
1304 # response anyway.
1305 if not self.key_processor.input_queue and not self.is_done:
1306 self.renderer.request_absolute_cursor_position()
1307
1308 async def run_system_command(
1309 self,
1310 command: str,
1311 wait_for_enter: bool = True,
1312 display_before_text: AnyFormattedText = "",
1313 wait_text: str = "Press ENTER to continue...",
1314 ) -> None:
1315 """
1316 Run system command (While hiding the prompt. When finished, all the
1317 output will scroll above the prompt.)
1318
1319 :param command: Shell command to be executed.
1320 :param wait_for_enter: FWait for the user to press enter, when the
1321 command is finished.
1322 :param display_before_text: If given, text to be displayed before the
1323 command executes.
1324 :return: A `Future` object.
1325 """
1326 async with in_terminal():
1327 # Try to use the same input/output file descriptors as the one,
1328 # used to run this application.
1329 try:
1330 input_fd = self.input.fileno()
1331 except AttributeError:
1332 input_fd = sys.stdin.fileno()
1333 try:
1334 output_fd = self.output.fileno()
1335 except AttributeError:
1336 output_fd = sys.stdout.fileno()
1337
1338 # Run sub process.
1339 def run_command() -> None:
1340 self.print_text(display_before_text)
1341 p = Popen(command, shell=True, stdin=input_fd, stdout=output_fd)
1342 p.wait()
1343
1344 await run_in_executor_with_context(run_command)
1345
1346 # Wait for the user to press enter.
1347 if wait_for_enter:
1348 await _do_wait_for_enter(wait_text)
1349
1350 def suspend_to_background(self, suspend_group: bool = True) -> None:
1351 """
1352 (Not thread safe -- to be called from inside the key bindings.)
1353 Suspend process.
1354
1355 :param suspend_group: When true, suspend the whole process group.
1356 (This is the default, and probably what you want.)
1357 """
1358 # Only suspend when the operating system supports it.
1359 # (Not on Windows.)
1360 if _SIGTSTP is not None:
1361
1362 def run() -> None:
1363 signal = cast(int, _SIGTSTP)
1364 # Send `SIGTSTP` to own process.
1365 # This will cause it to suspend.
1366
1367 # Usually we want the whole process group to be suspended. This
1368 # handles the case when input is piped from another process.
1369 if suspend_group:
1370 os.kill(0, signal)
1371 else:
1372 os.kill(os.getpid(), signal)
1373
1374 run_in_terminal(run)
1375
1376 def print_text(
1377 self, text: AnyFormattedText, style: BaseStyle | None = None
1378 ) -> None:
1379 """
1380 Print a list of (style_str, text) tuples to the output.
1381 (When the UI is running, this method has to be called through
1382 `run_in_terminal`, otherwise it will destroy the UI.)
1383
1384 :param text: List of ``(style_str, text)`` tuples.
1385 :param style: Style class to use. Defaults to the active style in the CLI.
1386 """
1387 print_formatted_text(
1388 output=self.output,
1389 formatted_text=text,
1390 style=style or self._merged_style,
1391 color_depth=self.color_depth,
1392 style_transformation=self.style_transformation,
1393 )
1394
    @property
    def is_running(self) -> bool:
        """
        `True` when the application is currently active/running.

        (Read-only view on the private `_is_running` flag.)
        """
        return self._is_running
1399
1400 @property
1401 def is_done(self) -> bool:
1402 if self.future:
1403 return self.future.done()
1404 return False
1405
1406 def get_used_style_strings(self) -> list[str]:
1407 """
1408 Return a list of used style strings. This is helpful for debugging, and
1409 for writing a new `Style`.
1410 """
1411 attrs_for_style = self.renderer._attrs_for_style
1412
1413 if attrs_for_style:
1414 return sorted(
1415 re.sub(r"\s+", " ", style_str).strip()
1416 for style_str in attrs_for_style.keys()
1417 )
1418
1419 return []
1420
1421
class _CombinedRegistry(KeyBindingsBase):
    """
    The `KeyBindings` of key bindings for a `Application`.
    This merges the global key bindings with the one of the current user
    control.
    """

    def __init__(self, app: Application[_AppResult]) -> None:
        self.app = app

        # Merged key bindings, cached per (focused window, set of controls).
        self._cache: SimpleCache[
            tuple[Window, frozenset[UIControl]], KeyBindingsBase
        ] = SimpleCache()

    @property
    def _version(self) -> Hashable:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    @property
    def bindings(self) -> list[Binding]:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    def _create_key_bindings(
        self, current_window: Window, other_controls: list[UIControl]
    ) -> KeyBindingsBase:
        """
        Create a `KeyBindings` object that merges the `KeyBindings` from the
        `UIControl` with all the parent controls and the global key bindings.

        :param current_window: The window that currently has the focus.
        :param other_controls: The controls of the layout. (Only part of the
            cache key; not used while building the bindings.)
        """
        app = self.app
        collected = []
        visited = set()

        # Walk up from the focused window through its parents, collecting the
        # key bindings of each container. Stop at the first modal container,
        # or at the root.
        container: Container = current_window
        while True:
            visited.add(container)

            own_bindings = container.get_key_bindings()
            if own_bindings is not None:
                collected.append(own_bindings)

            if container.is_modal():
                break

            parent = app.layout.get_parent(container)
            if parent is None:
                break
            container = parent

        # Include global bindings (starting at the top-modal container) of
        # all the containers that we didn't visit above.
        for other in walk(container):
            if other in visited:
                continue

            other_bindings = other.get_key_bindings()
            if other_bindings is not None:
                collected.append(GlobalOnlyKeyBindings(other_bindings))

        # Add App key bindings
        if app.key_bindings:
            collected.append(app.key_bindings)

        # Add the (conditional) page navigation bindings, followed by the
        # default bindings.
        collected.append(
            ConditionalKeyBindings(
                app._page_navigation_bindings,
                app.enable_page_navigation_bindings,
            )
        )
        collected.append(app._default_bindings)

        # Reverse this list. The current control's key bindings should come
        # last. They need priority.
        collected.reverse()

        return merge_key_bindings(collected)

    @property
    def _key_bindings(self) -> KeyBindingsBase:
        """The merged key bindings for the current focus (cached)."""
        layout = self.app.layout
        focused_window = layout.current_window
        controls = list(layout.find_all_controls())

        def build() -> KeyBindingsBase:
            return self._create_key_bindings(focused_window, controls)

        cache_key = (focused_window, frozenset(controls))
        return self._cache.get(cache_key, build)

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings for the current focus."""
        merged = self._key_bindings
        return merged.get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings for the current focus."""
        merged = self._key_bindings
        return merged.get_bindings_starting_with_keys(keys)
1516
1517
async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
    """
    Run a small sub application that waits for the enter key press.
    Compared to using 'input', this has two advantages:
    - This will share the same input/output I/O.
    - This doesn't block the event loop.

    :param wait_text: The text to display as the prompt message.
    """
    from prompt_toolkit.shortcuts import PromptSession

    bindings = KeyBindings()

    @bindings.add("enter")
    def _ok(event: E) -> None:
        "Leave the sub application."
        event.app.exit()

    @bindings.add(Keys.Any)
    def _ignore(event: E) -> None:
        "Disallow typing."

    session: PromptSession[None] = PromptSession(
        message=wait_text, key_bindings=bindings
    )
    sub_application = session.app

    try:
        await sub_application.run_async()
    except KeyboardInterrupt:
        # Control-c pressed. Don't propagate this error.
        pass
1545
1546
@contextmanager
def attach_winch_signal_handler(
    handler: Callable[[], None],
) -> Generator[None, None, None]:
    """
    Attach the given callback as a WINCH signal handler within the context
    manager. Restore the original signal handler when done.

    The `Application.run` method will register SIGWINCH, so that it will
    properly repaint when the terminal window resizes. However, using
    `run_in_terminal`, we can temporarily send an application to the
    background, and run an other app in between, which will then overwrite the
    SIGWINCH. This is why it's important to restore the handler when the app
    terminates.

    :param handler: Callable without arguments, to be called on SIGWINCH.
    """
    # The tricky part here is that signals are registered in the Unix event
    # loop with a wakeup fd, but another application could have registered
    # signals using signal.signal directly. For now, the implementation is
    # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`.

    # No WINCH, or not in the main thread? Then don't do anything.
    sigwinch = getattr(signal, "SIGWINCH", None)
    can_attach = sigwinch is not None and in_main_thread()
    if not can_attach:
        yield
        return

    # Keep track of the previous handler.
    # (Only UnixSelectorEventloop has `_signal_handlers`.)
    loop = get_running_loop()
    registered_handlers = getattr(loop, "_signal_handlers", {})
    previous_handle = registered_handlers.get(sigwinch)

    try:
        loop.add_signal_handler(sigwinch, handler)
        yield
    finally:
        # Drop our handler, and put the previous one (if any) back.
        loop.remove_signal_handler(sigwinch)

        if previous_handle is not None:
            loop.add_signal_handler(
                sigwinch,
                previous_handle._callback,
                *previous_handle._args,
            )
1590
1591
1592@contextmanager
1593def _restore_sigint_from_ctypes() -> Generator[None, None, None]:
1594 # The following functions are part of the stable ABI since python 3.2
1595 # See: https://docs.python.org/3/c-api/sys.html#c.PyOS_getsig
1596 # Inline import: these are not available on Pypy.
1597 try:
1598 from ctypes import c_int, c_void_p, pythonapi
1599 except ImportError:
1600 have_ctypes_signal = False
1601 else:
1602 # GraalPy has the functions, but they don't work
1603 have_ctypes_signal = sys.implementation.name != "graalpy"
1604
1605 if have_ctypes_signal:
1606 # PyOS_sighandler_t PyOS_getsig(int i)
1607 pythonapi.PyOS_getsig.restype = c_void_p
1608 pythonapi.PyOS_getsig.argtypes = (c_int,)
1609
1610 # PyOS_sighandler_t PyOS_setsig(int i, PyOS_sighandler_t h)
1611 pythonapi.PyOS_setsig.restype = c_void_p
1612 pythonapi.PyOS_setsig.argtypes = (
1613 c_int,
1614 c_void_p,
1615 )
1616
1617 sigint = signal.getsignal(signal.SIGINT)
1618 if have_ctypes_signal:
1619 sigint_os = pythonapi.PyOS_getsig(signal.SIGINT)
1620
1621 try:
1622 yield
1623 finally:
1624 if sigint is not None:
1625 signal.signal(signal.SIGINT, sigint)
1626 if have_ctypes_signal:
1627 pythonapi.PyOS_setsig(signal.SIGINT, sigint_os)