Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/prompt_toolkit/buffer.py: 17%
807 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:05 +0000
1"""
2Data structures for the Buffer.
3It holds the text, cursor position, history, etc...
4"""
5from __future__ import annotations
7import asyncio
8import logging
9import os
10import re
11import shlex
12import shutil
13import subprocess
14import tempfile
15from collections import deque
16from enum import Enum
17from functools import wraps
18from typing import Any, Awaitable, Callable, Coroutine, Deque, Iterable, TypeVar, cast
20from .application.current import get_app
21from .application.run_in_terminal import run_in_terminal
22from .auto_suggest import AutoSuggest, Suggestion
23from .cache import FastDictCache
24from .clipboard import ClipboardData
25from .completion import (
26 CompleteEvent,
27 Completer,
28 Completion,
29 DummyCompleter,
30 get_common_complete_suffix,
31)
32from .document import Document
33from .eventloop import aclosing
34from .filters import FilterOrBool, to_filter
35from .history import History, InMemoryHistory
36from .search import SearchDirection, SearchState
37from .selection import PasteMode, SelectionState, SelectionType
38from .utils import Event, to_str
39from .validation import ValidationError, Validator
41__all__ = [
42 "EditReadOnlyBuffer",
43 "Buffer",
44 "CompletionState",
45 "indent",
46 "unindent",
47 "reshape_text",
48]
50logger = logging.getLogger(__name__)
53class EditReadOnlyBuffer(Exception):
54 "Attempt editing of read-only :class:`.Buffer`."
57class ValidationState(Enum):
58 "The validation state of a buffer. This is set after the validation."
59 VALID = "VALID"
60 INVALID = "INVALID"
61 UNKNOWN = "UNKNOWN"
64class CompletionState:
65 """
66 Immutable class that contains a completion state.
67 """
69 def __init__(
70 self,
71 original_document: Document,
72 completions: list[Completion] | None = None,
73 complete_index: int | None = None,
74 ) -> None:
75 #: Document as it was when the completion started.
76 self.original_document = original_document
78 #: List of all the current Completion instances which are possible at
79 #: this point.
80 self.completions = completions or []
82 #: Position in the `completions` array.
83 #: This can be `None` to indicate "no completion", the original text.
84 self.complete_index = complete_index # Position in the `_completions` array.
86 def __repr__(self) -> str:
87 return "{}({!r}, <{!r}> completions, index={!r})".format(
88 self.__class__.__name__,
89 self.original_document,
90 len(self.completions),
91 self.complete_index,
92 )
94 def go_to_index(self, index: int | None) -> None:
95 """
96 Create a new :class:`.CompletionState` object with the new index.
98 When `index` is `None` deselect the completion.
99 """
100 if self.completions:
101 assert index is None or 0 <= index < len(self.completions)
102 self.complete_index = index
104 def new_text_and_position(self) -> tuple[str, int]:
105 """
106 Return (new_text, new_cursor_position) for this completion.
107 """
108 if self.complete_index is None:
109 return self.original_document.text, self.original_document.cursor_position
110 else:
111 original_text_before_cursor = self.original_document.text_before_cursor
112 original_text_after_cursor = self.original_document.text_after_cursor
114 c = self.completions[self.complete_index]
115 if c.start_position == 0:
116 before = original_text_before_cursor
117 else:
118 before = original_text_before_cursor[: c.start_position]
120 new_text = before + c.text + original_text_after_cursor
121 new_cursor_position = len(before) + len(c.text)
122 return new_text, new_cursor_position
124 @property
125 def current_completion(self) -> Completion | None:
126 """
127 Return the current completion, or return `None` when no completion is
128 selected.
129 """
130 if self.complete_index is not None:
131 return self.completions[self.complete_index]
132 return None
135_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")
138class YankNthArgState:
139 """
140 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history.
141 """
143 def __init__(
144 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
145 ) -> None:
146 self.history_position = history_position
147 self.previous_inserted_word = previous_inserted_word
148 self.n = n
150 def __repr__(self) -> str:
151 return "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})".format(
152 self.__class__.__name__,
153 self.history_position,
154 self.n,
155 self.previous_inserted_word,
156 )
159BufferEventHandler = Callable[["Buffer"], None]
160BufferAcceptHandler = Callable[["Buffer"], bool]
163class Buffer:
164 """
165 The core data structure that holds the text and cursor position of the
166 current input line and implements all text manipulations on top of it. It
167 also implements the history, undo stack and the completion state.
169 :param completer: :class:`~prompt_toolkit.completion.Completer` instance.
170 :param history: :class:`~prompt_toolkit.history.History` instance.
171 :param tempfile_suffix: The tempfile suffix (extension) to be used for the
172 "open in editor" function. For a Python REPL, this would be ".py", so
173 that the editor knows the syntax highlighting to use. This can also be
174 a callable that returns a string.
175 :param tempfile: For more advanced tempfile situations where you need
176 control over the subdirectories and filename. For a Git Commit Message,
177 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax
178 highlighting to use. This can also be a callable that returns a string.
179 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly
180 useful for key bindings where we sometimes prefer to refer to a buffer
181 by their name instead of by reference.
182 :param accept_handler: Called when the buffer input is accepted. (Usually
183 when the user presses `enter`.) The accept handler receives this
184 `Buffer` as input and should return True when the buffer text should be
185 kept instead of calling reset.
187 In case of a `PromptSession` for instance, we want to keep the text,
188 because we will exit the application, and only reset it during the next
189 run.
191 Events:
193 :param on_text_changed: When the buffer text changes. (Callable or None.)
194 :param on_text_insert: When new text is inserted. (Callable or None.)
195 :param on_cursor_position_changed: When the cursor moves. (Callable or None.)
196 :param on_completions_changed: When the completions were changed. (Callable or None.)
197 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.)
199 Filters:
201 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter`
202 or `bool`. Decide whether or not to do asynchronous autocompleting while
203 typing.
204 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter`
205 or `bool`. Decide whether or not to do asynchronous validation while
206 typing.
207 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or
208 `bool` to indicate when up-arrow partial string matching is enabled. It
209 is advised to not enable this at the same time as
210 `complete_while_typing`, because when there is an autocompletion found,
211 the up arrows usually browse through the completions, rather than
212 through the history.
213 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True,
214 changes will not be allowed.
215 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When
216 not set, pressing `Enter` will call the `accept_handler`. Otherwise,
217 pressing `Esc-Enter` is required.
218 """
220 def __init__(
221 self,
222 completer: Completer | None = None,
223 auto_suggest: AutoSuggest | None = None,
224 history: History | None = None,
225 validator: Validator | None = None,
226 tempfile_suffix: str | Callable[[], str] = "",
227 tempfile: str | Callable[[], str] = "",
228 name: str = "",
229 complete_while_typing: FilterOrBool = False,
230 validate_while_typing: FilterOrBool = False,
231 enable_history_search: FilterOrBool = False,
232 document: Document | None = None,
233 accept_handler: BufferAcceptHandler | None = None,
234 read_only: FilterOrBool = False,
235 multiline: FilterOrBool = True,
236 on_text_changed: BufferEventHandler | None = None,
237 on_text_insert: BufferEventHandler | None = None,
238 on_cursor_position_changed: BufferEventHandler | None = None,
239 on_completions_changed: BufferEventHandler | None = None,
240 on_suggestion_set: BufferEventHandler | None = None,
241 ):
242 # Accept both filters and booleans as input.
243 enable_history_search = to_filter(enable_history_search)
244 complete_while_typing = to_filter(complete_while_typing)
245 validate_while_typing = to_filter(validate_while_typing)
246 read_only = to_filter(read_only)
247 multiline = to_filter(multiline)
249 self.completer = completer or DummyCompleter()
250 self.auto_suggest = auto_suggest
251 self.validator = validator
252 self.tempfile_suffix = tempfile_suffix
253 self.tempfile = tempfile
254 self.name = name
255 self.accept_handler = accept_handler
257 # Filters. (Usually, used by the key bindings to drive the buffer.)
258 self.complete_while_typing = complete_while_typing
259 self.validate_while_typing = validate_while_typing
260 self.enable_history_search = enable_history_search
261 self.read_only = read_only
262 self.multiline = multiline
264 # Text width. (For wrapping, used by the Vi 'gq' operator.)
265 self.text_width = 0
267 #: The command buffer history.
268 # Note that we shouldn't use a lazy 'or' here. bool(history) could be
269 # False when empty.
270 self.history = InMemoryHistory() if history is None else history
272 self.__cursor_position = 0
274 # Events
275 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
276 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
277 self.on_cursor_position_changed: Event[Buffer] = Event(
278 self, on_cursor_position_changed
279 )
280 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
281 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)
283 # Document cache. (Avoid creating new Document instances.)
284 self._document_cache: FastDictCache[
285 tuple[str, int, SelectionState | None], Document
286 ] = FastDictCache(Document, size=10)
288 # Create completer / auto suggestion / validation coroutines.
289 self._async_suggester = self._create_auto_suggest_coroutine()
290 self._async_completer = self._create_completer_coroutine()
291 self._async_validator = self._create_auto_validate_coroutine()
293 # Asyncio task for populating the history.
294 self._load_history_task: asyncio.Future[None] | None = None
296 # Reset other attributes.
297 self.reset(document=document)
299 def __repr__(self) -> str:
300 if len(self.text) < 15:
301 text = self.text
302 else:
303 text = self.text[:12] + "..."
305 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>"
307 def reset(
308 self, document: Document | None = None, append_to_history: bool = False
309 ) -> None:
310 """
311 :param append_to_history: Append current input to history first.
312 """
313 if append_to_history:
314 self.append_to_history()
316 document = document or Document()
318 self.__cursor_position = document.cursor_position
320 # `ValidationError` instance. (Will be set when the input is wrong.)
321 self.validation_error: ValidationError | None = None
322 self.validation_state: ValidationState | None = ValidationState.UNKNOWN
324 # State of the selection.
325 self.selection_state: SelectionState | None = None
327 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
328 # we can insert text on multiple lines at once. This is implemented by
329 # using multiple cursors.)
330 self.multiple_cursor_positions: list[int] = []
332 # When doing consecutive up/down movements, prefer to stay at this column.
333 self.preferred_column: int | None = None
335 # State of complete browser
336 # For interactive completion through Ctrl-N/Ctrl-P.
337 self.complete_state: CompletionState | None = None
339 # State of Emacs yank-nth-arg completion.
340 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg.
342 # Remember the document that we had *right before* the last paste
343 # operation. This is used for rotating through the kill ring.
344 self.document_before_paste: Document | None = None
346 # Current suggestion.
347 self.suggestion: Suggestion | None = None
349 # The history search text. (Used for filtering the history when we
350 # browse through it.)
351 self.history_search_text: str | None = None
353 # Undo/redo stacks (stack of `(text, cursor_position)`).
354 self._undo_stack: list[tuple[str, int]] = []
355 self._redo_stack: list[tuple[str, int]] = []
357 # Cancel history loader. If history loading was still ongoing.
358 # Cancel the `_load_history_task`, so that next repaint of the
359 # `BufferControl` we will repopulate it.
360 if self._load_history_task is not None:
361 self._load_history_task.cancel()
362 self._load_history_task = None
364 #: The working lines. Similar to history, except that this can be
365 #: modified. The user can press arrow_up and edit previous entries.
366 #: Ctrl-C should reset this, and copy the whole history back in here.
367 #: Enter should process the current command and append to the real
368 #: history.
369 self._working_lines: Deque[str] = deque([document.text])
370 self.__working_index = 0
372 def load_history_if_not_yet_loaded(self) -> None:
373 """
374 Create task for populating the buffer history (if not yet done).
376 Note::
378 This needs to be called from within the event loop of the
379 application, because history loading is async, and we need to be
380 sure the right event loop is active. Therefor, we call this method
381 in the `BufferControl.create_content`.
383 There are situations where prompt_toolkit applications are created
384 in one thread, but will later run in a different thread (Ptpython
385 is one example. The REPL runs in a separate thread, in order to
386 prevent interfering with a potential different event loop in the
387 main thread. The REPL UI however is still created in the main
388 thread.) We could decide to not support creating prompt_toolkit
389 objects in one thread and running the application in a different
390 thread, but history loading is the only place where it matters, and
391 this solves it.
392 """
393 if self._load_history_task is None:
395 async def load_history() -> None:
396 async for item in self.history.load():
397 self._working_lines.appendleft(item)
398 self.__working_index += 1
400 self._load_history_task = get_app().create_background_task(load_history())
402 def load_history_done(f: asyncio.Future[None]) -> None:
403 """
404 Handle `load_history` result when either done, cancelled, or
405 when an exception was raised.
406 """
407 try:
408 f.result()
409 except asyncio.CancelledError:
410 # Ignore cancellation. But handle it, so that we don't get
411 # this traceback.
412 pass
413 except GeneratorExit:
414 # Probably not needed, but we had situations where
415 # `GeneratorExit` was raised in `load_history` during
416 # cancellation.
417 pass
418 except BaseException:
419 # Log error if something goes wrong. (We don't have a
420 # caller to which we can propagate this exception.)
421 logger.exception("Loading history failed")
423 self._load_history_task.add_done_callback(load_history_done)
425 # <getters/setters>
427 def _set_text(self, value: str) -> bool:
428 """set text at current working_index. Return whether it changed."""
429 working_index = self.working_index
430 working_lines = self._working_lines
432 original_value = working_lines[working_index]
433 working_lines[working_index] = value
435 # Return True when this text has been changed.
436 if len(value) != len(original_value):
437 # For Python 2, it seems that when two strings have a different
438 # length and one is a prefix of the other, Python still scans
439 # character by character to see whether the strings are different.
440 # (Some benchmarking showed significant differences for big
441 # documents. >100,000 of lines.)
442 return True
443 elif value != original_value:
444 return True
445 return False
447 def _set_cursor_position(self, value: int) -> bool:
448 """Set cursor position. Return whether it changed."""
449 original_position = self.__cursor_position
450 self.__cursor_position = max(0, value)
452 return self.__cursor_position != original_position
454 @property
455 def text(self) -> str:
456 return self._working_lines[self.working_index]
458 @text.setter
459 def text(self, value: str) -> None:
460 """
461 Setting text. (When doing this, make sure that the cursor_position is
462 valid for this text. text/cursor_position should be consistent at any time,
463 otherwise set a Document instead.)
464 """
465 # Ensure cursor position remains within the size of the text.
466 if self.cursor_position > len(value):
467 self.cursor_position = len(value)
469 # Don't allow editing of read-only buffers.
470 if self.read_only():
471 raise EditReadOnlyBuffer()
473 changed = self._set_text(value)
475 if changed:
476 self._text_changed()
478 # Reset history search text.
479 # (Note that this doesn't need to happen when working_index
480 # changes, which is when we traverse the history. That's why we
481 # don't do this in `self._text_changed`.)
482 self.history_search_text = None
484 @property
485 def cursor_position(self) -> int:
486 return self.__cursor_position
488 @cursor_position.setter
489 def cursor_position(self, value: int) -> None:
490 """
491 Setting cursor position.
492 """
493 assert isinstance(value, int)
495 # Ensure cursor position is within the size of the text.
496 if value > len(self.text):
497 value = len(self.text)
498 if value < 0:
499 value = 0
501 changed = self._set_cursor_position(value)
503 if changed:
504 self._cursor_position_changed()
506 @property
507 def working_index(self) -> int:
508 return self.__working_index
510 @working_index.setter
511 def working_index(self, value: int) -> None:
512 if self.__working_index != value:
513 self.__working_index = value
514 # Make sure to reset the cursor position, otherwise we end up in
515 # situations where the cursor position is out of the bounds of the
516 # text.
517 self.cursor_position = 0
518 self._text_changed()
520 def _text_changed(self) -> None:
521 # Remove any validation errors and complete state.
522 self.validation_error = None
523 self.validation_state = ValidationState.UNKNOWN
524 self.complete_state = None
525 self.yank_nth_arg_state = None
526 self.document_before_paste = None
527 self.selection_state = None
528 self.suggestion = None
529 self.preferred_column = None
531 # fire 'on_text_changed' event.
532 self.on_text_changed.fire()
534 # Input validation.
535 # (This happens on all change events, unlike auto completion, also when
536 # deleting text.)
537 if self.validator and self.validate_while_typing():
538 get_app().create_background_task(self._async_validator())
540 def _cursor_position_changed(self) -> None:
541 # Remove any complete state.
542 # (Input validation should only be undone when the cursor position
543 # changes.)
544 self.complete_state = None
545 self.yank_nth_arg_state = None
546 self.document_before_paste = None
548 # Unset preferred_column. (Will be set after the cursor movement, if
549 # required.)
550 self.preferred_column = None
552 # Note that the cursor position can change if we have a selection the
553 # new position of the cursor determines the end of the selection.
555 # fire 'on_cursor_position_changed' event.
556 self.on_cursor_position_changed.fire()
558 @property
559 def document(self) -> Document:
560 """
561 Return :class:`~prompt_toolkit.document.Document` instance from the
562 current text, cursor position and selection state.
563 """
564 return self._document_cache[
565 self.text, self.cursor_position, self.selection_state
566 ]
568 @document.setter
569 def document(self, value: Document) -> None:
570 """
571 Set :class:`~prompt_toolkit.document.Document` instance.
573 This will set both the text and cursor position at the same time, but
574 atomically. (Change events will be triggered only after both have been set.)
575 """
576 self.set_document(value)
578 def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
579 """
580 Set :class:`~prompt_toolkit.document.Document` instance. Like the
581 ``document`` property, but accept an ``bypass_readonly`` argument.
583 :param bypass_readonly: When True, don't raise an
584 :class:`.EditReadOnlyBuffer` exception, even
585 when the buffer is read-only.
587 .. warning::
589 When this buffer is read-only and `bypass_readonly` was not passed,
590 the `EditReadOnlyBuffer` exception will be caught by the
591 `KeyProcessor` and is silently suppressed. This is important to
592 keep in mind when writing key bindings, because it won't do what
593 you expect, and there won't be a stack trace. Use try/finally
594 around this function if you need some cleanup code.
595 """
596 # Don't allow editing of read-only buffers.
597 if not bypass_readonly and self.read_only():
598 raise EditReadOnlyBuffer()
600 # Set text and cursor position first.
601 text_changed = self._set_text(value.text)
602 cursor_position_changed = self._set_cursor_position(value.cursor_position)
604 # Now handle change events. (We do this when text/cursor position is
605 # both set and consistent.)
606 if text_changed:
607 self._text_changed()
608 self.history_search_text = None
610 if cursor_position_changed:
611 self._cursor_position_changed()
613 @property
614 def is_returnable(self) -> bool:
615 """
616 True when there is something handling accept.
617 """
618 return bool(self.accept_handler)
620 # End of <getters/setters>
622 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
623 """
624 Safe current state (input text and cursor position), so that we can
625 restore it by calling undo.
626 """
627 # Safe if the text is different from the text at the top of the stack
628 # is different. If the text is the same, just update the cursor position.
629 if self._undo_stack and self._undo_stack[-1][0] == self.text:
630 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position)
631 else:
632 self._undo_stack.append((self.text, self.cursor_position))
634 # Saving anything to the undo stack, clears the redo stack.
635 if clear_redo_stack:
636 self._redo_stack = []
638 def transform_lines(
639 self,
640 line_index_iterator: Iterable[int],
641 transform_callback: Callable[[str], str],
642 ) -> str:
643 """
644 Transforms the text on a range of lines.
645 When the iterator yield an index not in the range of lines that the
646 document contains, it skips them silently.
648 To uppercase some lines::
650 new_text = transform_lines(range(5,10), lambda text: text.upper())
652 :param line_index_iterator: Iterator of line numbers (int)
653 :param transform_callback: callable that takes the original text of a
654 line, and return the new text for this line.
656 :returns: The new text.
657 """
658 # Split lines
659 lines = self.text.split("\n")
661 # Apply transformation
662 for index in line_index_iterator:
663 try:
664 lines[index] = transform_callback(lines[index])
665 except IndexError:
666 pass
668 return "\n".join(lines)
670 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
671 """
672 Apply the given transformation function to the current line.
674 :param transform_callback: callable that takes a string and return a new string.
675 """
676 document = self.document
677 a = document.cursor_position + document.get_start_of_line_position()
678 b = document.cursor_position + document.get_end_of_line_position()
679 self.text = (
680 document.text[:a]
681 + transform_callback(document.text[a:b])
682 + document.text[b:]
683 )
685 def transform_region(
686 self, from_: int, to: int, transform_callback: Callable[[str], str]
687 ) -> None:
688 """
689 Transform a part of the input string.
691 :param from_: (int) start position.
692 :param to: (int) end position.
693 :param transform_callback: Callable which accepts a string and returns
694 the transformed string.
695 """
696 assert from_ < to
698 self.text = "".join(
699 [
700 self.text[:from_]
701 + transform_callback(self.text[from_:to])
702 + self.text[to:]
703 ]
704 )
706 def cursor_left(self, count: int = 1) -> None:
707 self.cursor_position += self.document.get_cursor_left_position(count=count)
709 def cursor_right(self, count: int = 1) -> None:
710 self.cursor_position += self.document.get_cursor_right_position(count=count)
712 def cursor_up(self, count: int = 1) -> None:
713 """(for multiline edit). Move cursor to the previous line."""
714 original_column = self.preferred_column or self.document.cursor_position_col
715 self.cursor_position += self.document.get_cursor_up_position(
716 count=count, preferred_column=original_column
717 )
719 # Remember the original column for the next up/down movement.
720 self.preferred_column = original_column
722 def cursor_down(self, count: int = 1) -> None:
723 """(for multiline edit). Move cursor to the next line."""
724 original_column = self.preferred_column or self.document.cursor_position_col
725 self.cursor_position += self.document.get_cursor_down_position(
726 count=count, preferred_column=original_column
727 )
729 # Remember the original column for the next up/down movement.
730 self.preferred_column = original_column
732 def auto_up(
733 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
734 ) -> None:
735 """
736 If we're not on the first line (of a multiline input) go a line up,
737 otherwise go back in history. (If nothing is selected.)
738 """
739 if self.complete_state:
740 self.complete_previous(count=count)
741 elif self.document.cursor_position_row > 0:
742 self.cursor_up(count=count)
743 elif not self.selection_state:
744 self.history_backward(count=count)
746 # Go to the start of the line?
747 if go_to_start_of_line_if_history_changes:
748 self.cursor_position += self.document.get_start_of_line_position()
750 def auto_down(
751 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
752 ) -> None:
753 """
754 If we're not on the last line (of a multiline input) go a line down,
755 otherwise go forward in history. (If nothing is selected.)
756 """
757 if self.complete_state:
758 self.complete_next(count=count)
759 elif self.document.cursor_position_row < self.document.line_count - 1:
760 self.cursor_down(count=count)
761 elif not self.selection_state:
762 self.history_forward(count=count)
764 # Go to the start of the line?
765 if go_to_start_of_line_if_history_changes:
766 self.cursor_position += self.document.get_start_of_line_position()
768 def delete_before_cursor(self, count: int = 1) -> str:
769 """
770 Delete specified number of characters before cursor and return the
771 deleted text.
772 """
773 assert count >= 0
774 deleted = ""
776 if self.cursor_position > 0:
777 deleted = self.text[self.cursor_position - count : self.cursor_position]
779 new_text = (
780 self.text[: self.cursor_position - count]
781 + self.text[self.cursor_position :]
782 )
783 new_cursor_position = self.cursor_position - len(deleted)
785 # Set new Document atomically.
786 self.document = Document(new_text, new_cursor_position)
788 return deleted
790 def delete(self, count: int = 1) -> str:
791 """
792 Delete specified number of characters and Return the deleted text.
793 """
794 if self.cursor_position < len(self.text):
795 deleted = self.document.text_after_cursor[:count]
796 self.text = (
797 self.text[: self.cursor_position]
798 + self.text[self.cursor_position + len(deleted) :]
799 )
800 return deleted
801 else:
802 return ""
804 def join_next_line(self, separator: str = " ") -> None:
805 """
806 Join the next line to the current one by deleting the line ending after
807 the current line.
808 """
809 if not self.document.on_last_line:
810 self.cursor_position += self.document.get_end_of_line_position()
811 self.delete()
813 # Remove spaces.
814 self.text = (
815 self.document.text_before_cursor
816 + separator
817 + self.document.text_after_cursor.lstrip(" ")
818 )
820 def join_selected_lines(self, separator: str = " ") -> None:
821 """
822 Join the selected lines.
823 """
824 assert self.selection_state
826 # Get lines.
827 from_, to = sorted(
828 [self.cursor_position, self.selection_state.original_cursor_position]
829 )
831 before = self.text[:from_]
832 lines = self.text[from_:to].splitlines()
833 after = self.text[to:]
835 # Replace leading spaces with just one space.
836 lines = [l.lstrip(" ") + separator for l in lines]
838 # Set new document.
839 self.document = Document(
840 text=before + "".join(lines) + after,
841 cursor_position=len(before + "".join(lines[:-1])) - 1,
842 )
844 def swap_characters_before_cursor(self) -> None:
845 """
846 Swap the last two characters before the cursor.
847 """
848 pos = self.cursor_position
850 if pos >= 2:
851 a = self.text[pos - 2]
852 b = self.text[pos - 1]
854 self.text = self.text[: pos - 2] + b + a + self.text[pos:]
856 def go_to_history(self, index: int) -> None:
857 """
858 Go to this item in the history.
859 """
860 if index < len(self._working_lines):
861 self.working_index = index
862 self.cursor_position = len(self.text)
864 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
865 """
866 Browse to the next completions.
867 (Does nothing if there are no completion.)
868 """
869 index: int | None
871 if self.complete_state:
872 completions_count = len(self.complete_state.completions)
874 if self.complete_state.complete_index is None:
875 index = 0
876 elif self.complete_state.complete_index == completions_count - 1:
877 index = None
879 if disable_wrap_around:
880 return
881 else:
882 index = min(
883 completions_count - 1, self.complete_state.complete_index + count
884 )
885 self.go_to_completion(index)
887 def complete_previous(
888 self, count: int = 1, disable_wrap_around: bool = False
889 ) -> None:
890 """
891 Browse to the previous completions.
892 (Does nothing if there are no completion.)
893 """
894 index: int | None
896 if self.complete_state:
897 if self.complete_state.complete_index == 0:
898 index = None
900 if disable_wrap_around:
901 return
902 elif self.complete_state.complete_index is None:
903 index = len(self.complete_state.completions) - 1
904 else:
905 index = max(0, self.complete_state.complete_index - count)
907 self.go_to_completion(index)
909 def cancel_completion(self) -> None:
910 """
911 Cancel completion, go back to the original text.
912 """
913 if self.complete_state:
914 self.go_to_completion(None)
915 self.complete_state = None
917 def _set_completions(self, completions: list[Completion]) -> CompletionState:
918 """
919 Start completions. (Generate list of completions and initialize.)
921 By default, no completion will be selected.
922 """
923 self.complete_state = CompletionState(
924 original_document=self.document, completions=completions
925 )
927 # Trigger event. This should eventually invalidate the layout.
928 self.on_completions_changed.fire()
930 return self.complete_state
932 def start_history_lines_completion(self) -> None:
933 """
934 Start a completion based on all the other lines in the document and the
935 history.
936 """
937 found_completions: set[str] = set()
938 completions = []
940 # For every line of the whole history, find matches with the current line.
941 current_line = self.document.current_line_before_cursor.lstrip()
943 for i, string in enumerate(self._working_lines):
944 for j, l in enumerate(string.split("\n")):
945 l = l.strip()
946 if l and l.startswith(current_line):
947 # When a new line has been found.
948 if l not in found_completions:
949 found_completions.add(l)
951 # Create completion.
952 if i == self.working_index:
953 display_meta = "Current, line %s" % (j + 1)
954 else:
955 display_meta = f"History {i + 1}, line {j + 1}"
957 completions.append(
958 Completion(
959 text=l,
960 start_position=-len(current_line),
961 display_meta=display_meta,
962 )
963 )
965 self._set_completions(completions=completions[::-1])
966 self.go_to_completion(0)
968 def go_to_completion(self, index: int | None) -> None:
969 """
970 Select a completion from the list of current completions.
971 """
972 assert self.complete_state
974 # Set new completion
975 state = self.complete_state
976 state.go_to_index(index)
978 # Set text/cursor position
979 new_text, new_cursor_position = state.new_text_and_position()
980 self.document = Document(new_text, new_cursor_position)
982 # (changing text/cursor position will unset complete_state.)
983 self.complete_state = state
985 def apply_completion(self, completion: Completion) -> None:
986 """
987 Insert a given completion.
988 """
989 # If there was already a completion active, cancel that one.
990 if self.complete_state:
991 self.go_to_completion(None)
992 self.complete_state = None
994 # Insert text from the given completion.
995 self.delete_before_cursor(-completion.start_position)
996 self.insert_text(completion.text)
998 def _set_history_search(self) -> None:
999 """
1000 Set `history_search_text`.
1001 (The text before the cursor will be used for filtering the history.)
1002 """
1003 if self.enable_history_search():
1004 if self.history_search_text is None:
1005 self.history_search_text = self.document.text_before_cursor
1006 else:
1007 self.history_search_text = None
1009 def _history_matches(self, i: int) -> bool:
1010 """
1011 True when the current entry matches the history search.
1012 (when we don't have history search, it's also True.)
1013 """
1014 return self.history_search_text is None or self._working_lines[i].startswith(
1015 self.history_search_text
1016 )
1018 def history_forward(self, count: int = 1) -> None:
1019 """
1020 Move forwards through the history.
1022 :param count: Amount of items to move forward.
1023 """
1024 self._set_history_search()
1026 # Go forward in history.
1027 found_something = False
1029 for i in range(self.working_index + 1, len(self._working_lines)):
1030 if self._history_matches(i):
1031 self.working_index = i
1032 count -= 1
1033 found_something = True
1034 if count == 0:
1035 break
1037 # If we found an entry, move cursor to the end of the first line.
1038 if found_something:
1039 self.cursor_position = 0
1040 self.cursor_position += self.document.get_end_of_line_position()
1042 def history_backward(self, count: int = 1) -> None:
1043 """
1044 Move backwards through history.
1045 """
1046 self._set_history_search()
1048 # Go back in history.
1049 found_something = False
1051 for i in range(self.working_index - 1, -1, -1):
1052 if self._history_matches(i):
1053 self.working_index = i
1054 count -= 1
1055 found_something = True
1056 if count == 0:
1057 break
1059 # If we move to another entry, move cursor to the end of the line.
1060 if found_something:
1061 self.cursor_position = len(self.text)
1063 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
1064 """
1065 Pick nth word from previous history entry (depending on current
1066 `yank_nth_arg_state`) and insert it at current position. Rotate through
1067 history if called repeatedly. If no `n` has been given, take the first
1068 argument. (The second word.)
1070 :param n: (None or int), The index of the word from the previous line
1071 to take.
1072 """
1073 assert n is None or isinstance(n, int)
1074 history_strings = self.history.get_strings()
1076 if not len(history_strings):
1077 return
1079 # Make sure we have a `YankNthArgState`.
1080 if self.yank_nth_arg_state is None:
1081 state = YankNthArgState(n=-1 if _yank_last_arg else 1)
1082 else:
1083 state = self.yank_nth_arg_state
1085 if n is not None:
1086 state.n = n
1088 # Get new history position.
1089 new_pos = state.history_position - 1
1090 if -new_pos > len(history_strings):
1091 new_pos = -1
1093 # Take argument from line.
1094 line = history_strings[new_pos]
1096 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)]
1097 words = [w for w in words if w]
1098 try:
1099 word = words[state.n]
1100 except IndexError:
1101 word = ""
1103 # Insert new argument.
1104 if state.previous_inserted_word:
1105 self.delete_before_cursor(len(state.previous_inserted_word))
1106 self.insert_text(word)
1108 # Save state again for next completion. (Note that the 'insert'
1109 # operation from above clears `self.yank_nth_arg_state`.)
1110 state.previous_inserted_word = word
1111 state.history_position = new_pos
1112 self.yank_nth_arg_state = state
1114 def yank_last_arg(self, n: int | None = None) -> None:
1115 """
1116 Like `yank_nth_arg`, but if no argument has been given, yank the last
1117 word by default.
1118 """
1119 self.yank_nth_arg(n=n, _yank_last_arg=True)
1121 def start_selection(
1122 self, selection_type: SelectionType = SelectionType.CHARACTERS
1123 ) -> None:
1124 """
1125 Take the current cursor position as the start of this selection.
1126 """
1127 self.selection_state = SelectionState(self.cursor_position, selection_type)
1129 def copy_selection(self, _cut: bool = False) -> ClipboardData:
1130 """
1131 Copy selected text and return :class:`.ClipboardData` instance.
1133 Notice that this doesn't store the copied data on the clipboard yet.
1134 You can store it like this:
1136 .. code:: python
1138 data = buffer.copy_selection()
1139 get_app().clipboard.set_data(data)
1140 """
1141 new_document, clipboard_data = self.document.cut_selection()
1142 if _cut:
1143 self.document = new_document
1145 self.selection_state = None
1146 return clipboard_data
1148 def cut_selection(self) -> ClipboardData:
1149 """
1150 Delete selected text and return :class:`.ClipboardData` instance.
1151 """
1152 return self.copy_selection(_cut=True)
1154 def paste_clipboard_data(
1155 self,
1156 data: ClipboardData,
1157 paste_mode: PasteMode = PasteMode.EMACS,
1158 count: int = 1,
1159 ) -> None:
1160 """
1161 Insert the data from the clipboard.
1162 """
1163 assert isinstance(data, ClipboardData)
1164 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)
1166 original_document = self.document
1167 self.document = self.document.paste_clipboard_data(
1168 data, paste_mode=paste_mode, count=count
1169 )
1171 # Remember original document. This assignment should come at the end,
1172 # because assigning to 'document' will erase it.
1173 self.document_before_paste = original_document
1175 def newline(self, copy_margin: bool = True) -> None:
1176 """
1177 Insert a line ending at the current position.
1178 """
1179 if copy_margin:
1180 self.insert_text("\n" + self.document.leading_whitespace_in_current_line)
1181 else:
1182 self.insert_text("\n")
1184 def insert_line_above(self, copy_margin: bool = True) -> None:
1185 """
1186 Insert a new line above the current one.
1187 """
1188 if copy_margin:
1189 insert = self.document.leading_whitespace_in_current_line + "\n"
1190 else:
1191 insert = "\n"
1193 self.cursor_position += self.document.get_start_of_line_position()
1194 self.insert_text(insert)
1195 self.cursor_position -= 1
1197 def insert_line_below(self, copy_margin: bool = True) -> None:
1198 """
1199 Insert a new line below the current one.
1200 """
1201 if copy_margin:
1202 insert = "\n" + self.document.leading_whitespace_in_current_line
1203 else:
1204 insert = "\n"
1206 self.cursor_position += self.document.get_end_of_line_position()
1207 self.insert_text(insert)
1209 def insert_text(
1210 self,
1211 data: str,
1212 overwrite: bool = False,
1213 move_cursor: bool = True,
1214 fire_event: bool = True,
1215 ) -> None:
1216 """
1217 Insert characters at cursor position.
1219 :param fire_event: Fire `on_text_insert` event. This is mainly used to
1220 trigger autocompletion while typing.
1221 """
1222 # Original text & cursor position.
1223 otext = self.text
1224 ocpos = self.cursor_position
1226 # In insert/text mode.
1227 if overwrite:
1228 # Don't overwrite the newline itself. Just before the line ending,
1229 # it should act like insert mode.
1230 overwritten_text = otext[ocpos : ocpos + len(data)]
1231 if "\n" in overwritten_text:
1232 overwritten_text = overwritten_text[: overwritten_text.find("\n")]
1234 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :]
1235 else:
1236 text = otext[:ocpos] + data + otext[ocpos:]
1238 if move_cursor:
1239 cpos = self.cursor_position + len(data)
1240 else:
1241 cpos = self.cursor_position
1243 # Set new document.
1244 # (Set text and cursor position at the same time. Otherwise, setting
1245 # the text will fire a change event before the cursor position has been
1246 # set. It works better to have this atomic.)
1247 self.document = Document(text, cpos)
1249 # Fire 'on_text_insert' event.
1250 if fire_event: # XXX: rename to `start_complete`.
1251 self.on_text_insert.fire()
1253 # Only complete when "complete_while_typing" is enabled.
1254 if self.completer and self.complete_while_typing():
1255 get_app().create_background_task(self._async_completer())
1257 # Call auto_suggest.
1258 if self.auto_suggest:
1259 get_app().create_background_task(self._async_suggester())
1261 def undo(self) -> None:
1262 # Pop from the undo-stack until we find a text that if different from
1263 # the current text. (The current logic of `save_to_undo_stack` will
1264 # cause that the top of the undo stack is usually the same as the
1265 # current text, so in that case we have to pop twice.)
1266 while self._undo_stack:
1267 text, pos = self._undo_stack.pop()
1269 if text != self.text:
1270 # Push current text to redo stack.
1271 self._redo_stack.append((self.text, self.cursor_position))
1273 # Set new text/cursor_position.
1274 self.document = Document(text, cursor_position=pos)
1275 break
1277 def redo(self) -> None:
1278 if self._redo_stack:
1279 # Copy current state on undo stack.
1280 self.save_to_undo_stack(clear_redo_stack=False)
1282 # Pop state from redo stack.
1283 text, pos = self._redo_stack.pop()
1284 self.document = Document(text, cursor_position=pos)
1286 def validate(self, set_cursor: bool = False) -> bool:
1287 """
1288 Returns `True` if valid.
1290 :param set_cursor: Set the cursor position, if an error was found.
1291 """
1292 # Don't call the validator again, if it was already called for the
1293 # current input.
1294 if self.validation_state != ValidationState.UNKNOWN:
1295 return self.validation_state == ValidationState.VALID
1297 # Call validator.
1298 if self.validator:
1299 try:
1300 self.validator.validate(self.document)
1301 except ValidationError as e:
1302 # Set cursor position (don't allow invalid values.)
1303 if set_cursor:
1304 self.cursor_position = min(
1305 max(0, e.cursor_position), len(self.text)
1306 )
1308 self.validation_state = ValidationState.INVALID
1309 self.validation_error = e
1310 return False
1312 # Handle validation result.
1313 self.validation_state = ValidationState.VALID
1314 self.validation_error = None
1315 return True
1317 async def _validate_async(self) -> None:
1318 """
1319 Asynchronous version of `validate()`.
1320 This one doesn't set the cursor position.
1322 We have both variants, because a synchronous version is required.
1323 Handling the ENTER key needs to be completely synchronous, otherwise
1324 stuff like type-ahead is going to give very weird results. (People
1325 could type input while the ENTER key is still processed.)
1327 An asynchronous version is required if we have `validate_while_typing`
1328 enabled.
1329 """
1330 while True:
1331 # Don't call the validator again, if it was already called for the
1332 # current input.
1333 if self.validation_state != ValidationState.UNKNOWN:
1334 return
1336 # Call validator.
1337 error = None
1338 document = self.document
1340 if self.validator:
1341 try:
1342 await self.validator.validate_async(self.document)
1343 except ValidationError as e:
1344 error = e
1346 # If the document changed during the validation, try again.
1347 if self.document != document:
1348 continue
1350 # Handle validation result.
1351 if error:
1352 self.validation_state = ValidationState.INVALID
1353 else:
1354 self.validation_state = ValidationState.VALID
1356 self.validation_error = error
1357 get_app().invalidate() # Trigger redraw (display error).
1359 def append_to_history(self) -> None:
1360 """
1361 Append the current input to the history.
1362 """
1363 # Save at the tail of the history. (But don't if the last entry the
1364 # history is already the same.)
1365 if self.text:
1366 history_strings = self.history.get_strings()
1367 if not len(history_strings) or history_strings[-1] != self.text:
1368 self.history.append_string(self.text)
1370 def _search(
1371 self,
1372 search_state: SearchState,
1373 include_current_position: bool = False,
1374 count: int = 1,
1375 ) -> tuple[int, int] | None:
1376 """
1377 Execute search. Return (working_index, cursor_position) tuple when this
1378 search is applied. Returns `None` when this text cannot be found.
1379 """
1380 assert count > 0
1382 text = search_state.text
1383 direction = search_state.direction
1384 ignore_case = search_state.ignore_case()
1386 def search_once(
1387 working_index: int, document: Document
1388 ) -> tuple[int, Document] | None:
1389 """
1390 Do search one time.
1391 Return (working_index, document) or `None`
1392 """
1393 if direction == SearchDirection.FORWARD:
1394 # Try find at the current input.
1395 new_index = document.find(
1396 text,
1397 include_current_position=include_current_position,
1398 ignore_case=ignore_case,
1399 )
1401 if new_index is not None:
1402 return (
1403 working_index,
1404 Document(document.text, document.cursor_position + new_index),
1405 )
1406 else:
1407 # No match, go forward in the history. (Include len+1 to wrap around.)
1408 # (Here we should always include all cursor positions, because
1409 # it's a different line.)
1410 for i in range(working_index + 1, len(self._working_lines) + 1):
1411 i %= len(self._working_lines)
1413 document = Document(self._working_lines[i], 0)
1414 new_index = document.find(
1415 text, include_current_position=True, ignore_case=ignore_case
1416 )
1417 if new_index is not None:
1418 return (i, Document(document.text, new_index))
1419 else:
1420 # Try find at the current input.
1421 new_index = document.find_backwards(text, ignore_case=ignore_case)
1423 if new_index is not None:
1424 return (
1425 working_index,
1426 Document(document.text, document.cursor_position + new_index),
1427 )
1428 else:
1429 # No match, go back in the history. (Include -1 to wrap around.)
1430 for i in range(working_index - 1, -2, -1):
1431 i %= len(self._working_lines)
1433 document = Document(
1434 self._working_lines[i], len(self._working_lines[i])
1435 )
1436 new_index = document.find_backwards(
1437 text, ignore_case=ignore_case
1438 )
1439 if new_index is not None:
1440 return (
1441 i,
1442 Document(document.text, len(document.text) + new_index),
1443 )
1444 return None
1446 # Do 'count' search iterations.
1447 working_index = self.working_index
1448 document = self.document
1449 for _ in range(count):
1450 result = search_once(working_index, document)
1451 if result is None:
1452 return None # Nothing found.
1453 else:
1454 working_index, document = result
1456 return (working_index, document.cursor_position)
1458 def document_for_search(self, search_state: SearchState) -> Document:
1459 """
1460 Return a :class:`~prompt_toolkit.document.Document` instance that has
1461 the text/cursor position for this search, if we would apply it. This
1462 will be used in the
1463 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
1464 searching.
1465 """
1466 search_result = self._search(search_state, include_current_position=True)
1468 if search_result is None:
1469 return self.document
1470 else:
1471 working_index, cursor_position = search_result
1473 # Keep selection, when `working_index` was not changed.
1474 if working_index == self.working_index:
1475 selection = self.selection_state
1476 else:
1477 selection = None
1479 return Document(
1480 self._working_lines[working_index], cursor_position, selection=selection
1481 )
1483 def get_search_position(
1484 self,
1485 search_state: SearchState,
1486 include_current_position: bool = True,
1487 count: int = 1,
1488 ) -> int:
1489 """
1490 Get the cursor position for this search.
1491 (This operation won't change the `working_index`. It's won't go through
1492 the history. Vi text objects can't span multiple items.)
1493 """
1494 search_result = self._search(
1495 search_state, include_current_position=include_current_position, count=count
1496 )
1498 if search_result is None:
1499 return self.cursor_position
1500 else:
1501 working_index, cursor_position = search_result
1502 return cursor_position
1504 def apply_search(
1505 self,
1506 search_state: SearchState,
1507 include_current_position: bool = True,
1508 count: int = 1,
1509 ) -> None:
1510 """
1511 Apply search. If something is found, set `working_index` and
1512 `cursor_position`.
1513 """
1514 search_result = self._search(
1515 search_state, include_current_position=include_current_position, count=count
1516 )
1518 if search_result is not None:
1519 working_index, cursor_position = search_result
1520 self.working_index = working_index
1521 self.cursor_position = cursor_position
1523 def exit_selection(self) -> None:
1524 self.selection_state = None
1526 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
1527 """
1528 Simple (file) tempfile implementation.
1529 Return (tempfile, cleanup_func).
1530 """
1531 suffix = to_str(self.tempfile_suffix)
1532 descriptor, filename = tempfile.mkstemp(suffix)
1534 os.write(descriptor, self.text.encode("utf-8"))
1535 os.close(descriptor)
1537 def cleanup() -> None:
1538 os.unlink(filename)
1540 return filename, cleanup
1542 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
1543 # Complex (directory) tempfile implementation.
1544 headtail = to_str(self.tempfile)
1545 if not headtail:
1546 # Revert to simple case.
1547 return self._editor_simple_tempfile()
1548 headtail = str(headtail)
1550 # Try to make according to tempfile logic.
1551 head, tail = os.path.split(headtail)
1552 if os.path.isabs(head):
1553 head = head[1:]
1555 dirpath = tempfile.mkdtemp()
1556 if head:
1557 dirpath = os.path.join(dirpath, head)
1558 # Assume there is no issue creating dirs in this temp dir.
1559 os.makedirs(dirpath)
1561 # Open the filename and write current text.
1562 filename = os.path.join(dirpath, tail)
1563 with open(filename, "w", encoding="utf-8") as fh:
1564 fh.write(self.text)
1566 def cleanup() -> None:
1567 shutil.rmtree(dirpath)
1569 return filename, cleanup
1571 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
1572 """
1573 Open code in editor.
1575 This returns a future, and runs in a thread executor.
1576 """
1577 if self.read_only():
1578 raise EditReadOnlyBuffer()
1580 # Write current text to temporary file
1581 if self.tempfile:
1582 filename, cleanup_func = self._editor_complex_tempfile()
1583 else:
1584 filename, cleanup_func = self._editor_simple_tempfile()
1586 async def run() -> None:
1587 try:
1588 # Open in editor
1589 # (We need to use `run_in_terminal`, because not all editors go to
1590 # the alternate screen buffer, and some could influence the cursor
1591 # position.)
1592 success = await run_in_terminal(
1593 lambda: self._open_file_in_editor(filename), in_executor=True
1594 )
1596 # Read content again.
1597 if success:
1598 with open(filename, "rb") as f:
1599 text = f.read().decode("utf-8")
1601 # Drop trailing newline. (Editors are supposed to add it at the
1602 # end, but we don't need it.)
1603 if text.endswith("\n"):
1604 text = text[:-1]
1606 self.document = Document(text=text, cursor_position=len(text))
1608 # Accept the input.
1609 if validate_and_handle:
1610 self.validate_and_handle()
1612 finally:
1613 # Clean up temp dir/file.
1614 cleanup_func()
1616 return get_app().create_background_task(run())
1618 def _open_file_in_editor(self, filename: str) -> bool:
1619 """
1620 Call editor executable.
1622 Return True when we received a zero return code.
1623 """
1624 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that.
1625 # Otherwise, fall back to the first available editor that we can find.
1626 visual = os.environ.get("VISUAL")
1627 editor = os.environ.get("EDITOR")
1629 editors = [
1630 visual,
1631 editor,
1632 # Order of preference.
1633 "/usr/bin/editor",
1634 "/usr/bin/nano",
1635 "/usr/bin/pico",
1636 "/usr/bin/vi",
1637 "/usr/bin/emacs",
1638 ]
1640 for e in editors:
1641 if e:
1642 try:
1643 # Use 'shlex.split()', because $VISUAL can contain spaces
1644 # and quotes.
1645 returncode = subprocess.call(shlex.split(e) + [filename])
1646 return returncode == 0
1648 except OSError:
1649 # Executable does not exist, try the next one.
1650 pass
1652 return False
1654 def start_completion(
1655 self,
1656 select_first: bool = False,
1657 select_last: bool = False,
1658 insert_common_part: bool = False,
1659 complete_event: CompleteEvent | None = None,
1660 ) -> None:
1661 """
1662 Start asynchronous autocompletion of this buffer.
1663 (This will do nothing if a previous completion was still in progress.)
1664 """
1665 # Only one of these options can be selected.
1666 assert select_first + select_last + insert_common_part <= 1
1668 get_app().create_background_task(
1669 self._async_completer(
1670 select_first=select_first,
1671 select_last=select_last,
1672 insert_common_part=insert_common_part,
1673 complete_event=complete_event
1674 or CompleteEvent(completion_requested=True),
1675 )
1676 )
1678 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
1679 """
1680 Create function for asynchronous autocompletion.
1682 (This consumes the asynchronous completer generator, which possibly
1683 runs the completion algorithm in another thread.)
1684 """
1686 def completion_does_nothing(document: Document, completion: Completion) -> bool:
1687 """
1688 Return `True` if applying this completion doesn't have any effect.
1689 (When it doesn't insert any new text.
1690 """
1691 text_before_cursor = document.text_before_cursor
1692 replaced_text = text_before_cursor[
1693 len(text_before_cursor) + completion.start_position :
1694 ]
1695 return replaced_text == completion.text
1697 @_only_one_at_a_time
1698 async def async_completer(
1699 select_first: bool = False,
1700 select_last: bool = False,
1701 insert_common_part: bool = False,
1702 complete_event: CompleteEvent | None = None,
1703 ) -> None:
1704 document = self.document
1705 complete_event = complete_event or CompleteEvent(text_inserted=True)
1707 # Don't complete when we already have completions.
1708 if self.complete_state or not self.completer:
1709 return
1711 # Create an empty CompletionState.
1712 complete_state = CompletionState(original_document=self.document)
1713 self.complete_state = complete_state
1715 def proceed() -> bool:
1716 """Keep retrieving completions. Input text has not yet changed
1717 while generating completions."""
1718 return self.complete_state == complete_state
1720 refresh_needed = asyncio.Event()
1722 async def refresh_while_loading() -> None:
1723 """Background loop to refresh the UI at most 3 times a second
1724 while the completion are loading. Calling
1725 `on_completions_changed.fire()` for every completion that we
1726 receive is too expensive when there are many completions. (We
1727 could tune `Application.max_render_postpone_time` and
1728 `Application.min_redraw_interval`, but having this here is a
1729 better approach.)
1730 """
1731 while True:
1732 self.on_completions_changed.fire()
1733 refresh_needed.clear()
1734 await asyncio.sleep(0.3)
1735 await refresh_needed.wait()
1737 refresh_task = asyncio.ensure_future(refresh_while_loading())
1738 try:
1739 # Load.
1740 async with aclosing(
1741 self.completer.get_completions_async(document, complete_event)
1742 ) as async_generator:
1743 async for completion in async_generator:
1744 complete_state.completions.append(completion)
1745 refresh_needed.set()
1747 # If the input text changes, abort.
1748 if not proceed():
1749 break
1750 finally:
1751 refresh_task.cancel()
1753 # Refresh one final time after we got everything.
1754 self.on_completions_changed.fire()
1756 completions = complete_state.completions
1758 # When there is only one completion, which has nothing to add, ignore it.
1759 if len(completions) == 1 and completion_does_nothing(
1760 document, completions[0]
1761 ):
1762 del completions[:]
1764 # Set completions if the text was not yet changed.
1765 if proceed():
1766 # When no completions were found, or when the user selected
1767 # already a completion by using the arrow keys, don't do anything.
1768 if (
1769 not self.complete_state
1770 or self.complete_state.complete_index is not None
1771 ):
1772 return
1774 # When there are no completions, reset completion state anyway.
1775 if not completions:
1776 self.complete_state = None
1777 # Render the ui if the completion menu was shown
1778 # it is needed especially if there is one completion and it was deleted.
1779 self.on_completions_changed.fire()
1780 return
1782 # Select first/last or insert common part, depending on the key
1783 # binding. (For this we have to wait until all completions are
1784 # loaded.)
1786 if select_first:
1787 self.go_to_completion(0)
1789 elif select_last:
1790 self.go_to_completion(len(completions) - 1)
1792 elif insert_common_part:
1793 common_part = get_common_complete_suffix(document, completions)
1794 if common_part:
1795 # Insert the common part, update completions.
1796 self.insert_text(common_part)
1797 if len(completions) > 1:
1798 # (Don't call `async_completer` again, but
1799 # recalculate completions. See:
1800 # https://github.com/ipython/ipython/issues/9658)
1801 completions[:] = [
1802 c.new_completion_from_position(len(common_part))
1803 for c in completions
1804 ]
1806 self._set_completions(completions=completions)
1807 else:
1808 self.complete_state = None
1809 else:
1810 # When we were asked to insert the "common"
1811 # prefix, but there was no common suffix but
1812 # still exactly one match, then select the
1813 # first. (It could be that we have a completion
1814 # which does * expansion, like '*.py', with
1815 # exactly one match.)
1816 if len(completions) == 1:
1817 self.go_to_completion(0)
1819 else:
1820 # If the last operation was an insert, (not a delete), restart
1821 # the completion coroutine.
1823 if self.document.text_before_cursor == document.text_before_cursor:
1824 return # Nothing changed.
1826 if self.document.text_before_cursor.startswith(
1827 document.text_before_cursor
1828 ):
1829 raise _Retry
1831 return async_completer
1833 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
1834 """
1835 Create function for asynchronous auto suggestion.
1836 (This can be in another thread.)
1837 """
1839 @_only_one_at_a_time
1840 async def async_suggestor() -> None:
1841 document = self.document
1843 # Don't suggest when we already have a suggestion.
1844 if self.suggestion or not self.auto_suggest:
1845 return
1847 suggestion = await self.auto_suggest.get_suggestion_async(self, document)
1849 # Set suggestion only if the text was not yet changed.
1850 if self.document == document:
1851 # Set suggestion and redraw interface.
1852 self.suggestion = suggestion
1853 self.on_suggestion_set.fire()
1854 else:
1855 # Otherwise, restart thread.
1856 raise _Retry
1858 return async_suggestor
1860 def _create_auto_validate_coroutine(
1861 self,
1862 ) -> Callable[[], Coroutine[Any, Any, None]]:
1863 """
1864 Create a function for asynchronous validation while typing.
1865 (This can be in another thread.)
1866 """
1868 @_only_one_at_a_time
1869 async def async_validator() -> None:
1870 await self._validate_async()
1872 return async_validator
1874 def validate_and_handle(self) -> None:
1875 """
1876 Validate buffer and handle the accept action.
1877 """
1878 valid = self.validate(set_cursor=True)
1880 # When the validation succeeded, accept the input.
1881 if valid:
1882 if self.accept_handler:
1883 keep_text = self.accept_handler(self)
1884 else:
1885 keep_text = False
1887 self.append_to_history()
1889 if not keep_text:
1890 self.reset()
1893_T = TypeVar("_T", bound=Callable[..., Awaitable[None]])
1896def _only_one_at_a_time(coroutine: _T) -> _T:
1897 """
1898 Decorator that only starts the coroutine only if the previous call has
1899 finished. (Used to make sure that we have only one autocompleter, auto
1900 suggestor and validator running at a time.)
1902 When the coroutine raises `_Retry`, it is restarted.
1903 """
1904 running = False
1906 @wraps(coroutine)
1907 async def new_coroutine(*a: Any, **kw: Any) -> Any:
1908 nonlocal running
1910 # Don't start a new function, if the previous is still in progress.
1911 if running:
1912 return
1914 running = True
1916 try:
1917 while True:
1918 try:
1919 await coroutine(*a, **kw)
1920 except _Retry:
1921 continue
1922 else:
1923 return None
1924 finally:
1925 running = False
1927 return cast(_T, new_coroutine)
1930class _Retry(Exception):
1931 "Retry in `_only_one_at_a_time`."
1934def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1935 """
1936 Indent text of a :class:`.Buffer` object.
1937 """
1938 current_row = buffer.document.cursor_position_row
1939 line_range = range(from_row, to_row)
1941 # Apply transformation.
1942 new_text = buffer.transform_lines(line_range, lambda l: " " * count + l)
1943 buffer.document = Document(
1944 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1945 )
1947 # Go to the start of the line.
1948 buffer.cursor_position += buffer.document.get_start_of_line_position(
1949 after_whitespace=True
1950 )
1953def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1954 """
1955 Unindent text of a :class:`.Buffer` object.
1956 """
1957 current_row = buffer.document.cursor_position_row
1958 line_range = range(from_row, to_row)
1960 def transform(text: str) -> str:
1961 remove = " " * count
1962 if text.startswith(remove):
1963 return text[len(remove) :]
1964 else:
1965 return text.lstrip()
1967 # Apply transformation.
1968 new_text = buffer.transform_lines(line_range, transform)
1969 buffer.document = Document(
1970 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1971 )
1973 # Go to the start of the line.
1974 buffer.cursor_position += buffer.document.get_start_of_line_position(
1975 after_whitespace=True
1976 )
1979def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
1980 """
1981 Reformat text, taking the width into account.
1982 `to_row` is included.
1983 (Vi 'gq' operator.)
1984 """
1985 lines = buffer.text.splitlines(True)
1986 lines_before = lines[:from_row]
1987 lines_after = lines[to_row + 1 :]
1988 lines_to_reformat = lines[from_row : to_row + 1]
1990 if lines_to_reformat:
1991 # Take indentation from the first line.
1992 match = re.search(r"^\s*", lines_to_reformat[0])
1993 length = match.end() if match else 0 # `match` can't be None, actually.
1995 indent = lines_to_reformat[0][:length].replace("\n", "")
1997 # Now, take all the 'words' from the lines to be reshaped.
1998 words = "".join(lines_to_reformat).split()
2000 # And reshape.
2001 width = (buffer.text_width or 80) - len(indent)
2002 reshaped_text = [indent]
2003 current_width = 0
2004 for w in words:
2005 if current_width:
2006 if len(w) + current_width + 1 > width:
2007 reshaped_text.append("\n")
2008 reshaped_text.append(indent)
2009 current_width = 0
2010 else:
2011 reshaped_text.append(" ")
2012 current_width += 1
2014 reshaped_text.append(w)
2015 current_width += len(w)
2017 if reshaped_text[-1] != "\n":
2018 reshaped_text.append("\n")
2020 # Apply result.
2021 buffer.document = Document(
2022 text="".join(lines_before + reshaped_text + lines_after),
2023 cursor_position=len("".join(lines_before + reshaped_text)),
2024 )