Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/prompt_toolkit/application/application.py: 19%
581 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
1from __future__ import annotations
3import asyncio
4import contextvars
5import os
6import re
7import signal
8import sys
9import threading
10import time
11from asyncio import (
12 AbstractEventLoop,
13 Future,
14 Task,
15 ensure_future,
16 get_running_loop,
17 sleep,
18)
19from contextlib import ExitStack, contextmanager
20from subprocess import Popen
21from traceback import format_tb
22from typing import (
23 Any,
24 Callable,
25 Coroutine,
26 Generator,
27 Generic,
28 Hashable,
29 Iterable,
30 Iterator,
31 TypeVar,
32 cast,
33 overload,
34)
36from prompt_toolkit.buffer import Buffer
37from prompt_toolkit.cache import SimpleCache
38from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard
39from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config
40from prompt_toolkit.data_structures import Size
41from prompt_toolkit.enums import EditingMode
42from prompt_toolkit.eventloop import (
43 get_traceback_from_context,
44 run_in_executor_with_context,
45)
46from prompt_toolkit.eventloop.utils import call_soon_threadsafe
47from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter
48from prompt_toolkit.formatted_text import AnyFormattedText
49from prompt_toolkit.input.base import Input
50from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead
51from prompt_toolkit.key_binding.bindings.page_navigation import (
52 load_page_navigation_bindings,
53)
54from prompt_toolkit.key_binding.defaults import load_key_bindings
55from prompt_toolkit.key_binding.emacs_state import EmacsState
56from prompt_toolkit.key_binding.key_bindings import (
57 Binding,
58 ConditionalKeyBindings,
59 GlobalOnlyKeyBindings,
60 KeyBindings,
61 KeyBindingsBase,
62 KeysTuple,
63 merge_key_bindings,
64)
65from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor
66from prompt_toolkit.key_binding.vi_state import ViState
67from prompt_toolkit.keys import Keys
68from prompt_toolkit.layout.containers import Container, Window
69from prompt_toolkit.layout.controls import BufferControl, UIControl
70from prompt_toolkit.layout.dummy import create_dummy_layout
71from prompt_toolkit.layout.layout import Layout, walk
72from prompt_toolkit.output import ColorDepth, Output
73from prompt_toolkit.renderer import Renderer, print_formatted_text
74from prompt_toolkit.search import SearchState
75from prompt_toolkit.styles import (
76 BaseStyle,
77 DummyStyle,
78 DummyStyleTransformation,
79 DynamicStyle,
80 StyleTransformation,
81 default_pygments_style,
82 default_ui_style,
83 merge_styles,
84)
85from prompt_toolkit.utils import Event, in_main_thread
87from .current import get_app_session, set_app
88from .run_in_terminal import in_terminal, run_in_terminal
90__all__ = [
91 "Application",
92]
95E = KeyPressEvent
96_AppResult = TypeVar("_AppResult")
97ApplicationEventHandler = Callable[["Application[_AppResult]"], None]
99_SIGWINCH = getattr(signal, "SIGWINCH", None)
100_SIGTSTP = getattr(signal, "SIGTSTP", None)
103class Application(Generic[_AppResult]):
104 """
105 The main Application class!
106 This glues everything together.
108 :param layout: A :class:`~prompt_toolkit.layout.Layout` instance.
109 :param key_bindings:
110 :class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for
111 the key bindings.
112 :param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use.
113 :param full_screen: When True, run the application on the alternate screen buffer.
114 :param color_depth: Any :class:`~.ColorDepth` value, a callable that
115 returns a :class:`~.ColorDepth` or `None` for default.
116 :param erase_when_done: (bool) Clear the application output when it finishes.
117 :param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches
118 forward and a '?' searches backward. In Readline mode, this is usually
119 reversed.
120 :param min_redraw_interval: Number of seconds to wait between redraws. Use
121 this for applications where `invalidate` is called a lot. This could cause
122 a lot of terminal output, which some terminals are not able to process.
124 `None` means that every `invalidate` will be scheduled right away
125 (which is usually fine).
127 When one `invalidate` is called, but a scheduled redraw of a previous
128 `invalidate` call has not been executed yet, nothing will happen in any
129 case.
131 :param max_render_postpone_time: When there is high CPU (a lot of other
132 scheduled calls), postpone the rendering max x seconds. '0' means:
133 don't postpone. '.5' means: try to draw at least twice a second.
135 :param refresh_interval: Automatically invalidate the UI every so many
136 seconds. When `None` (the default), only invalidate when `invalidate`
137 has been called.
139 :param terminal_size_polling_interval: Poll the terminal size every so many
140 seconds. Useful if the applications runs in a thread other then then
141 main thread where SIGWINCH can't be handled, or on Windows.
143 Filters:
145 :param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or
146 boolean). When True, enable mouse support.
147 :param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean.
148 :param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`.
150 :param enable_page_navigation_bindings: When `True`, enable the page
151 navigation key bindings. These include both Emacs and Vi bindings like
152 page-up, page-down and so on to scroll through pages. Mostly useful for
153 creating an editor or other full screen applications. Probably, you
154 don't want this for the implementation of a REPL. By default, this is
155 enabled if `full_screen` is set.
157 Callbacks (all of these should accept an
158 :class:`~prompt_toolkit.application.Application` object as input.)
160 :param on_reset: Called during reset.
161 :param on_invalidate: Called when the UI has been invalidated.
162 :param before_render: Called right before rendering.
163 :param after_render: Called right after rendering.
165 I/O:
166 (Note that the preferred way to change the input/output is by creating an
167 `AppSession` with the required input/output objects. If you need multiple
168 applications running at the same time, you have to create a separate
169 `AppSession` using a `with create_app_session():` block.
171 :param input: :class:`~prompt_toolkit.input.Input` instance.
172 :param output: :class:`~prompt_toolkit.output.Output` instance. (Probably
173 Vt100_Output or Win32Output.)
175 Usage:
177 app = Application(...)
178 app.run()
180 # Or
181 await app.run_async()
182 """
184 def __init__(
185 self,
186 layout: Layout | None = None,
187 style: BaseStyle | None = None,
188 include_default_pygments_style: FilterOrBool = True,
189 style_transformation: StyleTransformation | None = None,
190 key_bindings: KeyBindingsBase | None = None,
191 clipboard: Clipboard | None = None,
192 full_screen: bool = False,
193 color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
194 mouse_support: FilterOrBool = False,
195 enable_page_navigation_bindings: None
196 | (FilterOrBool) = None, # Can be None, True or False.
197 paste_mode: FilterOrBool = False,
198 editing_mode: EditingMode = EditingMode.EMACS,
199 erase_when_done: bool = False,
200 reverse_vi_search_direction: FilterOrBool = False,
201 min_redraw_interval: float | int | None = None,
202 max_render_postpone_time: float | int | None = 0.01,
203 refresh_interval: float | None = None,
204 terminal_size_polling_interval: float | None = 0.5,
205 cursor: AnyCursorShapeConfig = None,
206 on_reset: ApplicationEventHandler[_AppResult] | None = None,
207 on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
208 before_render: ApplicationEventHandler[_AppResult] | None = None,
209 after_render: ApplicationEventHandler[_AppResult] | None = None,
210 # I/O.
211 input: Input | None = None,
212 output: Output | None = None,
213 ) -> None:
214 # If `enable_page_navigation_bindings` is not specified, enable it in
215 # case of full screen applications only. This can be overridden by the user.
216 if enable_page_navigation_bindings is None:
217 enable_page_navigation_bindings = Condition(lambda: self.full_screen)
219 paste_mode = to_filter(paste_mode)
220 mouse_support = to_filter(mouse_support)
221 reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
222 enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
223 include_default_pygments_style = to_filter(include_default_pygments_style)
225 if layout is None:
226 layout = create_dummy_layout()
228 if style_transformation is None:
229 style_transformation = DummyStyleTransformation()
231 self.style = style
232 self.style_transformation = style_transformation
234 # Key bindings.
235 self.key_bindings = key_bindings
236 self._default_bindings = load_key_bindings()
237 self._page_navigation_bindings = load_page_navigation_bindings()
239 self.layout = layout
240 self.clipboard = clipboard or InMemoryClipboard()
241 self.full_screen: bool = full_screen
242 self._color_depth = color_depth
243 self.mouse_support = mouse_support
245 self.paste_mode = paste_mode
246 self.editing_mode = editing_mode
247 self.erase_when_done = erase_when_done
248 self.reverse_vi_search_direction = reverse_vi_search_direction
249 self.enable_page_navigation_bindings = enable_page_navigation_bindings
250 self.min_redraw_interval = min_redraw_interval
251 self.max_render_postpone_time = max_render_postpone_time
252 self.refresh_interval = refresh_interval
253 self.terminal_size_polling_interval = terminal_size_polling_interval
255 self.cursor = to_cursor_shape_config(cursor)
257 # Events.
258 self.on_invalidate = Event(self, on_invalidate)
259 self.on_reset = Event(self, on_reset)
260 self.before_render = Event(self, before_render)
261 self.after_render = Event(self, after_render)
263 # I/O.
264 session = get_app_session()
265 self.output = output or session.output
266 self.input = input or session.input
268 # List of 'extra' functions to execute before a Application.run.
269 self.pre_run_callables: list[Callable[[], None]] = []
271 self._is_running = False
272 self.future: Future[_AppResult] | None = None
273 self.loop: AbstractEventLoop | None = None
274 self._loop_thread: threading.Thread | None = None
275 self.context: contextvars.Context | None = None
277 #: Quoted insert. This flag is set if we go into quoted insert mode.
278 self.quoted_insert = False
280 #: Vi state. (For Vi key bindings.)
281 self.vi_state = ViState()
282 self.emacs_state = EmacsState()
284 #: When to flush the input (For flushing escape keys.) This is important
285 #: on terminals that use vt100 input. We can't distinguish the escape
286 #: key from for instance the left-arrow key, if we don't know what follows
287 #: after "\x1b". This little timer will consider "\x1b" to be escape if
288 #: nothing did follow in this time span.
289 #: This seems to work like the `ttimeoutlen` option in Vim.
290 self.ttimeoutlen = 0.5 # Seconds.
292 #: Like Vim's `timeoutlen` option. This can be `None` or a float. For
293 #: instance, suppose that we have a key binding AB and a second key
294 #: binding A. If the uses presses A and then waits, we don't handle
295 #: this binding yet (unless it was marked 'eager'), because we don't
296 #: know what will follow. This timeout is the maximum amount of time
297 #: that we wait until we call the handlers anyway. Pass `None` to
298 #: disable this timeout.
299 self.timeoutlen = 1.0
301 #: The `Renderer` instance.
302 # Make sure that the same stdout is used, when a custom renderer has been passed.
303 self._merged_style = self._create_merged_style(include_default_pygments_style)
305 self.renderer = Renderer(
306 self._merged_style,
307 self.output,
308 full_screen=full_screen,
309 mouse_support=mouse_support,
310 cpr_not_supported_callback=self.cpr_not_supported_callback,
311 )
313 #: Render counter. This one is increased every time the UI is rendered.
314 #: It can be used as a key for caching certain information during one
315 #: rendering.
316 self.render_counter = 0
318 # Invalidate flag. When 'True', a repaint has been scheduled.
319 self._invalidated = False
320 self._invalidate_events: list[
321 Event[object]
322 ] = [] # Collection of 'invalidate' Event objects.
323 self._last_redraw_time = 0.0 # Unix timestamp of last redraw. Used when
324 # `min_redraw_interval` is given.
326 #: The `InputProcessor` instance.
327 self.key_processor = KeyProcessor(_CombinedRegistry(self))
329 # If `run_in_terminal` was called. This will point to a `Future` what will be
330 # set at the point when the previous run finishes.
331 self._running_in_terminal = False
332 self._running_in_terminal_f: Future[None] | None = None
334 # Trigger initialize callback.
335 self.reset()
337 def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
338 """
339 Create a `Style` object that merges the default UI style, the default
340 pygments style, and the custom user style.
341 """
342 dummy_style = DummyStyle()
343 pygments_style = default_pygments_style()
345 @DynamicStyle
346 def conditional_pygments_style() -> BaseStyle:
347 if include_default_pygments_style():
348 return pygments_style
349 else:
350 return dummy_style
352 return merge_styles(
353 [
354 default_ui_style(),
355 conditional_pygments_style,
356 DynamicStyle(lambda: self.style),
357 ]
358 )
360 @property
361 def color_depth(self) -> ColorDepth:
362 """
363 The active :class:`.ColorDepth`.
365 The current value is determined as follows:
367 - If a color depth was given explicitly to this application, use that
368 value.
369 - Otherwise, fall back to the color depth that is reported by the
370 :class:`.Output` implementation. If the :class:`.Output` class was
371 created using `output.defaults.create_output`, then this value is
372 coming from the $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.
373 """
374 depth = self._color_depth
376 if callable(depth):
377 depth = depth()
379 if depth is None:
380 depth = self.output.get_default_color_depth()
382 return depth
384 @property
385 def current_buffer(self) -> Buffer:
386 """
387 The currently focused :class:`~.Buffer`.
389 (This returns a dummy :class:`.Buffer` when none of the actual buffers
390 has the focus. In this case, it's really not practical to check for
391 `None` values or catch exceptions every time.)
392 """
393 return self.layout.current_buffer or Buffer(
394 name="dummy-buffer"
395 ) # Dummy buffer.
397 @property
398 def current_search_state(self) -> SearchState:
399 """
400 Return the current :class:`.SearchState`. (The one for the focused
401 :class:`.BufferControl`.)
402 """
403 ui_control = self.layout.current_control
404 if isinstance(ui_control, BufferControl):
405 return ui_control.search_state
406 else:
407 return SearchState() # Dummy search state. (Don't return None!)
409 def reset(self) -> None:
410 """
411 Reset everything, for reading the next input.
412 """
413 # Notice that we don't reset the buffers. (This happens just before
414 # returning, and when we have multiple buffers, we clearly want the
415 # content in the other buffers to remain unchanged between several
416 # calls of `run`. (And the same is true for the focus stack.)
418 self.exit_style = ""
420 self._background_tasks: set[Task[None]] = set()
422 self.renderer.reset()
423 self.key_processor.reset()
424 self.layout.reset()
425 self.vi_state.reset()
426 self.emacs_state.reset()
428 # Trigger reset event.
429 self.on_reset.fire()
431 # Make sure that we have a 'focusable' widget focused.
432 # (The `Layout` class can't determine this.)
433 layout = self.layout
435 if not layout.current_control.is_focusable():
436 for w in layout.find_all_windows():
437 if w.content.is_focusable():
438 layout.current_window = w
439 break
441 def invalidate(self) -> None:
442 """
443 Thread safe way of sending a repaint trigger to the input event loop.
444 """
445 if not self._is_running:
446 # Don't schedule a redraw if we're not running.
447 # Otherwise, `get_running_loop()` in `call_soon_threadsafe` can fail.
448 # See: https://github.com/dbcli/mycli/issues/797
449 return
451 # `invalidate()` called if we don't have a loop yet (not running?), or
452 # after the event loop was closed.
453 if self.loop is None or self.loop.is_closed():
454 return
456 # Never schedule a second redraw, when a previous one has not yet been
457 # executed. (This should protect against other threads calling
458 # 'invalidate' many times, resulting in 100% CPU.)
459 if self._invalidated:
460 return
461 else:
462 self._invalidated = True
464 # Trigger event.
465 self.loop.call_soon_threadsafe(self.on_invalidate.fire)
467 def redraw() -> None:
468 self._invalidated = False
469 self._redraw()
471 def schedule_redraw() -> None:
472 call_soon_threadsafe(
473 redraw, max_postpone_time=self.max_render_postpone_time, loop=self.loop
474 )
476 if self.min_redraw_interval:
477 # When a minimum redraw interval is set, wait minimum this amount
478 # of time between redraws.
479 diff = time.time() - self._last_redraw_time
480 if diff < self.min_redraw_interval:
482 async def redraw_in_future() -> None:
483 await sleep(cast(float, self.min_redraw_interval) - diff)
484 schedule_redraw()
486 self.loop.call_soon_threadsafe(
487 lambda: self.create_background_task(redraw_in_future())
488 )
489 else:
490 schedule_redraw()
491 else:
492 schedule_redraw()
494 @property
495 def invalidated(self) -> bool:
496 "True when a redraw operation has been scheduled."
497 return self._invalidated
499 def _redraw(self, render_as_done: bool = False) -> None:
500 """
501 Render the command line again. (Not thread safe!) (From other threads,
502 or if unsure, use :meth:`.Application.invalidate`.)
504 :param render_as_done: make sure to put the cursor after the UI.
505 """
507 def run_in_context() -> None:
508 # Only draw when no sub application was started.
509 if self._is_running and not self._running_in_terminal:
510 if self.min_redraw_interval:
511 self._last_redraw_time = time.time()
513 # Render
514 self.render_counter += 1
515 self.before_render.fire()
517 if render_as_done:
518 if self.erase_when_done:
519 self.renderer.erase()
520 else:
521 # Draw in 'done' state and reset renderer.
522 self.renderer.render(self, self.layout, is_done=render_as_done)
523 else:
524 self.renderer.render(self, self.layout)
526 self.layout.update_parents_relations()
528 # Fire render event.
529 self.after_render.fire()
531 self._update_invalidate_events()
533 # NOTE: We want to make sure this Application is the active one. The
534 # invalidate function is often called from a context where this
535 # application is not the active one. (Like the
536 # `PromptSession._auto_refresh_context`).
537 # We copy the context in case the context was already active, to
538 # prevent RuntimeErrors. (The rendering is not supposed to change
539 # any context variables.)
540 if self.context is not None:
541 self.context.copy().run(run_in_context)
543 def _start_auto_refresh_task(self) -> None:
544 """
545 Start a while/true loop in the background for automatic invalidation of
546 the UI.
547 """
548 if self.refresh_interval is not None and self.refresh_interval != 0:
550 async def auto_refresh(refresh_interval: float) -> None:
551 while True:
552 await sleep(refresh_interval)
553 self.invalidate()
555 self.create_background_task(auto_refresh(self.refresh_interval))
557 def _update_invalidate_events(self) -> None:
558 """
559 Make sure to attach 'invalidate' handlers to all invalidate events in
560 the UI.
561 """
562 # Remove all the original event handlers. (Components can be removed
563 # from the UI.)
564 for ev in self._invalidate_events:
565 ev -= self._invalidate_handler
567 # Gather all new events.
568 # (All controls are able to invalidate themselves.)
569 def gather_events() -> Iterable[Event[object]]:
570 for c in self.layout.find_all_controls():
571 yield from c.get_invalidate_events()
573 self._invalidate_events = list(gather_events())
575 for ev in self._invalidate_events:
576 ev += self._invalidate_handler
578 def _invalidate_handler(self, sender: object) -> None:
579 """
580 Handler for invalidate events coming from UIControls.
582 (This handles the difference in signature between event handler and
583 `self.invalidate`. It also needs to be a method -not a nested
584 function-, so that we can remove it again .)
585 """
586 self.invalidate()
588 def _on_resize(self) -> None:
589 """
590 When the window size changes, we erase the current output and request
591 again the cursor position. When the CPR answer arrives, the output is
592 drawn again.
593 """
594 # Erase, request position (when cursor is at the start position)
595 # and redraw again. -- The order is important.
596 self.renderer.erase(leave_alternate_screen=False)
597 self._request_absolute_cursor_position()
598 self._redraw()
600 def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None:
601 """
602 Called during `run`.
604 `self.future` should be set to the new future at the point where this
605 is called in order to avoid data races. `pre_run` can be used to set a
606 `threading.Event` to synchronize with UI termination code, running in
607 another thread that would call `Application.exit`. (See the progress
608 bar code for an example.)
609 """
610 if pre_run:
611 pre_run()
613 # Process registered "pre_run_callables" and clear list.
614 for c in self.pre_run_callables:
615 c()
616 del self.pre_run_callables[:]
618 async def run_async(
619 self,
620 pre_run: Callable[[], None] | None = None,
621 set_exception_handler: bool = True,
622 handle_sigint: bool = True,
623 slow_callback_duration: float = 0.5,
624 ) -> _AppResult:
625 """
626 Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
627 until :meth:`~prompt_toolkit.application.Application.exit` has been
628 called. Return the value that was passed to
629 :meth:`~prompt_toolkit.application.Application.exit`.
631 This is the main entry point for a prompt_toolkit
632 :class:`~prompt_toolkit.application.Application` and usually the only
633 place where the event loop is actually running.
635 :param pre_run: Optional callable, which is called right after the
636 "reset" of the application.
637 :param set_exception_handler: When set, in case of an exception, go out
638 of the alternate screen and hide the application, display the
639 exception, and wait for the user to press ENTER.
640 :param handle_sigint: Handle SIGINT signal if possible. This will call
641 the `<sigint>` key binding when a SIGINT is received. (This only
642 works in the main thread.)
643 :param slow_callback_duration: Display warnings if code scheduled in
644 the asyncio event loop takes more time than this. The asyncio
645 default of `0.1` is sometimes not sufficient on a slow system,
646 because exceptionally, the drawing of the app, which happens in the
647 event loop, can take a bit longer from time to time.
648 """
649 assert not self._is_running, "Application is already running."
651 if not in_main_thread() or sys.platform == "win32":
652 # Handling signals in other threads is not supported.
653 # Also on Windows, `add_signal_handler(signal.SIGINT, ...)` raises
654 # `NotImplementedError`.
655 # See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1553
656 handle_sigint = False
658 async def _run_async(f: "asyncio.Future[_AppResult]") -> _AppResult:
659 context = contextvars.copy_context()
660 self.context = context
662 # Counter for cancelling 'flush' timeouts. Every time when a key is
663 # pressed, we start a 'flush' timer for flushing our escape key. But
664 # when any subsequent input is received, a new timer is started and
665 # the current timer will be ignored.
666 flush_task: asyncio.Task[None] | None = None
668 # Reset.
669 # (`self.future` needs to be set when `pre_run` is called.)
670 self.reset()
671 self._pre_run(pre_run)
673 # Feed type ahead input first.
674 self.key_processor.feed_multiple(get_typeahead(self.input))
675 self.key_processor.process_keys()
677 def read_from_input() -> None:
678 nonlocal flush_task
680 # Ignore when we aren't running anymore. This callback will
681 # removed from the loop next time. (It could be that it was
682 # still in the 'tasks' list of the loop.)
683 # Except: if we need to process incoming CPRs.
684 if not self._is_running and not self.renderer.waiting_for_cpr:
685 return
687 # Get keys from the input object.
688 keys = self.input.read_keys()
690 # Feed to key processor.
691 self.key_processor.feed_multiple(keys)
692 self.key_processor.process_keys()
694 # Quit when the input stream was closed.
695 if self.input.closed:
696 if not f.done():
697 f.set_exception(EOFError)
698 else:
699 # Automatically flush keys.
700 if flush_task:
701 flush_task.cancel()
702 flush_task = self.create_background_task(auto_flush_input())
704 def read_from_input_in_context() -> None:
705 # Ensure that key bindings callbacks are always executed in the
706 # current context. This is important when key bindings are
707 # accessing contextvars. (These callbacks are currently being
708 # called from a different context. Underneath,
709 # `loop.add_reader` is used to register the stdin FD.)
710 # (We copy the context to avoid a `RuntimeError` in case the
711 # context is already active.)
712 context.copy().run(read_from_input)
714 async def auto_flush_input() -> None:
715 # Flush input after timeout.
716 # (Used for flushing the enter key.)
717 # This sleep can be cancelled, in that case we won't flush yet.
718 await sleep(self.ttimeoutlen)
719 flush_input()
721 def flush_input() -> None:
722 if not self.is_done:
723 # Get keys, and feed to key processor.
724 keys = self.input.flush_keys()
725 self.key_processor.feed_multiple(keys)
726 self.key_processor.process_keys()
728 if self.input.closed:
729 f.set_exception(EOFError)
731 # Enter raw mode, attach input and attach WINCH event handler.
732 with self.input.raw_mode(), self.input.attach(
733 read_from_input_in_context
734 ), attach_winch_signal_handler(self._on_resize):
735 # Draw UI.
736 self._request_absolute_cursor_position()
737 self._redraw()
738 self._start_auto_refresh_task()
740 self.create_background_task(self._poll_output_size())
742 # Wait for UI to finish.
743 try:
744 result = await f
745 finally:
746 # In any case, when the application finishes.
747 # (Successful, or because of an error.)
748 try:
749 self._redraw(render_as_done=True)
750 finally:
751 # _redraw has a good chance to fail if it calls widgets
752 # with bad code. Make sure to reset the renderer
753 # anyway.
754 self.renderer.reset()
756 # Unset `is_running`, this ensures that possibly
757 # scheduled draws won't paint during the following
758 # yield.
759 self._is_running = False
761 # Detach event handlers for invalidate events.
762 # (Important when a UIControl is embedded in multiple
763 # applications, like ptterm in pymux. An invalidate
764 # should not trigger a repaint in terminated
765 # applications.)
766 for ev in self._invalidate_events:
767 ev -= self._invalidate_handler
768 self._invalidate_events = []
770 # Wait for CPR responses.
771 if self.output.responds_to_cpr:
772 await self.renderer.wait_for_cpr_responses()
774 # Wait for the run-in-terminals to terminate.
775 previous_run_in_terminal_f = self._running_in_terminal_f
777 if previous_run_in_terminal_f:
778 await previous_run_in_terminal_f
780 # Store unprocessed input as typeahead for next time.
781 store_typeahead(self.input, self.key_processor.empty_queue())
783 return result
785 @contextmanager
786 def set_loop() -> Iterator[AbstractEventLoop]:
787 loop = get_running_loop()
788 self.loop = loop
789 self._loop_thread = threading.current_thread()
791 try:
792 yield loop
793 finally:
794 self.loop = None
795 self._loop_thread = None
797 @contextmanager
798 def set_is_running() -> Iterator[None]:
799 self._is_running = True
800 try:
801 yield
802 finally:
803 self._is_running = False
805 @contextmanager
806 def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
807 if handle_sigint:
808 loop.add_signal_handler(
809 signal.SIGINT,
810 lambda *_: loop.call_soon_threadsafe(
811 self.key_processor.send_sigint
812 ),
813 )
814 try:
815 yield
816 finally:
817 loop.remove_signal_handler(signal.SIGINT)
818 else:
819 yield
821 @contextmanager
822 def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
823 if set_exception_handler:
824 previous_exc_handler = loop.get_exception_handler()
825 loop.set_exception_handler(self._handle_exception)
826 try:
827 yield
828 finally:
829 loop.set_exception_handler(previous_exc_handler)
831 else:
832 yield
834 @contextmanager
835 def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
836 # Set slow_callback_duration.
837 original_slow_callback_duration = loop.slow_callback_duration
838 loop.slow_callback_duration = slow_callback_duration
839 try:
840 yield
841 finally:
842 # Reset slow_callback_duration.
843 loop.slow_callback_duration = original_slow_callback_duration
845 @contextmanager
846 def create_future(
847 loop: AbstractEventLoop,
848 ) -> Iterator[asyncio.Future[_AppResult]]:
849 f = loop.create_future()
850 self.future = f # XXX: make sure to set this before calling '_redraw'.
852 try:
853 yield f
854 finally:
855 # Also remove the Future again. (This brings the
856 # application back to its initial state, where it also
857 # doesn't have a Future.)
858 self.future = None
860 with ExitStack() as stack:
861 stack.enter_context(set_is_running())
863 # Make sure to set `_invalidated` to `False` to begin with,
864 # otherwise we're not going to paint anything. This can happen if
865 # this application had run before on a different event loop, and a
866 # paint was scheduled using `call_soon_threadsafe` with
867 # `max_postpone_time`.
868 self._invalidated = False
870 loop = stack.enter_context(set_loop())
872 stack.enter_context(set_handle_sigint(loop))
873 stack.enter_context(set_exception_handler_ctx(loop))
874 stack.enter_context(set_callback_duration(loop))
875 stack.enter_context(set_app(self))
876 stack.enter_context(self._enable_breakpointhook())
878 f = stack.enter_context(create_future(loop))
880 try:
881 return await _run_async(f)
882 finally:
883 # Wait for the background tasks to be done. This needs to
884 # go in the finally! If `_run_async` raises
885 # `KeyboardInterrupt`, we still want to wait for the
886 # background tasks.
887 await self.cancel_and_wait_for_background_tasks()
889 # The `ExitStack` above is defined in typeshed in a way that it can
890 # swallow exceptions. Without next line, mypy would think that there's
891 # a possibility we don't return here. See:
892 # https://github.com/python/mypy/issues/7726
893 assert False, "unreachable"
895 def run(
896 self,
897 pre_run: Callable[[], None] | None = None,
898 set_exception_handler: bool = True,
899 handle_sigint: bool = True,
900 in_thread: bool = False,
901 ) -> _AppResult:
902 """
903 A blocking 'run' call that waits until the UI is finished.
905 This will start the current asyncio event loop. If no loop is set for
906 the current thread, then it will create a new loop. If a new loop was
907 created, this won't close the new loop (if `in_thread=False`).
909 :param pre_run: Optional callable, which is called right after the
910 "reset" of the application.
911 :param set_exception_handler: When set, in case of an exception, go out
912 of the alternate screen and hide the application, display the
913 exception, and wait for the user to press ENTER.
914 :param in_thread: When true, run the application in a background
915 thread, and block the current thread until the application
916 terminates. This is useful if we need to be sure the application
917 won't use the current event loop (asyncio does not support nested
918 event loops). A new event loop will be created in this background
919 thread, and that loop will also be closed when the background
920 thread terminates. When this is used, it's especially important to
921 make sure that all asyncio background tasks are managed through
922 `get_appp().create_background_task()`, so that unfinished tasks are
923 properly cancelled before the event loop is closed. This is used
924 for instance in ptpython.
925 :param handle_sigint: Handle SIGINT signal. Call the key binding for
926 `Keys.SIGINT`. (This only works in the main thread.)
927 """
928 if in_thread:
929 result: _AppResult
930 exception: BaseException | None = None
932 def run_in_thread() -> None:
933 nonlocal result, exception
934 try:
935 result = self.run(
936 pre_run=pre_run,
937 set_exception_handler=set_exception_handler,
938 # Signal handling only works in the main thread.
939 handle_sigint=False,
940 )
941 except BaseException as e:
942 exception = e
944 thread = threading.Thread(target=run_in_thread)
945 thread.start()
946 thread.join()
948 if exception is not None:
949 raise exception
950 return result
952 coro = self.run_async(
953 pre_run=pre_run,
954 set_exception_handler=set_exception_handler,
955 handle_sigint=handle_sigint,
956 )
957 try:
958 # See whether a loop was installed already. If so, use that. That's
959 # required for the input hooks to work, they are installed using
960 # `set_event_loop`.
961 loop = asyncio.get_event_loop()
962 except RuntimeError:
963 # No loop installed. Run like usual.
964 return asyncio.run(coro)
965 else:
966 # Use existing loop.
967 return loop.run_until_complete(coro)
969 def _handle_exception(
970 self, loop: AbstractEventLoop, context: dict[str, Any]
971 ) -> None:
972 """
973 Handler for event loop exceptions.
974 This will print the exception, using run_in_terminal.
975 """
976 # For Python 2: we have to get traceback at this point, because
977 # we're still in the 'except:' block of the event loop where the
978 # traceback is still available. Moving this code in the
979 # 'print_exception' coroutine will loose the exception.
980 tb = get_traceback_from_context(context)
981 formatted_tb = "".join(format_tb(tb))
983 async def in_term() -> None:
984 async with in_terminal():
985 # Print output. Similar to 'loop.default_exception_handler',
986 # but don't use logger. (This works better on Python 2.)
987 print("\nUnhandled exception in event loop:")
988 print(formatted_tb)
989 print("Exception {}".format(context.get("exception")))
991 await _do_wait_for_enter("Press ENTER to continue...")
993 ensure_future(in_term())
995 @contextmanager
996 def _enable_breakpointhook(self) -> Generator[None, None, None]:
997 """
998 Install our custom breakpointhook for the duration of this context
999 manager. (We will only install the hook if no other custom hook was
1000 set.)
1001 """
1002 if sys.version_info >= (3, 7) and sys.breakpointhook == sys.__breakpointhook__:
1003 sys.breakpointhook = self._breakpointhook
1005 try:
1006 yield
1007 finally:
1008 sys.breakpointhook = sys.__breakpointhook__
1009 else:
1010 yield
1012 def _breakpointhook(self, *a: object, **kw: object) -> None:
1013 """
1014 Breakpointhook which uses PDB, but ensures that the application is
1015 hidden and input echoing is restored during each debugger dispatch.
1017 This can be called from any thread. In any case, the application's
1018 event loop will be blocked while the PDB input is displayed. The event
1019 will continue after leaving the debugger.
1020 """
1021 app = self
1022 # Inline import on purpose. We don't want to import pdb, if not needed.
1023 import pdb
1024 from types import FrameType
1026 TraceDispatch = Callable[[FrameType, str, Any], Any]
1028 @contextmanager
1029 def hide_app_from_eventloop_thread() -> Generator[None, None, None]:
1030 """Stop application if `__breakpointhook__` is called from within
1031 the App's event loop."""
1032 # Hide application.
1033 app.renderer.erase()
1035 # Detach input and dispatch to debugger.
1036 with app.input.detach():
1037 with app.input.cooked_mode():
1038 yield
1040 # Note: we don't render the application again here, because
1041 # there's a good chance that there's a breakpoint on the next
1042 # line. This paint/erase cycle would move the PDB prompt back
1043 # to the middle of the screen.
1045 @contextmanager
1046 def hide_app_from_other_thread() -> Generator[None, None, None]:
1047 """Stop application if `__breakpointhook__` is called from a
1048 thread other than the App's event loop."""
1049 ready = threading.Event()
1050 done = threading.Event()
1052 async def in_loop() -> None:
1053 # from .run_in_terminal import in_terminal
1054 # async with in_terminal():
1055 # ready.set()
1056 # await asyncio.get_running_loop().run_in_executor(None, done.wait)
1057 # return
1059 # Hide application.
1060 app.renderer.erase()
1062 # Detach input and dispatch to debugger.
1063 with app.input.detach():
1064 with app.input.cooked_mode():
1065 ready.set()
1066 # Here we block the App's event loop thread until the
1067 # debugger resumes. We could have used `with
1068 # run_in_terminal.in_terminal():` like the commented
1069 # code above, but it seems to work better if we
1070 # completely stop the main event loop while debugging.
1071 done.wait()
1073 self.create_background_task(in_loop())
1074 ready.wait()
1075 try:
1076 yield
1077 finally:
1078 done.set()
1080 class CustomPdb(pdb.Pdb):
1081 def trace_dispatch(
1082 self, frame: FrameType, event: str, arg: Any
1083 ) -> TraceDispatch:
1084 if app._loop_thread is None:
1085 return super().trace_dispatch(frame, event, arg)
1087 if app._loop_thread == threading.current_thread():
1088 with hide_app_from_eventloop_thread():
1089 return super().trace_dispatch(frame, event, arg)
1091 with hide_app_from_other_thread():
1092 return super().trace_dispatch(frame, event, arg)
1094 frame = sys._getframe().f_back
1095 CustomPdb(stdout=sys.__stdout__).set_trace(frame)
1097 def create_background_task(
1098 self, coroutine: Coroutine[Any, Any, None]
1099 ) -> asyncio.Task[None]:
1100 """
1101 Start a background task (coroutine) for the running application. When
1102 the `Application` terminates, unfinished background tasks will be
1103 cancelled.
1105 Given that we still support Python versions before 3.11, we can't use
1106 task groups (and exception groups), because of that, these background
1107 tasks are not allowed to raise exceptions. If they do, we'll call the
1108 default exception handler from the event loop.
1110 If at some point, we have Python 3.11 as the minimum supported Python
1111 version, then we can use a `TaskGroup` (with the lifetime of
1112 `Application.run_async()`, and run run the background tasks in there.
1114 This is not threadsafe.
1115 """
1116 loop = self.loop or get_running_loop()
1117 task: asyncio.Task[None] = loop.create_task(coroutine)
1118 self._background_tasks.add(task)
1120 task.add_done_callback(self._on_background_task_done)
1121 return task
1123 def _on_background_task_done(self, task: asyncio.Task[None]) -> None:
1124 """
1125 Called when a background task completes. Remove it from
1126 `_background_tasks`, and handle exceptions if any.
1127 """
1128 self._background_tasks.discard(task)
1130 if task.cancelled():
1131 return
1133 exc = task.exception()
1134 if exc is not None:
1135 get_running_loop().call_exception_handler(
1136 {
1137 "message": f"prompt_toolkit.Application background task {task!r} "
1138 "raised an unexpected exception.",
1139 "exception": exc,
1140 "task": task,
1141 }
1142 )
1144 async def cancel_and_wait_for_background_tasks(self) -> None:
1145 """
1146 Cancel all background tasks, and wait for the cancellation to complete.
1147 If any of the background tasks raised an exception, this will also
1148 propagate the exception.
1150 (If we had nurseries like Trio, this would be the `__aexit__` of a
1151 nursery.)
1152 """
1153 for task in self._background_tasks:
1154 task.cancel()
1156 # Wait until the cancellation of the background tasks completes.
1157 # `asyncio.wait()` does not propagate exceptions raised within any of
1158 # these tasks, which is what we want. Otherwise, we can't distinguish
1159 # between a `CancelledError` raised in this task because it got
1160 # cancelled, and a `CancelledError` raised on this `await` checkpoint,
1161 # because *we* got cancelled during the teardown of the application.
1162 # (If we get cancelled here, then it's important to not suppress the
1163 # `CancelledError`, and have it propagate.)
1164 # NOTE: Currently, if we get cancelled at this point then we can't wait
1165 # for the cancellation to complete (in the future, we should be
1166 # using anyio or Python's 3.11 TaskGroup.)
1167 # Also, if we had exception groups, we could propagate an
1168 # `ExceptionGroup` if something went wrong here. Right now, we
1169 # don't propagate exceptions, but have them printed in
1170 # `_on_background_task_done`.
1171 if len(self._background_tasks) > 0:
1172 await asyncio.wait(
1173 self._background_tasks, timeout=None, return_when=asyncio.ALL_COMPLETED
1174 )
1176 async def _poll_output_size(self) -> None:
1177 """
1178 Coroutine for polling the terminal dimensions.
1180 Useful for situations where `attach_winch_signal_handler` is not sufficient:
1181 - If we are not running in the main thread.
1182 - On Windows.
1183 """
1184 size: Size | None = None
1185 interval = self.terminal_size_polling_interval
1187 if interval is None:
1188 return
1190 while True:
1191 await asyncio.sleep(interval)
1192 new_size = self.output.get_size()
1194 if size is not None and new_size != size:
1195 self._on_resize()
1196 size = new_size
1198 def cpr_not_supported_callback(self) -> None:
1199 """
1200 Called when we don't receive the cursor position response in time.
1201 """
1202 if not self.output.responds_to_cpr:
1203 return # We know about this already.
1205 def in_terminal() -> None:
1206 self.output.write(
1207 "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
1208 )
1209 self.output.flush()
1211 run_in_terminal(in_terminal)
1213 @overload
1214 def exit(self) -> None:
1215 "Exit without arguments."
1217 @overload
1218 def exit(self, *, result: _AppResult, style: str = "") -> None:
1219 "Exit with `_AppResult`."
1221 @overload
1222 def exit(
1223 self, *, exception: BaseException | type[BaseException], style: str = ""
1224 ) -> None:
1225 "Exit with exception."
1227 def exit(
1228 self,
1229 result: _AppResult | None = None,
1230 exception: BaseException | type[BaseException] | None = None,
1231 style: str = "",
1232 ) -> None:
1233 """
1234 Exit application.
1236 .. note::
1238 If `Application.exit` is called before `Application.run()` is
1239 called, then the `Application` won't exit (because the
1240 `Application.future` doesn't correspond to the current run). Use a
1241 `pre_run` hook and an event to synchronize the closing if there's a
1242 chance this can happen.
1244 :param result: Set this result for the application.
1245 :param exception: Set this exception as the result for an application. For
1246 a prompt, this is often `EOFError` or `KeyboardInterrupt`.
1247 :param style: Apply this style on the whole content when quitting,
1248 often this is 'class:exiting' for a prompt. (Used when
1249 `erase_when_done` is not set.)
1250 """
1251 assert result is None or exception is None
1253 if self.future is None:
1254 raise Exception("Application is not running. Application.exit() failed.")
1256 if self.future.done():
1257 raise Exception("Return value already set. Application.exit() failed.")
1259 self.exit_style = style
1261 if exception is not None:
1262 self.future.set_exception(exception)
1263 else:
1264 self.future.set_result(cast(_AppResult, result))
1266 def _request_absolute_cursor_position(self) -> None:
1267 """
1268 Send CPR request.
1269 """
1270 # Note: only do this if the input queue is not empty, and a return
1271 # value has not been set. Otherwise, we won't be able to read the
1272 # response anyway.
1273 if not self.key_processor.input_queue and not self.is_done:
1274 self.renderer.request_absolute_cursor_position()
1276 async def run_system_command(
1277 self,
1278 command: str,
1279 wait_for_enter: bool = True,
1280 display_before_text: AnyFormattedText = "",
1281 wait_text: str = "Press ENTER to continue...",
1282 ) -> None:
1283 """
1284 Run system command (While hiding the prompt. When finished, all the
1285 output will scroll above the prompt.)
1287 :param command: Shell command to be executed.
1288 :param wait_for_enter: FWait for the user to press enter, when the
1289 command is finished.
1290 :param display_before_text: If given, text to be displayed before the
1291 command executes.
1292 :return: A `Future` object.
1293 """
1294 async with in_terminal():
1295 # Try to use the same input/output file descriptors as the one,
1296 # used to run this application.
1297 try:
1298 input_fd = self.input.fileno()
1299 except AttributeError:
1300 input_fd = sys.stdin.fileno()
1301 try:
1302 output_fd = self.output.fileno()
1303 except AttributeError:
1304 output_fd = sys.stdout.fileno()
1306 # Run sub process.
1307 def run_command() -> None:
1308 self.print_text(display_before_text)
1309 p = Popen(command, shell=True, stdin=input_fd, stdout=output_fd)
1310 p.wait()
1312 await run_in_executor_with_context(run_command)
1314 # Wait for the user to press enter.
1315 if wait_for_enter:
1316 await _do_wait_for_enter(wait_text)
1318 def suspend_to_background(self, suspend_group: bool = True) -> None:
1319 """
1320 (Not thread safe -- to be called from inside the key bindings.)
1321 Suspend process.
1323 :param suspend_group: When true, suspend the whole process group.
1324 (This is the default, and probably what you want.)
1325 """
1326 # Only suspend when the operating system supports it.
1327 # (Not on Windows.)
1328 if _SIGTSTP is not None:
1330 def run() -> None:
1331 signal = cast(int, _SIGTSTP)
1332 # Send `SIGTSTP` to own process.
1333 # This will cause it to suspend.
1335 # Usually we want the whole process group to be suspended. This
1336 # handles the case when input is piped from another process.
1337 if suspend_group:
1338 os.kill(0, signal)
1339 else:
1340 os.kill(os.getpid(), signal)
1342 run_in_terminal(run)
1344 def print_text(
1345 self, text: AnyFormattedText, style: BaseStyle | None = None
1346 ) -> None:
1347 """
1348 Print a list of (style_str, text) tuples to the output.
1349 (When the UI is running, this method has to be called through
1350 `run_in_terminal`, otherwise it will destroy the UI.)
1352 :param text: List of ``(style_str, text)`` tuples.
1353 :param style: Style class to use. Defaults to the active style in the CLI.
1354 """
1355 print_formatted_text(
1356 output=self.output,
1357 formatted_text=text,
1358 style=style or self._merged_style,
1359 color_depth=self.color_depth,
1360 style_transformation=self.style_transformation,
1361 )
1363 @property
1364 def is_running(self) -> bool:
1365 "`True` when the application is currently active/running."
1366 return self._is_running
1368 @property
1369 def is_done(self) -> bool:
1370 if self.future:
1371 return self.future.done()
1372 return False
1374 def get_used_style_strings(self) -> list[str]:
1375 """
1376 Return a list of used style strings. This is helpful for debugging, and
1377 for writing a new `Style`.
1378 """
1379 attrs_for_style = self.renderer._attrs_for_style
1381 if attrs_for_style:
1382 return sorted(
1383 re.sub(r"\s+", " ", style_str).strip()
1384 for style_str in attrs_for_style.keys()
1385 )
1387 return []
1390class _CombinedRegistry(KeyBindingsBase):
1391 """
1392 The `KeyBindings` of key bindings for a `Application`.
1393 This merges the global key bindings with the one of the current user
1394 control.
1395 """
1397 def __init__(self, app: Application[_AppResult]) -> None:
1398 self.app = app
1399 self._cache: SimpleCache[
1400 tuple[Window, frozenset[UIControl]], KeyBindingsBase
1401 ] = SimpleCache()
1403 @property
1404 def _version(self) -> Hashable:
1405 """Not needed - this object is not going to be wrapped in another
1406 KeyBindings object."""
1407 raise NotImplementedError
1409 @property
1410 def bindings(self) -> list[Binding]:
1411 """Not needed - this object is not going to be wrapped in another
1412 KeyBindings object."""
1413 raise NotImplementedError
1415 def _create_key_bindings(
1416 self, current_window: Window, other_controls: list[UIControl]
1417 ) -> KeyBindingsBase:
1418 """
1419 Create a `KeyBindings` object that merges the `KeyBindings` from the
1420 `UIControl` with all the parent controls and the global key bindings.
1421 """
1422 key_bindings = []
1423 collected_containers = set()
1425 # Collect key bindings from currently focused control and all parent
1426 # controls. Don't include key bindings of container parent controls.
1427 container: Container = current_window
1428 while True:
1429 collected_containers.add(container)
1430 kb = container.get_key_bindings()
1431 if kb is not None:
1432 key_bindings.append(kb)
1434 if container.is_modal():
1435 break
1437 parent = self.app.layout.get_parent(container)
1438 if parent is None:
1439 break
1440 else:
1441 container = parent
1443 # Include global bindings (starting at the top-model container).
1444 for c in walk(container):
1445 if c not in collected_containers:
1446 kb = c.get_key_bindings()
1447 if kb is not None:
1448 key_bindings.append(GlobalOnlyKeyBindings(kb))
1450 # Add App key bindings
1451 if self.app.key_bindings:
1452 key_bindings.append(self.app.key_bindings)
1454 # Add mouse bindings.
1455 key_bindings.append(
1456 ConditionalKeyBindings(
1457 self.app._page_navigation_bindings,
1458 self.app.enable_page_navigation_bindings,
1459 )
1460 )
1461 key_bindings.append(self.app._default_bindings)
1463 # Reverse this list. The current control's key bindings should come
1464 # last. They need priority.
1465 key_bindings = key_bindings[::-1]
1467 return merge_key_bindings(key_bindings)
1469 @property
1470 def _key_bindings(self) -> KeyBindingsBase:
1471 current_window = self.app.layout.current_window
1472 other_controls = list(self.app.layout.find_all_controls())
1473 key = current_window, frozenset(other_controls)
1475 return self._cache.get(
1476 key, lambda: self._create_key_bindings(current_window, other_controls)
1477 )
1479 def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
1480 return self._key_bindings.get_bindings_for_keys(keys)
1482 def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
1483 return self._key_bindings.get_bindings_starting_with_keys(keys)
1486async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
1487 """
1488 Create a sub application to wait for the enter key press.
1489 This has two advantages over using 'input'/'raw_input':
1490 - This will share the same input/output I/O.
1491 - This doesn't block the event loop.
1492 """
1493 from prompt_toolkit.shortcuts import PromptSession
1495 key_bindings = KeyBindings()
1497 @key_bindings.add("enter")
1498 def _ok(event: E) -> None:
1499 event.app.exit()
1501 @key_bindings.add(Keys.Any)
1502 def _ignore(event: E) -> None:
1503 "Disallow typing."
1504 pass
1506 session: PromptSession[None] = PromptSession(
1507 message=wait_text, key_bindings=key_bindings
1508 )
1509 try:
1510 await session.app.run_async()
1511 except KeyboardInterrupt:
1512 pass # Control-c pressed. Don't propagate this error.
1515@contextmanager
1516def attach_winch_signal_handler(
1517 handler: Callable[[], None]
1518) -> Generator[None, None, None]:
1519 """
1520 Attach the given callback as a WINCH signal handler within the context
1521 manager. Restore the original signal handler when done.
1523 The `Application.run` method will register SIGWINCH, so that it will
1524 properly repaint when the terminal window resizes. However, using
1525 `run_in_terminal`, we can temporarily send an application to the
1526 background, and run an other app in between, which will then overwrite the
1527 SIGWINCH. This is why it's important to restore the handler when the app
1528 terminates.
1529 """
1530 # The tricky part here is that signals are registered in the Unix event
1531 # loop with a wakeup fd, but another application could have registered
1532 # signals using signal.signal directly. For now, the implementation is
1533 # hard-coded for the `asyncio.unix_events._UnixSelectorEventLoop`.
1535 # No WINCH? Then don't do anything.
1536 sigwinch = getattr(signal, "SIGWINCH", None)
1537 if sigwinch is None or not in_main_thread():
1538 yield
1539 return
1541 # Keep track of the previous handler.
1542 # (Only UnixSelectorEventloop has `_signal_handlers`.)
1543 loop = get_running_loop()
1544 previous_winch_handler = getattr(loop, "_signal_handlers", {}).get(sigwinch)
1546 try:
1547 loop.add_signal_handler(sigwinch, handler)
1548 yield
1549 finally:
1550 # Restore the previous signal handler.
1551 loop.remove_signal_handler(sigwinch)
1552 if previous_winch_handler is not None:
1553 loop.add_signal_handler(
1554 sigwinch,
1555 previous_winch_handler._callback,
1556 *previous_winch_handler._args,
1557 )