Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/buffer.py: 17%
807 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1"""
2Data structures for the Buffer.
3It holds the text, cursor position, history, etc...
4"""
5from __future__ import annotations
7import asyncio
8import logging
9import os
10import re
11import shlex
12import shutil
13import subprocess
14import tempfile
15from collections import deque
16from enum import Enum
17from functools import wraps
18from typing import (
19 Any,
20 Awaitable,
21 Callable,
22 Coroutine,
23 Deque,
24 Iterable,
25 List,
26 Optional,
27 Set,
28 Tuple,
29 TypeVar,
30 Union,
31 cast,
32)
34from .application.current import get_app
35from .application.run_in_terminal import run_in_terminal
36from .auto_suggest import AutoSuggest, Suggestion
37from .cache import FastDictCache
38from .clipboard import ClipboardData
39from .completion import (
40 CompleteEvent,
41 Completer,
42 Completion,
43 DummyCompleter,
44 get_common_complete_suffix,
45)
46from .document import Document
47from .eventloop import aclosing
48from .filters import FilterOrBool, to_filter
49from .history import History, InMemoryHistory
50from .search import SearchDirection, SearchState
51from .selection import PasteMode, SelectionState, SelectionType
52from .utils import Event, to_str
53from .validation import ValidationError, Validator
55__all__ = [
56 "EditReadOnlyBuffer",
57 "Buffer",
58 "CompletionState",
59 "indent",
60 "unindent",
61 "reshape_text",
62]
64logger = logging.getLogger(__name__)
67class EditReadOnlyBuffer(Exception):
68 "Attempt editing of read-only :class:`.Buffer`."
71class ValidationState(Enum):
72 "The validation state of a buffer. This is set after the validation."
73 VALID = "VALID"
74 INVALID = "INVALID"
75 UNKNOWN = "UNKNOWN"
78class CompletionState:
79 """
80 Immutable class that contains a completion state.
81 """
83 def __init__(
84 self,
85 original_document: Document,
86 completions: list[Completion] | None = None,
87 complete_index: int | None = None,
88 ) -> None:
89 #: Document as it was when the completion started.
90 self.original_document = original_document
92 #: List of all the current Completion instances which are possible at
93 #: this point.
94 self.completions = completions or []
96 #: Position in the `completions` array.
97 #: This can be `None` to indicate "no completion", the original text.
98 self.complete_index = complete_index # Position in the `_completions` array.
100 def __repr__(self) -> str:
101 return "{}({!r}, <{!r}> completions, index={!r})".format(
102 self.__class__.__name__,
103 self.original_document,
104 len(self.completions),
105 self.complete_index,
106 )
108 def go_to_index(self, index: int | None) -> None:
109 """
110 Create a new :class:`.CompletionState` object with the new index.
112 When `index` is `None` deselect the completion.
113 """
114 if self.completions:
115 assert index is None or 0 <= index < len(self.completions)
116 self.complete_index = index
118 def new_text_and_position(self) -> tuple[str, int]:
119 """
120 Return (new_text, new_cursor_position) for this completion.
121 """
122 if self.complete_index is None:
123 return self.original_document.text, self.original_document.cursor_position
124 else:
125 original_text_before_cursor = self.original_document.text_before_cursor
126 original_text_after_cursor = self.original_document.text_after_cursor
128 c = self.completions[self.complete_index]
129 if c.start_position == 0:
130 before = original_text_before_cursor
131 else:
132 before = original_text_before_cursor[: c.start_position]
134 new_text = before + c.text + original_text_after_cursor
135 new_cursor_position = len(before) + len(c.text)
136 return new_text, new_cursor_position
138 @property
139 def current_completion(self) -> Completion | None:
140 """
141 Return the current completion, or return `None` when no completion is
142 selected.
143 """
144 if self.complete_index is not None:
145 return self.completions[self.complete_index]
146 return None
149_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")
152class YankNthArgState:
153 """
154 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history.
155 """
157 def __init__(
158 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
159 ) -> None:
160 self.history_position = history_position
161 self.previous_inserted_word = previous_inserted_word
162 self.n = n
164 def __repr__(self) -> str:
165 return "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})".format(
166 self.__class__.__name__,
167 self.history_position,
168 self.n,
169 self.previous_inserted_word,
170 )
173BufferEventHandler = Callable[["Buffer"], None]
174BufferAcceptHandler = Callable[["Buffer"], bool]
177class Buffer:
178 """
179 The core data structure that holds the text and cursor position of the
180 current input line and implements all text manipulations on top of it. It
181 also implements the history, undo stack and the completion state.
183 :param completer: :class:`~prompt_toolkit.completion.Completer` instance.
184 :param history: :class:`~prompt_toolkit.history.History` instance.
185 :param tempfile_suffix: The tempfile suffix (extension) to be used for the
186 "open in editor" function. For a Python REPL, this would be ".py", so
187 that the editor knows the syntax highlighting to use. This can also be
188 a callable that returns a string.
189 :param tempfile: For more advanced tempfile situations where you need
190 control over the subdirectories and filename. For a Git Commit Message,
191 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax
192 highlighting to use. This can also be a callable that returns a string.
193 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly
194 useful for key bindings where we sometimes prefer to refer to a buffer
195 by their name instead of by reference.
196 :param accept_handler: Called when the buffer input is accepted. (Usually
197 when the user presses `enter`.) The accept handler receives this
198 `Buffer` as input and should return True when the buffer text should be
199 kept instead of calling reset.
201 In case of a `PromptSession` for instance, we want to keep the text,
202 because we will exit the application, and only reset it during the next
203 run.
205 Events:
207 :param on_text_changed: When the buffer text changes. (Callable or None.)
208 :param on_text_insert: When new text is inserted. (Callable or None.)
209 :param on_cursor_position_changed: When the cursor moves. (Callable or None.)
210 :param on_completions_changed: When the completions were changed. (Callable or None.)
211 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.)
213 Filters:
215 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter`
216 or `bool`. Decide whether or not to do asynchronous autocompleting while
217 typing.
218 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter`
219 or `bool`. Decide whether or not to do asynchronous validation while
220 typing.
221 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or
222 `bool` to indicate when up-arrow partial string matching is enabled. It
223 is advised to not enable this at the same time as
224 `complete_while_typing`, because when there is an autocompletion found,
225 the up arrows usually browse through the completions, rather than
226 through the history.
227 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True,
228 changes will not be allowed.
229 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When
230 not set, pressing `Enter` will call the `accept_handler`. Otherwise,
231 pressing `Esc-Enter` is required.
232 """
234 def __init__(
235 self,
236 completer: Completer | None = None,
237 auto_suggest: AutoSuggest | None = None,
238 history: History | None = None,
239 validator: Validator | None = None,
240 tempfile_suffix: str | Callable[[], str] = "",
241 tempfile: str | Callable[[], str] = "",
242 name: str = "",
243 complete_while_typing: FilterOrBool = False,
244 validate_while_typing: FilterOrBool = False,
245 enable_history_search: FilterOrBool = False,
246 document: Document | None = None,
247 accept_handler: BufferAcceptHandler | None = None,
248 read_only: FilterOrBool = False,
249 multiline: FilterOrBool = True,
250 on_text_changed: BufferEventHandler | None = None,
251 on_text_insert: BufferEventHandler | None = None,
252 on_cursor_position_changed: BufferEventHandler | None = None,
253 on_completions_changed: BufferEventHandler | None = None,
254 on_suggestion_set: BufferEventHandler | None = None,
255 ):
256 # Accept both filters and booleans as input.
257 enable_history_search = to_filter(enable_history_search)
258 complete_while_typing = to_filter(complete_while_typing)
259 validate_while_typing = to_filter(validate_while_typing)
260 read_only = to_filter(read_only)
261 multiline = to_filter(multiline)
263 self.completer = completer or DummyCompleter()
264 self.auto_suggest = auto_suggest
265 self.validator = validator
266 self.tempfile_suffix = tempfile_suffix
267 self.tempfile = tempfile
268 self.name = name
269 self.accept_handler = accept_handler
271 # Filters. (Usually, used by the key bindings to drive the buffer.)
272 self.complete_while_typing = complete_while_typing
273 self.validate_while_typing = validate_while_typing
274 self.enable_history_search = enable_history_search
275 self.read_only = read_only
276 self.multiline = multiline
278 # Text width. (For wrapping, used by the Vi 'gq' operator.)
279 self.text_width = 0
281 #: The command buffer history.
282 # Note that we shouldn't use a lazy 'or' here. bool(history) could be
283 # False when empty.
284 self.history = InMemoryHistory() if history is None else history
286 self.__cursor_position = 0
288 # Events
289 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
290 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
291 self.on_cursor_position_changed: Event[Buffer] = Event(
292 self, on_cursor_position_changed
293 )
294 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
295 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)
297 # Document cache. (Avoid creating new Document instances.)
298 self._document_cache: FastDictCache[
299 tuple[str, int, SelectionState | None], Document
300 ] = FastDictCache(Document, size=10)
302 # Create completer / auto suggestion / validation coroutines.
303 self._async_suggester = self._create_auto_suggest_coroutine()
304 self._async_completer = self._create_completer_coroutine()
305 self._async_validator = self._create_auto_validate_coroutine()
307 # Asyncio task for populating the history.
308 self._load_history_task: asyncio.Future[None] | None = None
310 # Reset other attributes.
311 self.reset(document=document)
313 def __repr__(self) -> str:
314 if len(self.text) < 15:
315 text = self.text
316 else:
317 text = self.text[:12] + "..."
319 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>"
321 def reset(
322 self, document: Document | None = None, append_to_history: bool = False
323 ) -> None:
324 """
325 :param append_to_history: Append current input to history first.
326 """
327 if append_to_history:
328 self.append_to_history()
330 document = document or Document()
332 self.__cursor_position = document.cursor_position
334 # `ValidationError` instance. (Will be set when the input is wrong.)
335 self.validation_error: ValidationError | None = None
336 self.validation_state: ValidationState | None = ValidationState.UNKNOWN
338 # State of the selection.
339 self.selection_state: SelectionState | None = None
341 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
342 # we can insert text on multiple lines at once. This is implemented by
343 # using multiple cursors.)
344 self.multiple_cursor_positions: list[int] = []
346 # When doing consecutive up/down movements, prefer to stay at this column.
347 self.preferred_column: int | None = None
349 # State of complete browser
350 # For interactive completion through Ctrl-N/Ctrl-P.
351 self.complete_state: CompletionState | None = None
353 # State of Emacs yank-nth-arg completion.
354 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg.
356 # Remember the document that we had *right before* the last paste
357 # operation. This is used for rotating through the kill ring.
358 self.document_before_paste: Document | None = None
360 # Current suggestion.
361 self.suggestion: Suggestion | None = None
363 # The history search text. (Used for filtering the history when we
364 # browse through it.)
365 self.history_search_text: str | None = None
367 # Undo/redo stacks (stack of `(text, cursor_position)`).
368 self._undo_stack: list[tuple[str, int]] = []
369 self._redo_stack: list[tuple[str, int]] = []
371 # Cancel history loader. If history loading was still ongoing.
372 # Cancel the `_load_history_task`, so that next repaint of the
373 # `BufferControl` we will repopulate it.
374 if self._load_history_task is not None:
375 self._load_history_task.cancel()
376 self._load_history_task = None
378 #: The working lines. Similar to history, except that this can be
379 #: modified. The user can press arrow_up and edit previous entries.
380 #: Ctrl-C should reset this, and copy the whole history back in here.
381 #: Enter should process the current command and append to the real
382 #: history.
383 self._working_lines: Deque[str] = deque([document.text])
384 self.__working_index = 0
386 def load_history_if_not_yet_loaded(self) -> None:
387 """
388 Create task for populating the buffer history (if not yet done).
390 Note::
392 This needs to be called from within the event loop of the
393 application, because history loading is async, and we need to be
394 sure the right event loop is active. Therefor, we call this method
395 in the `BufferControl.create_content`.
397 There are situations where prompt_toolkit applications are created
398 in one thread, but will later run in a different thread (Ptpython
399 is one example. The REPL runs in a separate thread, in order to
400 prevent interfering with a potential different event loop in the
401 main thread. The REPL UI however is still created in the main
402 thread.) We could decide to not support creating prompt_toolkit
403 objects in one thread and running the application in a different
404 thread, but history loading is the only place where it matters, and
405 this solves it.
406 """
407 if self._load_history_task is None:
409 async def load_history() -> None:
410 async for item in self.history.load():
411 self._working_lines.appendleft(item)
412 self.__working_index += 1
414 self._load_history_task = get_app().create_background_task(load_history())
416 def load_history_done(f: asyncio.Future[None]) -> None:
417 """
418 Handle `load_history` result when either done, cancelled, or
419 when an exception was raised.
420 """
421 try:
422 f.result()
423 except asyncio.CancelledError:
424 # Ignore cancellation. But handle it, so that we don't get
425 # this traceback.
426 pass
427 except GeneratorExit:
428 # Probably not needed, but we had situations where
429 # `GeneratorExit` was raised in `load_history` during
430 # cancellation.
431 pass
432 except BaseException:
433 # Log error if something goes wrong. (We don't have a
434 # caller to which we can propagate this exception.)
435 logger.exception("Loading history failed")
437 self._load_history_task.add_done_callback(load_history_done)
439 # <getters/setters>
441 def _set_text(self, value: str) -> bool:
442 """set text at current working_index. Return whether it changed."""
443 working_index = self.working_index
444 working_lines = self._working_lines
446 original_value = working_lines[working_index]
447 working_lines[working_index] = value
449 # Return True when this text has been changed.
450 if len(value) != len(original_value):
451 # For Python 2, it seems that when two strings have a different
452 # length and one is a prefix of the other, Python still scans
453 # character by character to see whether the strings are different.
454 # (Some benchmarking showed significant differences for big
455 # documents. >100,000 of lines.)
456 return True
457 elif value != original_value:
458 return True
459 return False
461 def _set_cursor_position(self, value: int) -> bool:
462 """Set cursor position. Return whether it changed."""
463 original_position = self.__cursor_position
464 self.__cursor_position = max(0, value)
466 return self.__cursor_position != original_position
468 @property
469 def text(self) -> str:
470 return self._working_lines[self.working_index]
472 @text.setter
473 def text(self, value: str) -> None:
474 """
475 Setting text. (When doing this, make sure that the cursor_position is
476 valid for this text. text/cursor_position should be consistent at any time,
477 otherwise set a Document instead.)
478 """
479 # Ensure cursor position remains within the size of the text.
480 if self.cursor_position > len(value):
481 self.cursor_position = len(value)
483 # Don't allow editing of read-only buffers.
484 if self.read_only():
485 raise EditReadOnlyBuffer()
487 changed = self._set_text(value)
489 if changed:
490 self._text_changed()
492 # Reset history search text.
493 # (Note that this doesn't need to happen when working_index
494 # changes, which is when we traverse the history. That's why we
495 # don't do this in `self._text_changed`.)
496 self.history_search_text = None
498 @property
499 def cursor_position(self) -> int:
500 return self.__cursor_position
502 @cursor_position.setter
503 def cursor_position(self, value: int) -> None:
504 """
505 Setting cursor position.
506 """
507 assert isinstance(value, int)
509 # Ensure cursor position is within the size of the text.
510 if value > len(self.text):
511 value = len(self.text)
512 if value < 0:
513 value = 0
515 changed = self._set_cursor_position(value)
517 if changed:
518 self._cursor_position_changed()
520 @property
521 def working_index(self) -> int:
522 return self.__working_index
524 @working_index.setter
525 def working_index(self, value: int) -> None:
526 if self.__working_index != value:
527 self.__working_index = value
528 # Make sure to reset the cursor position, otherwise we end up in
529 # situations where the cursor position is out of the bounds of the
530 # text.
531 self.cursor_position = 0
532 self._text_changed()
534 def _text_changed(self) -> None:
535 # Remove any validation errors and complete state.
536 self.validation_error = None
537 self.validation_state = ValidationState.UNKNOWN
538 self.complete_state = None
539 self.yank_nth_arg_state = None
540 self.document_before_paste = None
541 self.selection_state = None
542 self.suggestion = None
543 self.preferred_column = None
545 # fire 'on_text_changed' event.
546 self.on_text_changed.fire()
548 # Input validation.
549 # (This happens on all change events, unlike auto completion, also when
550 # deleting text.)
551 if self.validator and self.validate_while_typing():
552 get_app().create_background_task(self._async_validator())
554 def _cursor_position_changed(self) -> None:
555 # Remove any complete state.
556 # (Input validation should only be undone when the cursor position
557 # changes.)
558 self.complete_state = None
559 self.yank_nth_arg_state = None
560 self.document_before_paste = None
562 # Unset preferred_column. (Will be set after the cursor movement, if
563 # required.)
564 self.preferred_column = None
566 # Note that the cursor position can change if we have a selection the
567 # new position of the cursor determines the end of the selection.
569 # fire 'on_cursor_position_changed' event.
570 self.on_cursor_position_changed.fire()
572 @property
573 def document(self) -> Document:
574 """
575 Return :class:`~prompt_toolkit.document.Document` instance from the
576 current text, cursor position and selection state.
577 """
578 return self._document_cache[
579 self.text, self.cursor_position, self.selection_state
580 ]
582 @document.setter
583 def document(self, value: Document) -> None:
584 """
585 Set :class:`~prompt_toolkit.document.Document` instance.
587 This will set both the text and cursor position at the same time, but
588 atomically. (Change events will be triggered only after both have been set.)
589 """
590 self.set_document(value)
592 def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
593 """
594 Set :class:`~prompt_toolkit.document.Document` instance. Like the
595 ``document`` property, but accept an ``bypass_readonly`` argument.
597 :param bypass_readonly: When True, don't raise an
598 :class:`.EditReadOnlyBuffer` exception, even
599 when the buffer is read-only.
601 .. warning::
603 When this buffer is read-only and `bypass_readonly` was not passed,
604 the `EditReadOnlyBuffer` exception will be caught by the
605 `KeyProcessor` and is silently suppressed. This is important to
606 keep in mind when writing key bindings, because it won't do what
607 you expect, and there won't be a stack trace. Use try/finally
608 around this function if you need some cleanup code.
609 """
610 # Don't allow editing of read-only buffers.
611 if not bypass_readonly and self.read_only():
612 raise EditReadOnlyBuffer()
614 # Set text and cursor position first.
615 text_changed = self._set_text(value.text)
616 cursor_position_changed = self._set_cursor_position(value.cursor_position)
618 # Now handle change events. (We do this when text/cursor position is
619 # both set and consistent.)
620 if text_changed:
621 self._text_changed()
622 self.history_search_text = None
624 if cursor_position_changed:
625 self._cursor_position_changed()
627 @property
628 def is_returnable(self) -> bool:
629 """
630 True when there is something handling accept.
631 """
632 return bool(self.accept_handler)
634 # End of <getters/setters>
636 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
637 """
638 Safe current state (input text and cursor position), so that we can
639 restore it by calling undo.
640 """
641 # Safe if the text is different from the text at the top of the stack
642 # is different. If the text is the same, just update the cursor position.
643 if self._undo_stack and self._undo_stack[-1][0] == self.text:
644 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position)
645 else:
646 self._undo_stack.append((self.text, self.cursor_position))
648 # Saving anything to the undo stack, clears the redo stack.
649 if clear_redo_stack:
650 self._redo_stack = []
652 def transform_lines(
653 self,
654 line_index_iterator: Iterable[int],
655 transform_callback: Callable[[str], str],
656 ) -> str:
657 """
658 Transforms the text on a range of lines.
659 When the iterator yield an index not in the range of lines that the
660 document contains, it skips them silently.
662 To uppercase some lines::
664 new_text = transform_lines(range(5,10), lambda text: text.upper())
666 :param line_index_iterator: Iterator of line numbers (int)
667 :param transform_callback: callable that takes the original text of a
668 line, and return the new text for this line.
670 :returns: The new text.
671 """
672 # Split lines
673 lines = self.text.split("\n")
675 # Apply transformation
676 for index in line_index_iterator:
677 try:
678 lines[index] = transform_callback(lines[index])
679 except IndexError:
680 pass
682 return "\n".join(lines)
684 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
685 """
686 Apply the given transformation function to the current line.
688 :param transform_callback: callable that takes a string and return a new string.
689 """
690 document = self.document
691 a = document.cursor_position + document.get_start_of_line_position()
692 b = document.cursor_position + document.get_end_of_line_position()
693 self.text = (
694 document.text[:a]
695 + transform_callback(document.text[a:b])
696 + document.text[b:]
697 )
699 def transform_region(
700 self, from_: int, to: int, transform_callback: Callable[[str], str]
701 ) -> None:
702 """
703 Transform a part of the input string.
705 :param from_: (int) start position.
706 :param to: (int) end position.
707 :param transform_callback: Callable which accepts a string and returns
708 the transformed string.
709 """
710 assert from_ < to
712 self.text = "".join(
713 [
714 self.text[:from_]
715 + transform_callback(self.text[from_:to])
716 + self.text[to:]
717 ]
718 )
720 def cursor_left(self, count: int = 1) -> None:
721 self.cursor_position += self.document.get_cursor_left_position(count=count)
723 def cursor_right(self, count: int = 1) -> None:
724 self.cursor_position += self.document.get_cursor_right_position(count=count)
726 def cursor_up(self, count: int = 1) -> None:
727 """(for multiline edit). Move cursor to the previous line."""
728 original_column = self.preferred_column or self.document.cursor_position_col
729 self.cursor_position += self.document.get_cursor_up_position(
730 count=count, preferred_column=original_column
731 )
733 # Remember the original column for the next up/down movement.
734 self.preferred_column = original_column
736 def cursor_down(self, count: int = 1) -> None:
737 """(for multiline edit). Move cursor to the next line."""
738 original_column = self.preferred_column or self.document.cursor_position_col
739 self.cursor_position += self.document.get_cursor_down_position(
740 count=count, preferred_column=original_column
741 )
743 # Remember the original column for the next up/down movement.
744 self.preferred_column = original_column
746 def auto_up(
747 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
748 ) -> None:
749 """
750 If we're not on the first line (of a multiline input) go a line up,
751 otherwise go back in history. (If nothing is selected.)
752 """
753 if self.complete_state:
754 self.complete_previous(count=count)
755 elif self.document.cursor_position_row > 0:
756 self.cursor_up(count=count)
757 elif not self.selection_state:
758 self.history_backward(count=count)
760 # Go to the start of the line?
761 if go_to_start_of_line_if_history_changes:
762 self.cursor_position += self.document.get_start_of_line_position()
764 def auto_down(
765 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
766 ) -> None:
767 """
768 If we're not on the last line (of a multiline input) go a line down,
769 otherwise go forward in history. (If nothing is selected.)
770 """
771 if self.complete_state:
772 self.complete_next(count=count)
773 elif self.document.cursor_position_row < self.document.line_count - 1:
774 self.cursor_down(count=count)
775 elif not self.selection_state:
776 self.history_forward(count=count)
778 # Go to the start of the line?
779 if go_to_start_of_line_if_history_changes:
780 self.cursor_position += self.document.get_start_of_line_position()
782 def delete_before_cursor(self, count: int = 1) -> str:
783 """
784 Delete specified number of characters before cursor and return the
785 deleted text.
786 """
787 assert count >= 0
788 deleted = ""
790 if self.cursor_position > 0:
791 deleted = self.text[self.cursor_position - count : self.cursor_position]
793 new_text = (
794 self.text[: self.cursor_position - count]
795 + self.text[self.cursor_position :]
796 )
797 new_cursor_position = self.cursor_position - len(deleted)
799 # Set new Document atomically.
800 self.document = Document(new_text, new_cursor_position)
802 return deleted
804 def delete(self, count: int = 1) -> str:
805 """
806 Delete specified number of characters and Return the deleted text.
807 """
808 if self.cursor_position < len(self.text):
809 deleted = self.document.text_after_cursor[:count]
810 self.text = (
811 self.text[: self.cursor_position]
812 + self.text[self.cursor_position + len(deleted) :]
813 )
814 return deleted
815 else:
816 return ""
818 def join_next_line(self, separator: str = " ") -> None:
819 """
820 Join the next line to the current one by deleting the line ending after
821 the current line.
822 """
823 if not self.document.on_last_line:
824 self.cursor_position += self.document.get_end_of_line_position()
825 self.delete()
827 # Remove spaces.
828 self.text = (
829 self.document.text_before_cursor
830 + separator
831 + self.document.text_after_cursor.lstrip(" ")
832 )
834 def join_selected_lines(self, separator: str = " ") -> None:
835 """
836 Join the selected lines.
837 """
838 assert self.selection_state
840 # Get lines.
841 from_, to = sorted(
842 [self.cursor_position, self.selection_state.original_cursor_position]
843 )
845 before = self.text[:from_]
846 lines = self.text[from_:to].splitlines()
847 after = self.text[to:]
849 # Replace leading spaces with just one space.
850 lines = [l.lstrip(" ") + separator for l in lines]
852 # Set new document.
853 self.document = Document(
854 text=before + "".join(lines) + after,
855 cursor_position=len(before + "".join(lines[:-1])) - 1,
856 )
858 def swap_characters_before_cursor(self) -> None:
859 """
860 Swap the last two characters before the cursor.
861 """
862 pos = self.cursor_position
864 if pos >= 2:
865 a = self.text[pos - 2]
866 b = self.text[pos - 1]
868 self.text = self.text[: pos - 2] + b + a + self.text[pos:]
870 def go_to_history(self, index: int) -> None:
871 """
872 Go to this item in the history.
873 """
874 if index < len(self._working_lines):
875 self.working_index = index
876 self.cursor_position = len(self.text)
878 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
879 """
880 Browse to the next completions.
881 (Does nothing if there are no completion.)
882 """
883 index: int | None
885 if self.complete_state:
886 completions_count = len(self.complete_state.completions)
888 if self.complete_state.complete_index is None:
889 index = 0
890 elif self.complete_state.complete_index == completions_count - 1:
891 index = None
893 if disable_wrap_around:
894 return
895 else:
896 index = min(
897 completions_count - 1, self.complete_state.complete_index + count
898 )
899 self.go_to_completion(index)
901 def complete_previous(
902 self, count: int = 1, disable_wrap_around: bool = False
903 ) -> None:
904 """
905 Browse to the previous completions.
906 (Does nothing if there are no completion.)
907 """
908 index: int | None
910 if self.complete_state:
911 if self.complete_state.complete_index == 0:
912 index = None
914 if disable_wrap_around:
915 return
916 elif self.complete_state.complete_index is None:
917 index = len(self.complete_state.completions) - 1
918 else:
919 index = max(0, self.complete_state.complete_index - count)
921 self.go_to_completion(index)
923 def cancel_completion(self) -> None:
924 """
925 Cancel completion, go back to the original text.
926 """
927 if self.complete_state:
928 self.go_to_completion(None)
929 self.complete_state = None
931 def _set_completions(self, completions: list[Completion]) -> CompletionState:
932 """
933 Start completions. (Generate list of completions and initialize.)
935 By default, no completion will be selected.
936 """
937 self.complete_state = CompletionState(
938 original_document=self.document, completions=completions
939 )
941 # Trigger event. This should eventually invalidate the layout.
942 self.on_completions_changed.fire()
944 return self.complete_state
946 def start_history_lines_completion(self) -> None:
947 """
948 Start a completion based on all the other lines in the document and the
949 history.
950 """
951 found_completions: set[str] = set()
952 completions = []
954 # For every line of the whole history, find matches with the current line.
955 current_line = self.document.current_line_before_cursor.lstrip()
957 for i, string in enumerate(self._working_lines):
958 for j, l in enumerate(string.split("\n")):
959 l = l.strip()
960 if l and l.startswith(current_line):
961 # When a new line has been found.
962 if l not in found_completions:
963 found_completions.add(l)
965 # Create completion.
966 if i == self.working_index:
967 display_meta = "Current, line %s" % (j + 1)
968 else:
969 display_meta = f"History {i + 1}, line {j + 1}"
971 completions.append(
972 Completion(
973 text=l,
974 start_position=-len(current_line),
975 display_meta=display_meta,
976 )
977 )
979 self._set_completions(completions=completions[::-1])
980 self.go_to_completion(0)
982 def go_to_completion(self, index: int | None) -> None:
983 """
984 Select a completion from the list of current completions.
985 """
986 assert self.complete_state
988 # Set new completion
989 state = self.complete_state
990 state.go_to_index(index)
992 # Set text/cursor position
993 new_text, new_cursor_position = state.new_text_and_position()
994 self.document = Document(new_text, new_cursor_position)
996 # (changing text/cursor position will unset complete_state.)
997 self.complete_state = state
999 def apply_completion(self, completion: Completion) -> None:
1000 """
1001 Insert a given completion.
1002 """
1003 # If there was already a completion active, cancel that one.
1004 if self.complete_state:
1005 self.go_to_completion(None)
1006 self.complete_state = None
1008 # Insert text from the given completion.
1009 self.delete_before_cursor(-completion.start_position)
1010 self.insert_text(completion.text)
1012 def _set_history_search(self) -> None:
1013 """
1014 Set `history_search_text`.
1015 (The text before the cursor will be used for filtering the history.)
1016 """
1017 if self.enable_history_search():
1018 if self.history_search_text is None:
1019 self.history_search_text = self.document.text_before_cursor
1020 else:
1021 self.history_search_text = None
1023 def _history_matches(self, i: int) -> bool:
1024 """
1025 True when the current entry matches the history search.
1026 (when we don't have history search, it's also True.)
1027 """
1028 return self.history_search_text is None or self._working_lines[i].startswith(
1029 self.history_search_text
1030 )
1032 def history_forward(self, count: int = 1) -> None:
1033 """
1034 Move forwards through the history.
1036 :param count: Amount of items to move forward.
1037 """
1038 self._set_history_search()
1040 # Go forward in history.
1041 found_something = False
1043 for i in range(self.working_index + 1, len(self._working_lines)):
1044 if self._history_matches(i):
1045 self.working_index = i
1046 count -= 1
1047 found_something = True
1048 if count == 0:
1049 break
1051 # If we found an entry, move cursor to the end of the first line.
1052 if found_something:
1053 self.cursor_position = 0
1054 self.cursor_position += self.document.get_end_of_line_position()
1056 def history_backward(self, count: int = 1) -> None:
1057 """
1058 Move backwards through history.
1059 """
1060 self._set_history_search()
1062 # Go back in history.
1063 found_something = False
1065 for i in range(self.working_index - 1, -1, -1):
1066 if self._history_matches(i):
1067 self.working_index = i
1068 count -= 1
1069 found_something = True
1070 if count == 0:
1071 break
1073 # If we move to another entry, move cursor to the end of the line.
1074 if found_something:
1075 self.cursor_position = len(self.text)
1077 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
1078 """
1079 Pick nth word from previous history entry (depending on current
1080 `yank_nth_arg_state`) and insert it at current position. Rotate through
1081 history if called repeatedly. If no `n` has been given, take the first
1082 argument. (The second word.)
1084 :param n: (None or int), The index of the word from the previous line
1085 to take.
1086 """
1087 assert n is None or isinstance(n, int)
1088 history_strings = self.history.get_strings()
1090 if not len(history_strings):
1091 return
1093 # Make sure we have a `YankNthArgState`.
1094 if self.yank_nth_arg_state is None:
1095 state = YankNthArgState(n=-1 if _yank_last_arg else 1)
1096 else:
1097 state = self.yank_nth_arg_state
1099 if n is not None:
1100 state.n = n
1102 # Get new history position.
1103 new_pos = state.history_position - 1
1104 if -new_pos > len(history_strings):
1105 new_pos = -1
1107 # Take argument from line.
1108 line = history_strings[new_pos]
1110 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)]
1111 words = [w for w in words if w]
1112 try:
1113 word = words[state.n]
1114 except IndexError:
1115 word = ""
1117 # Insert new argument.
1118 if state.previous_inserted_word:
1119 self.delete_before_cursor(len(state.previous_inserted_word))
1120 self.insert_text(word)
1122 # Save state again for next completion. (Note that the 'insert'
1123 # operation from above clears `self.yank_nth_arg_state`.)
1124 state.previous_inserted_word = word
1125 state.history_position = new_pos
1126 self.yank_nth_arg_state = state
1128 def yank_last_arg(self, n: int | None = None) -> None:
1129 """
1130 Like `yank_nth_arg`, but if no argument has been given, yank the last
1131 word by default.
1132 """
1133 self.yank_nth_arg(n=n, _yank_last_arg=True)
1135 def start_selection(
1136 self, selection_type: SelectionType = SelectionType.CHARACTERS
1137 ) -> None:
1138 """
1139 Take the current cursor position as the start of this selection.
1140 """
1141 self.selection_state = SelectionState(self.cursor_position, selection_type)
1143 def copy_selection(self, _cut: bool = False) -> ClipboardData:
1144 """
1145 Copy selected text and return :class:`.ClipboardData` instance.
1147 Notice that this doesn't store the copied data on the clipboard yet.
1148 You can store it like this:
1150 .. code:: python
1152 data = buffer.copy_selection()
1153 get_app().clipboard.set_data(data)
1154 """
1155 new_document, clipboard_data = self.document.cut_selection()
1156 if _cut:
1157 self.document = new_document
1159 self.selection_state = None
1160 return clipboard_data
1162 def cut_selection(self) -> ClipboardData:
1163 """
1164 Delete selected text and return :class:`.ClipboardData` instance.
1165 """
1166 return self.copy_selection(_cut=True)
1168 def paste_clipboard_data(
1169 self,
1170 data: ClipboardData,
1171 paste_mode: PasteMode = PasteMode.EMACS,
1172 count: int = 1,
1173 ) -> None:
1174 """
1175 Insert the data from the clipboard.
1176 """
1177 assert isinstance(data, ClipboardData)
1178 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)
1180 original_document = self.document
1181 self.document = self.document.paste_clipboard_data(
1182 data, paste_mode=paste_mode, count=count
1183 )
1185 # Remember original document. This assignment should come at the end,
1186 # because assigning to 'document' will erase it.
1187 self.document_before_paste = original_document
1189 def newline(self, copy_margin: bool = True) -> None:
1190 """
1191 Insert a line ending at the current position.
1192 """
1193 if copy_margin:
1194 self.insert_text("\n" + self.document.leading_whitespace_in_current_line)
1195 else:
1196 self.insert_text("\n")
1198 def insert_line_above(self, copy_margin: bool = True) -> None:
1199 """
1200 Insert a new line above the current one.
1201 """
1202 if copy_margin:
1203 insert = self.document.leading_whitespace_in_current_line + "\n"
1204 else:
1205 insert = "\n"
1207 self.cursor_position += self.document.get_start_of_line_position()
1208 self.insert_text(insert)
1209 self.cursor_position -= 1
1211 def insert_line_below(self, copy_margin: bool = True) -> None:
1212 """
1213 Insert a new line below the current one.
1214 """
1215 if copy_margin:
1216 insert = "\n" + self.document.leading_whitespace_in_current_line
1217 else:
1218 insert = "\n"
1220 self.cursor_position += self.document.get_end_of_line_position()
1221 self.insert_text(insert)
1223 def insert_text(
1224 self,
1225 data: str,
1226 overwrite: bool = False,
1227 move_cursor: bool = True,
1228 fire_event: bool = True,
1229 ) -> None:
1230 """
1231 Insert characters at cursor position.
1233 :param fire_event: Fire `on_text_insert` event. This is mainly used to
1234 trigger autocompletion while typing.
1235 """
1236 # Original text & cursor position.
1237 otext = self.text
1238 ocpos = self.cursor_position
1240 # In insert/text mode.
1241 if overwrite:
1242 # Don't overwrite the newline itself. Just before the line ending,
1243 # it should act like insert mode.
1244 overwritten_text = otext[ocpos : ocpos + len(data)]
1245 if "\n" in overwritten_text:
1246 overwritten_text = overwritten_text[: overwritten_text.find("\n")]
1248 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :]
1249 else:
1250 text = otext[:ocpos] + data + otext[ocpos:]
1252 if move_cursor:
1253 cpos = self.cursor_position + len(data)
1254 else:
1255 cpos = self.cursor_position
1257 # Set new document.
1258 # (Set text and cursor position at the same time. Otherwise, setting
1259 # the text will fire a change event before the cursor position has been
1260 # set. It works better to have this atomic.)
1261 self.document = Document(text, cpos)
1263 # Fire 'on_text_insert' event.
1264 if fire_event: # XXX: rename to `start_complete`.
1265 self.on_text_insert.fire()
1267 # Only complete when "complete_while_typing" is enabled.
1268 if self.completer and self.complete_while_typing():
1269 get_app().create_background_task(self._async_completer())
1271 # Call auto_suggest.
1272 if self.auto_suggest:
1273 get_app().create_background_task(self._async_suggester())
1275 def undo(self) -> None:
1276 # Pop from the undo-stack until we find a text that if different from
1277 # the current text. (The current logic of `save_to_undo_stack` will
1278 # cause that the top of the undo stack is usually the same as the
1279 # current text, so in that case we have to pop twice.)
1280 while self._undo_stack:
1281 text, pos = self._undo_stack.pop()
1283 if text != self.text:
1284 # Push current text to redo stack.
1285 self._redo_stack.append((self.text, self.cursor_position))
1287 # Set new text/cursor_position.
1288 self.document = Document(text, cursor_position=pos)
1289 break
1291 def redo(self) -> None:
1292 if self._redo_stack:
1293 # Copy current state on undo stack.
1294 self.save_to_undo_stack(clear_redo_stack=False)
1296 # Pop state from redo stack.
1297 text, pos = self._redo_stack.pop()
1298 self.document = Document(text, cursor_position=pos)
1300 def validate(self, set_cursor: bool = False) -> bool:
1301 """
1302 Returns `True` if valid.
1304 :param set_cursor: Set the cursor position, if an error was found.
1305 """
1306 # Don't call the validator again, if it was already called for the
1307 # current input.
1308 if self.validation_state != ValidationState.UNKNOWN:
1309 return self.validation_state == ValidationState.VALID
1311 # Call validator.
1312 if self.validator:
1313 try:
1314 self.validator.validate(self.document)
1315 except ValidationError as e:
1316 # Set cursor position (don't allow invalid values.)
1317 if set_cursor:
1318 self.cursor_position = min(
1319 max(0, e.cursor_position), len(self.text)
1320 )
1322 self.validation_state = ValidationState.INVALID
1323 self.validation_error = e
1324 return False
1326 # Handle validation result.
1327 self.validation_state = ValidationState.VALID
1328 self.validation_error = None
1329 return True
1331 async def _validate_async(self) -> None:
1332 """
1333 Asynchronous version of `validate()`.
1334 This one doesn't set the cursor position.
1336 We have both variants, because a synchronous version is required.
1337 Handling the ENTER key needs to be completely synchronous, otherwise
1338 stuff like type-ahead is going to give very weird results. (People
1339 could type input while the ENTER key is still processed.)
1341 An asynchronous version is required if we have `validate_while_typing`
1342 enabled.
1343 """
1344 while True:
1345 # Don't call the validator again, if it was already called for the
1346 # current input.
1347 if self.validation_state != ValidationState.UNKNOWN:
1348 return
1350 # Call validator.
1351 error = None
1352 document = self.document
1354 if self.validator:
1355 try:
1356 await self.validator.validate_async(self.document)
1357 except ValidationError as e:
1358 error = e
1360 # If the document changed during the validation, try again.
1361 if self.document != document:
1362 continue
1364 # Handle validation result.
1365 if error:
1366 self.validation_state = ValidationState.INVALID
1367 else:
1368 self.validation_state = ValidationState.VALID
1370 self.validation_error = error
1371 get_app().invalidate() # Trigger redraw (display error).
1373 def append_to_history(self) -> None:
1374 """
1375 Append the current input to the history.
1376 """
1377 # Save at the tail of the history. (But don't if the last entry the
1378 # history is already the same.)
1379 if self.text:
1380 history_strings = self.history.get_strings()
1381 if not len(history_strings) or history_strings[-1] != self.text:
1382 self.history.append_string(self.text)
1384 def _search(
1385 self,
1386 search_state: SearchState,
1387 include_current_position: bool = False,
1388 count: int = 1,
1389 ) -> tuple[int, int] | None:
1390 """
1391 Execute search. Return (working_index, cursor_position) tuple when this
1392 search is applied. Returns `None` when this text cannot be found.
1393 """
1394 assert count > 0
1396 text = search_state.text
1397 direction = search_state.direction
1398 ignore_case = search_state.ignore_case()
1400 def search_once(
1401 working_index: int, document: Document
1402 ) -> tuple[int, Document] | None:
1403 """
1404 Do search one time.
1405 Return (working_index, document) or `None`
1406 """
1407 if direction == SearchDirection.FORWARD:
1408 # Try find at the current input.
1409 new_index = document.find(
1410 text,
1411 include_current_position=include_current_position,
1412 ignore_case=ignore_case,
1413 )
1415 if new_index is not None:
1416 return (
1417 working_index,
1418 Document(document.text, document.cursor_position + new_index),
1419 )
1420 else:
1421 # No match, go forward in the history. (Include len+1 to wrap around.)
1422 # (Here we should always include all cursor positions, because
1423 # it's a different line.)
1424 for i in range(working_index + 1, len(self._working_lines) + 1):
1425 i %= len(self._working_lines)
1427 document = Document(self._working_lines[i], 0)
1428 new_index = document.find(
1429 text, include_current_position=True, ignore_case=ignore_case
1430 )
1431 if new_index is not None:
1432 return (i, Document(document.text, new_index))
1433 else:
1434 # Try find at the current input.
1435 new_index = document.find_backwards(text, ignore_case=ignore_case)
1437 if new_index is not None:
1438 return (
1439 working_index,
1440 Document(document.text, document.cursor_position + new_index),
1441 )
1442 else:
1443 # No match, go back in the history. (Include -1 to wrap around.)
1444 for i in range(working_index - 1, -2, -1):
1445 i %= len(self._working_lines)
1447 document = Document(
1448 self._working_lines[i], len(self._working_lines[i])
1449 )
1450 new_index = document.find_backwards(
1451 text, ignore_case=ignore_case
1452 )
1453 if new_index is not None:
1454 return (
1455 i,
1456 Document(document.text, len(document.text) + new_index),
1457 )
1458 return None
1460 # Do 'count' search iterations.
1461 working_index = self.working_index
1462 document = self.document
1463 for _ in range(count):
1464 result = search_once(working_index, document)
1465 if result is None:
1466 return None # Nothing found.
1467 else:
1468 working_index, document = result
1470 return (working_index, document.cursor_position)
1472 def document_for_search(self, search_state: SearchState) -> Document:
1473 """
1474 Return a :class:`~prompt_toolkit.document.Document` instance that has
1475 the text/cursor position for this search, if we would apply it. This
1476 will be used in the
1477 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
1478 searching.
1479 """
1480 search_result = self._search(search_state, include_current_position=True)
1482 if search_result is None:
1483 return self.document
1484 else:
1485 working_index, cursor_position = search_result
1487 # Keep selection, when `working_index` was not changed.
1488 if working_index == self.working_index:
1489 selection = self.selection_state
1490 else:
1491 selection = None
1493 return Document(
1494 self._working_lines[working_index], cursor_position, selection=selection
1495 )
1497 def get_search_position(
1498 self,
1499 search_state: SearchState,
1500 include_current_position: bool = True,
1501 count: int = 1,
1502 ) -> int:
1503 """
1504 Get the cursor position for this search.
1505 (This operation won't change the `working_index`. It's won't go through
1506 the history. Vi text objects can't span multiple items.)
1507 """
1508 search_result = self._search(
1509 search_state, include_current_position=include_current_position, count=count
1510 )
1512 if search_result is None:
1513 return self.cursor_position
1514 else:
1515 working_index, cursor_position = search_result
1516 return cursor_position
1518 def apply_search(
1519 self,
1520 search_state: SearchState,
1521 include_current_position: bool = True,
1522 count: int = 1,
1523 ) -> None:
1524 """
1525 Apply search. If something is found, set `working_index` and
1526 `cursor_position`.
1527 """
1528 search_result = self._search(
1529 search_state, include_current_position=include_current_position, count=count
1530 )
1532 if search_result is not None:
1533 working_index, cursor_position = search_result
1534 self.working_index = working_index
1535 self.cursor_position = cursor_position
1537 def exit_selection(self) -> None:
1538 self.selection_state = None
1540 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
1541 """
1542 Simple (file) tempfile implementation.
1543 Return (tempfile, cleanup_func).
1544 """
1545 suffix = to_str(self.tempfile_suffix)
1546 descriptor, filename = tempfile.mkstemp(suffix)
1548 os.write(descriptor, self.text.encode("utf-8"))
1549 os.close(descriptor)
1551 def cleanup() -> None:
1552 os.unlink(filename)
1554 return filename, cleanup
1556 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
1557 # Complex (directory) tempfile implementation.
1558 headtail = to_str(self.tempfile)
1559 if not headtail:
1560 # Revert to simple case.
1561 return self._editor_simple_tempfile()
1562 headtail = str(headtail)
1564 # Try to make according to tempfile logic.
1565 head, tail = os.path.split(headtail)
1566 if os.path.isabs(head):
1567 head = head[1:]
1569 dirpath = tempfile.mkdtemp()
1570 if head:
1571 dirpath = os.path.join(dirpath, head)
1572 # Assume there is no issue creating dirs in this temp dir.
1573 os.makedirs(dirpath)
1575 # Open the filename and write current text.
1576 filename = os.path.join(dirpath, tail)
1577 with open(filename, "w", encoding="utf-8") as fh:
1578 fh.write(self.text)
1580 def cleanup() -> None:
1581 shutil.rmtree(dirpath)
1583 return filename, cleanup
1585 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
1586 """
1587 Open code in editor.
1589 This returns a future, and runs in a thread executor.
1590 """
1591 if self.read_only():
1592 raise EditReadOnlyBuffer()
1594 # Write current text to temporary file
1595 if self.tempfile:
1596 filename, cleanup_func = self._editor_complex_tempfile()
1597 else:
1598 filename, cleanup_func = self._editor_simple_tempfile()
1600 async def run() -> None:
1601 try:
1602 # Open in editor
1603 # (We need to use `run_in_terminal`, because not all editors go to
1604 # the alternate screen buffer, and some could influence the cursor
1605 # position.)
1606 succes = await run_in_terminal(
1607 lambda: self._open_file_in_editor(filename), in_executor=True
1608 )
1610 # Read content again.
1611 if succes:
1612 with open(filename, "rb") as f:
1613 text = f.read().decode("utf-8")
1615 # Drop trailing newline. (Editors are supposed to add it at the
1616 # end, but we don't need it.)
1617 if text.endswith("\n"):
1618 text = text[:-1]
1620 self.document = Document(text=text, cursor_position=len(text))
1622 # Accept the input.
1623 if validate_and_handle:
1624 self.validate_and_handle()
1626 finally:
1627 # Clean up temp dir/file.
1628 cleanup_func()
1630 return get_app().create_background_task(run())
1632 def _open_file_in_editor(self, filename: str) -> bool:
1633 """
1634 Call editor executable.
1636 Return True when we received a zero return code.
1637 """
1638 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that.
1639 # Otherwise, fall back to the first available editor that we can find.
1640 visual = os.environ.get("VISUAL")
1641 editor = os.environ.get("EDITOR")
1643 editors = [
1644 visual,
1645 editor,
1646 # Order of preference.
1647 "/usr/bin/editor",
1648 "/usr/bin/nano",
1649 "/usr/bin/pico",
1650 "/usr/bin/vi",
1651 "/usr/bin/emacs",
1652 ]
1654 for e in editors:
1655 if e:
1656 try:
1657 # Use 'shlex.split()', because $VISUAL can contain spaces
1658 # and quotes.
1659 returncode = subprocess.call(shlex.split(e) + [filename])
1660 return returncode == 0
1662 except OSError:
1663 # Executable does not exist, try the next one.
1664 pass
1666 return False
1668 def start_completion(
1669 self,
1670 select_first: bool = False,
1671 select_last: bool = False,
1672 insert_common_part: bool = False,
1673 complete_event: CompleteEvent | None = None,
1674 ) -> None:
1675 """
1676 Start asynchronous autocompletion of this buffer.
1677 (This will do nothing if a previous completion was still in progress.)
1678 """
1679 # Only one of these options can be selected.
1680 assert select_first + select_last + insert_common_part <= 1
1682 get_app().create_background_task(
1683 self._async_completer(
1684 select_first=select_first,
1685 select_last=select_last,
1686 insert_common_part=insert_common_part,
1687 complete_event=complete_event
1688 or CompleteEvent(completion_requested=True),
1689 )
1690 )
1692 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
1693 """
1694 Create function for asynchronous autocompletion.
1696 (This consumes the asynchronous completer generator, which possibly
1697 runs the completion algorithm in another thread.)
1698 """
1700 def completion_does_nothing(document: Document, completion: Completion) -> bool:
1701 """
1702 Return `True` if applying this completion doesn't have any effect.
1703 (When it doesn't insert any new text.
1704 """
1705 text_before_cursor = document.text_before_cursor
1706 replaced_text = text_before_cursor[
1707 len(text_before_cursor) + completion.start_position :
1708 ]
1709 return replaced_text == completion.text
1711 @_only_one_at_a_time
1712 async def async_completer(
1713 select_first: bool = False,
1714 select_last: bool = False,
1715 insert_common_part: bool = False,
1716 complete_event: CompleteEvent | None = None,
1717 ) -> None:
1718 document = self.document
1719 complete_event = complete_event or CompleteEvent(text_inserted=True)
1721 # Don't complete when we already have completions.
1722 if self.complete_state or not self.completer:
1723 return
1725 # Create an empty CompletionState.
1726 complete_state = CompletionState(original_document=self.document)
1727 self.complete_state = complete_state
1729 def proceed() -> bool:
1730 """Keep retrieving completions. Input text has not yet changed
1731 while generating completions."""
1732 return self.complete_state == complete_state
1734 refresh_needed = asyncio.Event()
1736 async def refresh_while_loading() -> None:
1737 """Background loop to refresh the UI at most 3 times a second
1738 while the completion are loading. Calling
1739 `on_completions_changed.fire()` for every completion that we
1740 receive is too expensive when there are many completions. (We
1741 could tune `Application.max_render_postpone_time` and
1742 `Application.min_redraw_interval`, but having this here is a
1743 better approach.)
1744 """
1745 while True:
1746 self.on_completions_changed.fire()
1747 refresh_needed.clear()
1748 await asyncio.sleep(0.3)
1749 await refresh_needed.wait()
1751 refresh_task = asyncio.ensure_future(refresh_while_loading())
1752 try:
1753 # Load.
1754 async with aclosing(
1755 self.completer.get_completions_async(document, complete_event)
1756 ) as async_generator:
1757 async for completion in async_generator:
1758 complete_state.completions.append(completion)
1759 refresh_needed.set()
1761 # If the input text changes, abort.
1762 if not proceed():
1763 break
1764 finally:
1765 refresh_task.cancel()
1767 # Refresh one final time after we got everything.
1768 self.on_completions_changed.fire()
1770 completions = complete_state.completions
1772 # When there is only one completion, which has nothing to add, ignore it.
1773 if len(completions) == 1 and completion_does_nothing(
1774 document, completions[0]
1775 ):
1776 del completions[:]
1778 # Set completions if the text was not yet changed.
1779 if proceed():
1780 # When no completions were found, or when the user selected
1781 # already a completion by using the arrow keys, don't do anything.
1782 if (
1783 not self.complete_state
1784 or self.complete_state.complete_index is not None
1785 ):
1786 return
1788 # When there are no completions, reset completion state anyway.
1789 if not completions:
1790 self.complete_state = None
1791 # Render the ui if the completion menu was shown
1792 # it is needed especially if there is one completion and it was deleted.
1793 self.on_completions_changed.fire()
1794 return
1796 # Select first/last or insert common part, depending on the key
1797 # binding. (For this we have to wait until all completions are
1798 # loaded.)
1800 if select_first:
1801 self.go_to_completion(0)
1803 elif select_last:
1804 self.go_to_completion(len(completions) - 1)
1806 elif insert_common_part:
1807 common_part = get_common_complete_suffix(document, completions)
1808 if common_part:
1809 # Insert the common part, update completions.
1810 self.insert_text(common_part)
1811 if len(completions) > 1:
1812 # (Don't call `async_completer` again, but
1813 # recalculate completions. See:
1814 # https://github.com/ipython/ipython/issues/9658)
1815 completions[:] = [
1816 c.new_completion_from_position(len(common_part))
1817 for c in completions
1818 ]
1820 self._set_completions(completions=completions)
1821 else:
1822 self.complete_state = None
1823 else:
1824 # When we were asked to insert the "common"
1825 # prefix, but there was no common suffix but
1826 # still exactly one match, then select the
1827 # first. (It could be that we have a completion
1828 # which does * expansion, like '*.py', with
1829 # exactly one match.)
1830 if len(completions) == 1:
1831 self.go_to_completion(0)
1833 else:
1834 # If the last operation was an insert, (not a delete), restart
1835 # the completion coroutine.
1837 if self.document.text_before_cursor == document.text_before_cursor:
1838 return # Nothing changed.
1840 if self.document.text_before_cursor.startswith(
1841 document.text_before_cursor
1842 ):
1843 raise _Retry
1845 return async_completer
1847 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
1848 """
1849 Create function for asynchronous auto suggestion.
1850 (This can be in another thread.)
1851 """
1853 @_only_one_at_a_time
1854 async def async_suggestor() -> None:
1855 document = self.document
1857 # Don't suggest when we already have a suggestion.
1858 if self.suggestion or not self.auto_suggest:
1859 return
1861 suggestion = await self.auto_suggest.get_suggestion_async(self, document)
1863 # Set suggestion only if the text was not yet changed.
1864 if self.document == document:
1865 # Set suggestion and redraw interface.
1866 self.suggestion = suggestion
1867 self.on_suggestion_set.fire()
1868 else:
1869 # Otherwise, restart thread.
1870 raise _Retry
1872 return async_suggestor
1874 def _create_auto_validate_coroutine(
1875 self,
1876 ) -> Callable[[], Coroutine[Any, Any, None]]:
1877 """
1878 Create a function for asynchronous validation while typing.
1879 (This can be in another thread.)
1880 """
1882 @_only_one_at_a_time
1883 async def async_validator() -> None:
1884 await self._validate_async()
1886 return async_validator
1888 def validate_and_handle(self) -> None:
1889 """
1890 Validate buffer and handle the accept action.
1891 """
1892 valid = self.validate(set_cursor=True)
1894 # When the validation succeeded, accept the input.
1895 if valid:
1896 if self.accept_handler:
1897 keep_text = self.accept_handler(self)
1898 else:
1899 keep_text = False
1901 self.append_to_history()
1903 if not keep_text:
1904 self.reset()
1907_T = TypeVar("_T", bound=Callable[..., Awaitable[None]])
1910def _only_one_at_a_time(coroutine: _T) -> _T:
1911 """
1912 Decorator that only starts the coroutine only if the previous call has
1913 finished. (Used to make sure that we have only one autocompleter, auto
1914 suggestor and validator running at a time.)
1916 When the coroutine raises `_Retry`, it is restarted.
1917 """
1918 running = False
1920 @wraps(coroutine)
1921 async def new_coroutine(*a: Any, **kw: Any) -> Any:
1922 nonlocal running
1924 # Don't start a new function, if the previous is still in progress.
1925 if running:
1926 return
1928 running = True
1930 try:
1931 while True:
1932 try:
1933 await coroutine(*a, **kw)
1934 except _Retry:
1935 continue
1936 else:
1937 return None
1938 finally:
1939 running = False
1941 return cast(_T, new_coroutine)
1944class _Retry(Exception):
1945 "Retry in `_only_one_at_a_time`."
1948def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1949 """
1950 Indent text of a :class:`.Buffer` object.
1951 """
1952 current_row = buffer.document.cursor_position_row
1953 line_range = range(from_row, to_row)
1955 # Apply transformation.
1956 new_text = buffer.transform_lines(line_range, lambda l: " " * count + l)
1957 buffer.document = Document(
1958 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1959 )
1961 # Go to the start of the line.
1962 buffer.cursor_position += buffer.document.get_start_of_line_position(
1963 after_whitespace=True
1964 )
1967def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1968 """
1969 Unindent text of a :class:`.Buffer` object.
1970 """
1971 current_row = buffer.document.cursor_position_row
1972 line_range = range(from_row, to_row)
1974 def transform(text: str) -> str:
1975 remove = " " * count
1976 if text.startswith(remove):
1977 return text[len(remove) :]
1978 else:
1979 return text.lstrip()
1981 # Apply transformation.
1982 new_text = buffer.transform_lines(line_range, transform)
1983 buffer.document = Document(
1984 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1985 )
1987 # Go to the start of the line.
1988 buffer.cursor_position += buffer.document.get_start_of_line_position(
1989 after_whitespace=True
1990 )
1993def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
1994 """
1995 Reformat text, taking the width into account.
1996 `to_row` is included.
1997 (Vi 'gq' operator.)
1998 """
1999 lines = buffer.text.splitlines(True)
2000 lines_before = lines[:from_row]
2001 lines_after = lines[to_row + 1 :]
2002 lines_to_reformat = lines[from_row : to_row + 1]
2004 if lines_to_reformat:
2005 # Take indentation from the first line.
2006 match = re.search(r"^\s*", lines_to_reformat[0])
2007 length = match.end() if match else 0 # `match` can't be None, actually.
2009 indent = lines_to_reformat[0][:length].replace("\n", "")
2011 # Now, take all the 'words' from the lines to be reshaped.
2012 words = "".join(lines_to_reformat).split()
2014 # And reshape.
2015 width = (buffer.text_width or 80) - len(indent)
2016 reshaped_text = [indent]
2017 current_width = 0
2018 for w in words:
2019 if current_width:
2020 if len(w) + current_width + 1 > width:
2021 reshaped_text.append("\n")
2022 reshaped_text.append(indent)
2023 current_width = 0
2024 else:
2025 reshaped_text.append(" ")
2026 current_width += 1
2028 reshaped_text.append(w)
2029 current_width += len(w)
2031 if reshaped_text[-1] != "\n":
2032 reshaped_text.append("\n")
2034 # Apply result.
2035 buffer.document = Document(
2036 text="".join(lines_before + reshaped_text + lines_after),
2037 cursor_position=len("".join(lines_before + reshaped_text)),
2038 )