Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/buffer.py: 17%
811 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Data structures for the Buffer.
3It holds the text, cursor position, history, etc...
4"""
5from __future__ import annotations
7import asyncio
8import logging
9import os
10import re
11import shlex
12import shutil
13import subprocess
14import tempfile
15from collections import deque
16from enum import Enum
17from functools import wraps
18from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast
20from .application.current import get_app
21from .application.run_in_terminal import run_in_terminal
22from .auto_suggest import AutoSuggest, Suggestion
23from .cache import FastDictCache
24from .clipboard import ClipboardData
25from .completion import (
26 CompleteEvent,
27 Completer,
28 Completion,
29 DummyCompleter,
30 get_common_complete_suffix,
31)
32from .document import Document
33from .eventloop import aclosing
34from .filters import FilterOrBool, to_filter
35from .history import History, InMemoryHistory
36from .search import SearchDirection, SearchState
37from .selection import PasteMode, SelectionState, SelectionType
38from .utils import Event, to_str
39from .validation import ValidationError, Validator
# Names exported by ``from <this module> import *``.
__all__ = [
    "EditReadOnlyBuffer",
    "Buffer",
    "CompletionState",
    "indent",
    "unindent",
    "reshape_text",
]

# Module-level logger; used to report a failed history load.
logger = logging.getLogger(__name__)
class EditReadOnlyBuffer(Exception):
    """Raised when an edit is attempted on a read-only :class:`.Buffer`."""
class ValidationState(Enum):
    """
    Outcome of validating a buffer.

    A buffer starts out as ``UNKNOWN``; the state is updated once validation
    has run.
    """

    # Each member's value is simply its own name as a string.
    VALID = "VALID"
    INVALID = "INVALID"
    UNKNOWN = "UNKNOWN"
class CompletionState:
    """
    Bookkeeping for one completion session.

    Remembers the document as it was when completion started, the list of
    candidate completions, and which candidate (if any) is selected. The
    selection is updated in place by :meth:`go_to_index`.
    """

    def __init__(
        self,
        original_document: Document,
        completions: list[Completion] | None = None,
        complete_index: int | None = None,
    ) -> None:
        #: Document as it was when the completion started.
        self.original_document = original_document

        #: All the Completion instances that are possible at this point.
        self.completions = completions or []

        #: Index into `completions` of the selected item. `None` means that
        #: no completion is selected (the original text is shown).
        self.complete_index = complete_index

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}({self.original_document!r}, "
            f"<{len(self.completions)!r}> completions, "
            f"index={self.complete_index!r})"
        )

    def go_to_index(self, index: int | None) -> None:
        """
        Select the completion at `index`; this object is modified in place.

        When `index` is `None` the completion is deselected. Nothing happens
        when there are no completions at all.
        """
        if not self.completions:
            return

        assert index is None or 0 <= index < len(self.completions)
        self.complete_index = index

    def new_text_and_position(self) -> tuple[str, int]:
        """
        Return (new_text, new_cursor_position) for the selected completion.

        Without a selection, this is the text and cursor position of the
        original document.
        """
        selected = self.complete_index
        document = self.original_document

        if selected is None:
            return document.text, document.cursor_position

        prefix = document.text_before_cursor
        suffix = document.text_after_cursor
        completion = self.completions[selected]

        # `start_position` is an offset relative to the cursor. Zero has to
        # be special-cased, because `prefix[:0]` would drop the whole prefix.
        if completion.start_position != 0:
            prefix = prefix[: completion.start_position]

        return (
            prefix + completion.text + suffix,
            len(prefix) + len(completion.text),
        )

    @property
    def current_completion(self) -> Completion | None:
        """
        The selected completion, or `None` when no completion is selected.
        """
        selected = self.complete_index
        return None if selected is None else self.completions[selected]
# Matches, as a single captured group, either a run of whitespace, a
# double-quoted string or a single-quoted string (both non-greedy).
_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")
class YankNthArgState:
    """
    Progress marker for yank-last-arg/yank-nth-arg: remembers where we are
    in the history, which argument is wanted and what was inserted last.
    """

    def __init__(
        self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
    ) -> None:
        self.history_position = history_position
        self.previous_inserted_word = previous_inserted_word
        self.n = n

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}("
            f"history_position={self.history_position!r}, "
            f"n={self.n!r}, "
            f"previous_inserted_word={self.previous_inserted_word!r})"
        )
# Signature of an event callback: it receives the `Buffer` and returns nothing.
BufferEventHandler = Callable[["Buffer"], None]
# Signature of an accept handler: it receives the `Buffer` and returns a bool.
BufferAcceptHandler = Callable[["Buffer"], bool]
164class Buffer:
165 """
166 The core data structure that holds the text and cursor position of the
167 current input line and implements all text manipulations on top of it. It
168 also implements the history, undo stack and the completion state.
170 :param completer: :class:`~prompt_toolkit.completion.Completer` instance.
171 :param history: :class:`~prompt_toolkit.history.History` instance.
172 :param tempfile_suffix: The tempfile suffix (extension) to be used for the
173 "open in editor" function. For a Python REPL, this would be ".py", so
174 that the editor knows the syntax highlighting to use. This can also be
175 a callable that returns a string.
176 :param tempfile: For more advanced tempfile situations where you need
177 control over the subdirectories and filename. For a Git Commit Message,
178 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax
179 highlighting to use. This can also be a callable that returns a string.
180 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly
181 useful for key bindings where we sometimes prefer to refer to a buffer
182 by their name instead of by reference.
183 :param accept_handler: Called when the buffer input is accepted. (Usually
184 when the user presses `enter`.) The accept handler receives this
185 `Buffer` as input and should return True when the buffer text should be
186 kept instead of calling reset.
188 In case of a `PromptSession` for instance, we want to keep the text,
189 because we will exit the application, and only reset it during the next
190 run.
192 Events:
194 :param on_text_changed: When the buffer text changes. (Callable or None.)
195 :param on_text_insert: When new text is inserted. (Callable or None.)
196 :param on_cursor_position_changed: When the cursor moves. (Callable or None.)
197 :param on_completions_changed: When the completions were changed. (Callable or None.)
198 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.)
200 Filters:
202 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter`
203 or `bool`. Decide whether or not to do asynchronous autocompleting while
204 typing.
205 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter`
206 or `bool`. Decide whether or not to do asynchronous validation while
207 typing.
208 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or
209 `bool` to indicate when up-arrow partial string matching is enabled. It
210 is advised to not enable this at the same time as
211 `complete_while_typing`, because when there is an autocompletion found,
212 the up arrows usually browse through the completions, rather than
213 through the history.
214 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True,
215 changes will not be allowed.
216 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When
217 not set, pressing `Enter` will call the `accept_handler`. Otherwise,
218 pressing `Esc-Enter` is required.
219 """
def __init__(
    self,
    completer: Completer | None = None,
    auto_suggest: AutoSuggest | None = None,
    history: History | None = None,
    validator: Validator | None = None,
    tempfile_suffix: str | Callable[[], str] = "",
    tempfile: str | Callable[[], str] = "",
    name: str = "",
    complete_while_typing: FilterOrBool = False,
    validate_while_typing: FilterOrBool = False,
    enable_history_search: FilterOrBool = False,
    document: Document | None = None,
    accept_handler: BufferAcceptHandler | None = None,
    read_only: FilterOrBool = False,
    multiline: FilterOrBool = True,
    on_text_changed: BufferEventHandler | None = None,
    on_text_insert: BufferEventHandler | None = None,
    on_cursor_position_changed: BufferEventHandler | None = None,
    on_completions_changed: BufferEventHandler | None = None,
    on_suggestion_set: BufferEventHandler | None = None,
):
    """
    Store the configuration, create the events, the document cache and the
    completer/suggester/validator coroutines, and finally call
    :meth:`reset` with the given `document`.

    When no `completer` is given, a `DummyCompleter` is used; when `history`
    is `None`, an `InMemoryHistory` is created.
    """
    # Accept both filters and booleans as input.
    enable_history_search = to_filter(enable_history_search)
    complete_while_typing = to_filter(complete_while_typing)
    validate_while_typing = to_filter(validate_while_typing)
    read_only = to_filter(read_only)
    multiline = to_filter(multiline)

    self.completer = completer or DummyCompleter()
    self.auto_suggest = auto_suggest
    self.validator = validator
    self.tempfile_suffix = tempfile_suffix
    self.tempfile = tempfile
    self.name = name
    self.accept_handler = accept_handler

    # Filters. (Usually, used by the key bindings to drive the buffer.)
    self.complete_while_typing = complete_while_typing
    self.validate_while_typing = validate_while_typing
    self.enable_history_search = enable_history_search
    self.read_only = read_only
    self.multiline = multiline

    # Text width. (For wrapping, used by the Vi 'gq' operator.)
    self.text_width = 0

    #: The command buffer history.
    # Note that we shouldn't use a lazy 'or' here. bool(history) could be
    # False when empty.
    self.history = InMemoryHistory() if history is None else history

    self.__cursor_position = 0

    # Events
    self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
    self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
    self.on_cursor_position_changed: Event[Buffer] = Event(
        self, on_cursor_position_changed
    )
    self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
    self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)

    # Document cache. (Avoid creating new Document instances.)
    # Keyed on (text, cursor position, selection state).
    self._document_cache: FastDictCache[
        tuple[str, int, SelectionState | None], Document
    ] = FastDictCache(Document, size=10)

    # Create completer / auto suggestion / validation coroutines.
    self._async_suggester = self._create_auto_suggest_coroutine()
    self._async_completer = self._create_completer_coroutine()
    self._async_validator = self._create_auto_validate_coroutine()

    # Asyncio task for populating the history.
    # (Has to exist before `reset` runs, because `reset` inspects it.)
    self._load_history_task: asyncio.Future[None] | None = None

    # Reset other attributes.
    self.reset(document=document)
300 def __repr__(self) -> str:
301 if len(self.text) < 15:
302 text = self.text
303 else:
304 text = self.text[:12] + "..."
306 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>"
def reset(
    self, document: Document | None = None, append_to_history: bool = False
) -> None:
    """
    Bring the buffer back to a fresh state.

    :param document: Document that provides the new text and cursor
        position. An empty ``Document()`` is used when this is not given.
    :param append_to_history: Append current input to history first.
    """
    if append_to_history:
        self.append_to_history()

    document = document or Document()

    self.__cursor_position = document.cursor_position

    # `ValidationError` instance. (Will be set when the input is wrong.)
    self.validation_error: ValidationError | None = None
    self.validation_state: ValidationState | None = ValidationState.UNKNOWN

    # State of the selection.
    self.selection_state: SelectionState | None = None

    # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
    # we can insert text on multiple lines at once. This is implemented by
    # using multiple cursors.)
    self.multiple_cursor_positions: list[int] = []

    # When doing consecutive up/down movements, prefer to stay at this column.
    self.preferred_column: int | None = None

    # State of complete browser
    # For interactive completion through Ctrl-N/Ctrl-P.
    self.complete_state: CompletionState | None = None

    # State of Emacs yank-nth-arg completion.
    self.yank_nth_arg_state: YankNthArgState | None = None  # for yank-nth-arg.

    # Remember the document that we had *right before* the last paste
    # operation. This is used for rotating through the kill ring.
    self.document_before_paste: Document | None = None

    # Current suggestion.
    self.suggestion: Suggestion | None = None

    # The history search text. (Used for filtering the history when we
    # browse through it.)
    self.history_search_text: str | None = None

    # Undo/redo stacks (stack of `(text, cursor_position)`).
    self._undo_stack: list[tuple[str, int]] = []
    self._redo_stack: list[tuple[str, int]] = []

    # Cancel history loader. If history loading was still ongoing.
    # Cancel the `_load_history_task`, so that next repaint of the
    # `BufferControl` we will repopulate it.
    if self._load_history_task is not None:
        self._load_history_task.cancel()
    self._load_history_task = None

    #: The working lines. Similar to history, except that this can be
    #: modified. The user can press arrow_up and edit previous entries.
    #: Ctrl-C should reset this, and copy the whole history back in here.
    #: Enter should process the current command and append to the real
    #: history.
    # Starts out with just the text of the new document, at index 0.
    self._working_lines: deque[str] = deque([document.text])
    self.__working_index = 0
def load_history_if_not_yet_loaded(self) -> None:
    """
    Create task for populating the buffer history (if not yet done).

    Note::

        This needs to be called from within the event loop of the
        application, because history loading is async, and we need to be
        sure the right event loop is active. Therefore, we call this method
        in the `BufferControl.create_content`.

        There are situations where prompt_toolkit applications are created
        in one thread, but will later run in a different thread (Ptpython
        is one example. The REPL runs in a separate thread, in order to
        prevent interfering with a potential different event loop in the
        main thread. The REPL UI however is still created in the main
        thread.) We could decide to not support creating prompt_toolkit
        objects in one thread and running the application in a different
        thread, but history loading is the only place where it matters, and
        this solves it.
    """
    # Only start loading when no load task exists yet.
    if self._load_history_task is None:

        async def load_history() -> None:
            async for item in self.history.load():
                # Every loaded item is put in front of the working lines,
                # so the index of the line being edited moves up by one.
                self._working_lines.appendleft(item)
                self.__working_index += 1

        self._load_history_task = get_app().create_background_task(load_history())

        def load_history_done(f: asyncio.Future[None]) -> None:
            """
            Handle `load_history` result when either done, cancelled, or
            when an exception was raised.
            """
            try:
                f.result()
            except asyncio.CancelledError:
                # Ignore cancellation. But handle it, so that we don't get
                # this traceback.
                pass
            except GeneratorExit:
                # Probably not needed, but we had situations where
                # `GeneratorExit` was raised in `load_history` during
                # cancellation.
                pass
            except BaseException:
                # Log error if something goes wrong. (We don't have a
                # caller to which we can propagate this exception.)
                logger.exception("Loading history failed")

        self._load_history_task.add_done_callback(load_history_done)
426 # <getters/setters>
428 def _set_text(self, value: str) -> bool:
429 """set text at current working_index. Return whether it changed."""
430 working_index = self.working_index
431 working_lines = self._working_lines
433 original_value = working_lines[working_index]
434 working_lines[working_index] = value
436 # Return True when this text has been changed.
437 if len(value) != len(original_value):
438 # For Python 2, it seems that when two strings have a different
439 # length and one is a prefix of the other, Python still scans
440 # character by character to see whether the strings are different.
441 # (Some benchmarking showed significant differences for big
442 # documents. >100,000 of lines.)
443 return True
444 elif value != original_value:
445 return True
446 return False
448 def _set_cursor_position(self, value: int) -> bool:
449 """Set cursor position. Return whether it changed."""
450 original_position = self.__cursor_position
451 self.__cursor_position = max(0, value)
453 return self.__cursor_position != original_position
455 @property
456 def text(self) -> str:
457 return self._working_lines[self.working_index]
459 @text.setter
460 def text(self, value: str) -> None:
461 """
462 Setting text. (When doing this, make sure that the cursor_position is
463 valid for this text. text/cursor_position should be consistent at any time,
464 otherwise set a Document instead.)
465 """
466 # Ensure cursor position remains within the size of the text.
467 if self.cursor_position > len(value):
468 self.cursor_position = len(value)
470 # Don't allow editing of read-only buffers.
471 if self.read_only():
472 raise EditReadOnlyBuffer()
474 changed = self._set_text(value)
476 if changed:
477 self._text_changed()
479 # Reset history search text.
480 # (Note that this doesn't need to happen when working_index
481 # changes, which is when we traverse the history. That's why we
482 # don't do this in `self._text_changed`.)
483 self.history_search_text = None
485 @property
486 def cursor_position(self) -> int:
487 return self.__cursor_position
489 @cursor_position.setter
490 def cursor_position(self, value: int) -> None:
491 """
492 Setting cursor position.
493 """
494 assert isinstance(value, int)
496 # Ensure cursor position is within the size of the text.
497 if value > len(self.text):
498 value = len(self.text)
499 if value < 0:
500 value = 0
502 changed = self._set_cursor_position(value)
504 if changed:
505 self._cursor_position_changed()
507 @property
508 def working_index(self) -> int:
509 return self.__working_index
511 @working_index.setter
512 def working_index(self, value: int) -> None:
513 if self.__working_index != value:
514 self.__working_index = value
515 # Make sure to reset the cursor position, otherwise we end up in
516 # situations where the cursor position is out of the bounds of the
517 # text.
518 self.cursor_position = 0
519 self._text_changed()
def _text_changed(self) -> None:
    """
    Bookkeeping after the text was changed: drop the state that belonged to
    the previous text, fire `on_text_changed` and, when enabled, start a
    validation in the background.
    """
    # Remove any validation errors and complete state.
    self.validation_error = None
    self.validation_state = ValidationState.UNKNOWN
    self.complete_state = self.yank_nth_arg_state = None
    self.document_before_paste = self.selection_state = None
    self.suggestion = self.preferred_column = None

    # fire 'on_text_changed' event.
    self.on_text_changed.fire()

    # Input validation.
    # (This happens on all change events, unlike auto completion, also when
    # deleting text.)
    if not self.validator:
        return
    if self.validate_while_typing():
        get_app().create_background_task(self._async_validator())
541 def _cursor_position_changed(self) -> None:
542 # Remove any complete state.
543 # (Input validation should only be undone when the cursor position
544 # changes.)
545 self.complete_state = None
546 self.yank_nth_arg_state = None
547 self.document_before_paste = None
549 # Unset preferred_column. (Will be set after the cursor movement, if
550 # required.)
551 self.preferred_column = None
553 # Note that the cursor position can change if we have a selection the
554 # new position of the cursor determines the end of the selection.
556 # fire 'on_cursor_position_changed' event.
557 self.on_cursor_position_changed.fire()
559 @property
560 def document(self) -> Document:
561 """
562 Return :class:`~prompt_toolkit.document.Document` instance from the
563 current text, cursor position and selection state.
564 """
565 return self._document_cache[
566 self.text, self.cursor_position, self.selection_state
567 ]
569 @document.setter
570 def document(self, value: Document) -> None:
571 """
572 Set :class:`~prompt_toolkit.document.Document` instance.
574 This will set both the text and cursor position at the same time, but
575 atomically. (Change events will be triggered only after both have been set.)
576 """
577 self.set_document(value)
579 def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
580 """
581 Set :class:`~prompt_toolkit.document.Document` instance. Like the
582 ``document`` property, but accept an ``bypass_readonly`` argument.
584 :param bypass_readonly: When True, don't raise an
585 :class:`.EditReadOnlyBuffer` exception, even
586 when the buffer is read-only.
588 .. warning::
590 When this buffer is read-only and `bypass_readonly` was not passed,
591 the `EditReadOnlyBuffer` exception will be caught by the
592 `KeyProcessor` and is silently suppressed. This is important to
593 keep in mind when writing key bindings, because it won't do what
594 you expect, and there won't be a stack trace. Use try/finally
595 around this function if you need some cleanup code.
596 """
597 # Don't allow editing of read-only buffers.
598 if not bypass_readonly and self.read_only():
599 raise EditReadOnlyBuffer()
601 # Set text and cursor position first.
602 text_changed = self._set_text(value.text)
603 cursor_position_changed = self._set_cursor_position(value.cursor_position)
605 # Now handle change events. (We do this when text/cursor position is
606 # both set and consistent.)
607 if text_changed:
608 self._text_changed()
609 self.history_search_text = None
611 if cursor_position_changed:
612 self._cursor_position_changed()
614 @property
615 def is_returnable(self) -> bool:
616 """
617 True when there is something handling accept.
618 """
619 return bool(self.accept_handler)
621 # End of <getters/setters>
623 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
624 """
625 Safe current state (input text and cursor position), so that we can
626 restore it by calling undo.
627 """
628 # Safe if the text is different from the text at the top of the stack
629 # is different. If the text is the same, just update the cursor position.
630 if self._undo_stack and self._undo_stack[-1][0] == self.text:
631 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position)
632 else:
633 self._undo_stack.append((self.text, self.cursor_position))
635 # Saving anything to the undo stack, clears the redo stack.
636 if clear_redo_stack:
637 self._redo_stack = []
639 def transform_lines(
640 self,
641 line_index_iterator: Iterable[int],
642 transform_callback: Callable[[str], str],
643 ) -> str:
644 """
645 Transforms the text on a range of lines.
646 When the iterator yield an index not in the range of lines that the
647 document contains, it skips them silently.
649 To uppercase some lines::
651 new_text = transform_lines(range(5,10), lambda text: text.upper())
653 :param line_index_iterator: Iterator of line numbers (int)
654 :param transform_callback: callable that takes the original text of a
655 line, and return the new text for this line.
657 :returns: The new text.
658 """
659 # Split lines
660 lines = self.text.split("\n")
662 # Apply transformation
663 for index in line_index_iterator:
664 try:
665 lines[index] = transform_callback(lines[index])
666 except IndexError:
667 pass
669 return "\n".join(lines)
671 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
672 """
673 Apply the given transformation function to the current line.
675 :param transform_callback: callable that takes a string and return a new string.
676 """
677 document = self.document
678 a = document.cursor_position + document.get_start_of_line_position()
679 b = document.cursor_position + document.get_end_of_line_position()
680 self.text = (
681 document.text[:a]
682 + transform_callback(document.text[a:b])
683 + document.text[b:]
684 )
686 def transform_region(
687 self, from_: int, to: int, transform_callback: Callable[[str], str]
688 ) -> None:
689 """
690 Transform a part of the input string.
692 :param from_: (int) start position.
693 :param to: (int) end position.
694 :param transform_callback: Callable which accepts a string and returns
695 the transformed string.
696 """
697 assert from_ < to
699 self.text = "".join(
700 [
701 self.text[:from_]
702 + transform_callback(self.text[from_:to])
703 + self.text[to:]
704 ]
705 )
707 def cursor_left(self, count: int = 1) -> None:
708 self.cursor_position += self.document.get_cursor_left_position(count=count)
710 def cursor_right(self, count: int = 1) -> None:
711 self.cursor_position += self.document.get_cursor_right_position(count=count)
713 def cursor_up(self, count: int = 1) -> None:
714 """(for multiline edit). Move cursor to the previous line."""
715 original_column = self.preferred_column or self.document.cursor_position_col
716 self.cursor_position += self.document.get_cursor_up_position(
717 count=count, preferred_column=original_column
718 )
720 # Remember the original column for the next up/down movement.
721 self.preferred_column = original_column
723 def cursor_down(self, count: int = 1) -> None:
724 """(for multiline edit). Move cursor to the next line."""
725 original_column = self.preferred_column or self.document.cursor_position_col
726 self.cursor_position += self.document.get_cursor_down_position(
727 count=count, preferred_column=original_column
728 )
730 # Remember the original column for the next up/down movement.
731 self.preferred_column = original_column
733 def auto_up(
734 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
735 ) -> None:
736 """
737 If we're not on the first line (of a multiline input) go a line up,
738 otherwise go back in history. (If nothing is selected.)
739 """
740 if self.complete_state:
741 self.complete_previous(count=count)
742 elif self.document.cursor_position_row > 0:
743 self.cursor_up(count=count)
744 elif not self.selection_state:
745 self.history_backward(count=count)
747 # Go to the start of the line?
748 if go_to_start_of_line_if_history_changes:
749 self.cursor_position += self.document.get_start_of_line_position()
751 def auto_down(
752 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
753 ) -> None:
754 """
755 If we're not on the last line (of a multiline input) go a line down,
756 otherwise go forward in history. (If nothing is selected.)
757 """
758 if self.complete_state:
759 self.complete_next(count=count)
760 elif self.document.cursor_position_row < self.document.line_count - 1:
761 self.cursor_down(count=count)
762 elif not self.selection_state:
763 self.history_forward(count=count)
765 # Go to the start of the line?
766 if go_to_start_of_line_if_history_changes:
767 self.cursor_position += self.document.get_start_of_line_position()
769 def delete_before_cursor(self, count: int = 1) -> str:
770 """
771 Delete specified number of characters before cursor and return the
772 deleted text.
773 """
774 assert count >= 0
775 deleted = ""
777 if self.cursor_position > 0:
778 deleted = self.text[self.cursor_position - count : self.cursor_position]
780 new_text = (
781 self.text[: self.cursor_position - count]
782 + self.text[self.cursor_position :]
783 )
784 new_cursor_position = self.cursor_position - len(deleted)
786 # Set new Document atomically.
787 self.document = Document(new_text, new_cursor_position)
789 return deleted
791 def delete(self, count: int = 1) -> str:
792 """
793 Delete specified number of characters and Return the deleted text.
794 """
795 if self.cursor_position < len(self.text):
796 deleted = self.document.text_after_cursor[:count]
797 self.text = (
798 self.text[: self.cursor_position]
799 + self.text[self.cursor_position + len(deleted) :]
800 )
801 return deleted
802 else:
803 return ""
805 def join_next_line(self, separator: str = " ") -> None:
806 """
807 Join the next line to the current one by deleting the line ending after
808 the current line.
809 """
810 if not self.document.on_last_line:
811 self.cursor_position += self.document.get_end_of_line_position()
812 self.delete()
814 # Remove spaces.
815 self.text = (
816 self.document.text_before_cursor
817 + separator
818 + self.document.text_after_cursor.lstrip(" ")
819 )
def join_selected_lines(self, separator: str = " ") -> None:
    """
    Join the lines of the current selection, putting `separator` after
    each of them. Requires an active selection.
    """
    assert self.selection_state

    # The selection runs between the cursor and the position where the
    # selection was started, in either order.
    start, end = sorted(
        [self.cursor_position, self.selection_state.original_cursor_position]
    )

    before = self.text[:start]
    selected = self.text[start:end]
    after = self.text[end:]

    # Replace leading spaces with just one space.
    pieces = [line.lstrip(" ") + separator for line in selected.splitlines()]

    # Set new document.
    self.document = Document(
        text=before + "".join(pieces) + after,
        cursor_position=len(before) + len("".join(pieces[:-1])) - 1,
    )
845 def swap_characters_before_cursor(self) -> None:
846 """
847 Swap the last two characters before the cursor.
848 """
849 pos = self.cursor_position
851 if pos >= 2:
852 a = self.text[pos - 2]
853 b = self.text[pos - 1]
855 self.text = self.text[: pos - 2] + b + a + self.text[pos:]
857 def go_to_history(self, index: int) -> None:
858 """
859 Go to this item in the history.
860 """
861 if index < len(self._working_lines):
862 self.working_index = index
863 self.cursor_position = len(self.text)
865 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
866 """
867 Browse to the next completions.
868 (Does nothing if there are no completion.)
869 """
870 index: int | None
872 if self.complete_state:
873 completions_count = len(self.complete_state.completions)
875 if self.complete_state.complete_index is None:
876 index = 0
877 elif self.complete_state.complete_index == completions_count - 1:
878 index = None
880 if disable_wrap_around:
881 return
882 else:
883 index = min(
884 completions_count - 1, self.complete_state.complete_index + count
885 )
886 self.go_to_completion(index)
888 def complete_previous(
889 self, count: int = 1, disable_wrap_around: bool = False
890 ) -> None:
891 """
892 Browse to the previous completions.
893 (Does nothing if there are no completion.)
894 """
895 index: int | None
897 if self.complete_state:
898 if self.complete_state.complete_index == 0:
899 index = None
901 if disable_wrap_around:
902 return
903 elif self.complete_state.complete_index is None:
904 index = len(self.complete_state.completions) - 1
905 else:
906 index = max(0, self.complete_state.complete_index - count)
908 self.go_to_completion(index)
910 def cancel_completion(self) -> None:
911 """
912 Cancel completion, go back to the original text.
913 """
914 if self.complete_state:
915 self.go_to_completion(None)
916 self.complete_state = None
def _set_completions(self, completions: list[Completion]) -> CompletionState:
    """
    Start a completion session for the given completions, based on the
    current document, and fire `on_completions_changed`.

    No completion is selected initially.
    """
    new_state = CompletionState(
        original_document=self.document, completions=completions
    )
    self.complete_state = new_state

    # Trigger event. This should eventually invalidate the layout.
    self.on_completions_changed.fire()

    # Deliberately read back from the attribute, after the event was fired.
    return self.complete_state
def start_history_lines_completion(self) -> None:
    """
    Offer, as completions, all lines of the working lines (the current
    input and the history) that start with the text before the cursor on
    the current line. The first completion is selected right away.
    """
    seen: set[str] = set()
    candidates = []

    # The part of the current line that every candidate has to start with.
    prefix = self.document.current_line_before_cursor.lstrip()

    for entry_index, entry in enumerate(self._working_lines):
        for line_index, raw_line in enumerate(entry.split("\n")):
            line = raw_line.strip()

            if not line or not line.startswith(prefix):
                continue

            # Every distinct line is offered only once.
            if line in seen:
                continue
            seen.add(line)

            if entry_index == self.working_index:
                display_meta = f"Current, line {line_index + 1}"
            else:
                display_meta = f"History {entry_index + 1}, line {line_index + 1}"

            candidates.append(
                Completion(
                    text=line,
                    start_position=-len(prefix),
                    display_meta=display_meta,
                )
            )

    # The candidates were collected front to back; offer them in reverse.
    self._set_completions(completions=candidates[::-1])
    self.go_to_completion(0)
def go_to_completion(self, index: int | None) -> None:
    """
    Select the completion at `index` from the current completions.

    :param index: Position in the completion list, or `None` to select
        no completion.
    """
    assert self.complete_state
    active_state = self.complete_state

    # Move the selection and compute the resulting text/cursor.
    active_state.go_to_index(index)
    text, cursor = active_state.new_text_and_position()
    self.document = Document(text, cursor)

    # Assigning `document` unsets `complete_state`, so put it back.
    self.complete_state = active_state
def apply_completion(self, completion: Completion) -> None:
    """
    Insert the given completion at the cursor.

    :param completion: The completion to insert. Its `start_position`
        (zero or negative) tells how many characters before the cursor get
        replaced.
    """
    # An already active completion is cancelled first.
    if self.complete_state:
        self.go_to_completion(None)
        self.complete_state = None

    # Replace the text before the cursor by the completion text.
    chars_to_replace = -completion.start_position
    self.delete_before_cursor(chars_to_replace)
    self.insert_text(completion.text)
999 def _set_history_search(self) -> None:
1000 """
1001 Set `history_search_text`.
1002 (The text before the cursor will be used for filtering the history.)
1003 """
1004 if self.enable_history_search():
1005 if self.history_search_text is None:
1006 self.history_search_text = self.document.text_before_cursor
1007 else:
1008 self.history_search_text = None
1010 def _history_matches(self, i: int) -> bool:
1011 """
1012 True when the current entry matches the history search.
1013 (when we don't have history search, it's also True.)
1014 """
1015 return self.history_search_text is None or self._working_lines[i].startswith(
1016 self.history_search_text
1017 )
def history_forward(self, count: int = 1) -> None:
    """
    Move forwards through the history.

    :param count: Number of matching entries to move forward.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk the entries after the current one, stopping at matching ones.
    for candidate in range(self.working_index + 1, len(self._working_lines)):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving, put the cursor at the end of the first line.
    if moved:
        self.cursor_position = 0
        self.cursor_position += self.document.get_end_of_line_position()
def history_backward(self, count: int = 1) -> None:
    """
    Move backwards through the history.

    :param count: Number of matching entries to move back.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk the entries before the current one, newest first.
    for candidate in range(self.working_index - 1, -1, -1):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving to another entry, put the cursor at the very end.
    if moved:
        self.cursor_position = len(self.text)
def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
    """
    Insert the nth word of a previous history entry at the cursor.

    Repeated calls go one history entry further back each time and replace
    the previously inserted word. Without `n`, the word at index 1 (the
    second word) is used, or the last word when `_yank_last_arg` is set.

    :param n: (None or int), index of the word to take from the history
        line.
    """
    assert n is None or isinstance(n, int)
    history_strings = self.history.get_strings()

    if not history_strings:
        return

    # Reuse the state of a previous yank, or start a new one.
    state = self.yank_nth_arg_state
    if state is None:
        state = YankNthArgState(n=-1 if _yank_last_arg else 1)

    if n is not None:
        state.n = n

    # One entry further back; start over at the newest when running out.
    new_pos = state.history_position - 1
    if -new_pos > len(history_strings):
        new_pos = -1

    # Split the history line into non-empty words.
    line = history_strings[new_pos]
    stripped_parts = (part.strip() for part in _QUOTED_WORDS_RE.split(line))
    words = [part for part in stripped_parts if part]
    try:
        word = words[state.n]
    except IndexError:
        word = ""

    # Replace the word inserted by the previous call, if any.
    if state.previous_inserted_word:
        self.delete_before_cursor(len(state.previous_inserted_word))
    self.insert_text(word)

    # Store the state again for the next call. (The insert above clears
    # `self.yank_nth_arg_state`.)
    state.previous_inserted_word = word
    state.history_position = new_pos
    self.yank_nth_arg_state = state
def yank_last_arg(self, n: int | None = None) -> None:
    """
    Same as `yank_nth_arg`, except that the last word is yanked when no
    `n` is given.

    :param n: (None or int), index of the word to take.
    """
    return self.yank_nth_arg(n=n, _yank_last_arg=True) and None
def start_selection(
    self, selection_type: SelectionType = SelectionType.CHARACTERS
) -> None:
    """
    Begin a selection that starts at the current cursor position.

    :param selection_type: The kind of selection to start.
    """
    origin = self.cursor_position
    self.selection_state = SelectionState(origin, selection_type)
def copy_selection(self, _cut: bool = False) -> ClipboardData:
    """
    Copy the selected text and return it as :class:`.ClipboardData`.
    The selection is cleared afterwards.

    The data is not put on the clipboard by this method. To store it:

    .. code:: python

        data = buffer.copy_selection()
        get_app().clipboard.set_data(data)

    :param _cut: When true, also remove the selected text from the buffer.
    """
    document_without_selection, clipboard_data = self.document.cut_selection()

    if _cut:
        self.document = document_without_selection

    self.selection_state = None
    return clipboard_data
def cut_selection(self) -> ClipboardData:
    """
    Remove the selected text from the buffer and return it as a
    :class:`.ClipboardData` instance.
    """
    clipboard_data = self.copy_selection(_cut=True)
    return clipboard_data
def paste_clipboard_data(
    self,
    data: ClipboardData,
    paste_mode: PasteMode = PasteMode.EMACS,
    count: int = 1,
) -> None:
    """
    Paste clipboard data into the buffer.

    :param data: The :class:`.ClipboardData` to insert.
    :param paste_mode: One of the `PasteMode` values.
    :param count: Passed through to `Document.paste_clipboard_data`.
    """
    assert isinstance(data, ClipboardData)
    assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)

    document_before = self.document
    self.document = document_before.paste_clipboard_data(
        data, paste_mode=paste_mode, count=count
    )

    # Keep the pre-paste document. This has to happen last, because
    # assigning to 'document' erases it.
    self.document_before_paste = document_before
def newline(self, copy_margin: bool = True) -> None:
    """
    Insert a line ending at the cursor.

    :param copy_margin: When true, the new line starts with the leading
        whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    self.insert_text("\n" + margin)
def insert_line_above(self, copy_margin: bool = True) -> None:
    """
    Open a new line above the current one and place the cursor on it.

    :param copy_margin: When true, the new line gets the leading whitespace
        of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""

    # Go to the start of the line, insert there, then step back over the
    # inserted line ending.
    self.cursor_position += self.document.get_start_of_line_position()
    self.insert_text(margin + "\n")
    self.cursor_position -= 1
def insert_line_below(self, copy_margin: bool = True) -> None:
    """
    Open a new line below the current one.

    :param copy_margin: When true, the new line gets the leading whitespace
        of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""

    # Go to the end of the line and insert the line ending there.
    self.cursor_position += self.document.get_end_of_line_position()
    self.insert_text("\n" + margin)
def insert_text(
    self,
    data: str,
    overwrite: bool = False,
    move_cursor: bool = True,
    fire_event: bool = True,
) -> None:
    """
    Insert characters at the cursor position.

    :param data: The text to insert.
    :param overwrite: Replace existing characters instead of shifting them,
        but never replace a line ending or anything after it.
    :param move_cursor: Advance the cursor by the length of `data`.
    :param fire_event: Fire `on_text_insert` event. This is mainly used to
        trigger autocompletion while typing.
    """
    original_text = self.text
    original_cursor = self.cursor_position

    if overwrite:
        # Only overwrite up to the line ending; from there on this behaves
        # like insert mode.
        replaced = original_text[original_cursor : original_cursor + len(data)]
        newline_at = replaced.find("\n")
        if newline_at != -1:
            replaced = replaced[:newline_at]
        tail_start = original_cursor + len(replaced)
    else:
        tail_start = original_cursor

    text = original_text[:original_cursor] + data + original_text[tail_start:]
    cpos = original_cursor + len(data) if move_cursor else original_cursor

    # Text and cursor are set together: assigning the text alone would fire
    # a change event before the cursor position is updated.
    self.document = Document(text, cpos)

    if not fire_event:  # XXX: rename to `start_complete`.
        return

    self.on_text_insert.fire()

    # Only complete when "complete_while_typing" is enabled.
    if self.completer and self.complete_while_typing():
        get_app().create_background_task(self._async_completer())

    # Call auto_suggest.
    if self.auto_suggest:
        get_app().create_background_task(self._async_suggester())
def undo(self) -> None:
    """
    Restore the most recent undo-stack entry whose text differs from the
    current text, pushing the current state on the redo stack.
    """
    # The top of the undo stack usually equals the current text (because of
    # how `save_to_undo_stack` works), so such entries are popped and
    # skipped.
    while self._undo_stack:
        text, pos = self._undo_stack.pop()
        if text == self.text:
            continue

        # Remember the current state for redo, then restore.
        self._redo_stack.append((self.text, self.cursor_position))
        self.document = Document(text, cursor_position=pos)
        return
def redo(self) -> None:
    """
    Restore the most recent state from the redo stack, if any.
    """
    if not self._redo_stack:
        return

    # Keep the current state on the undo stack, without clearing redo.
    self.save_to_undo_stack(clear_redo_stack=False)

    text, pos = self._redo_stack.pop()
    self.document = Document(text, cursor_position=pos)
def validate(self, set_cursor: bool = False) -> bool:
    """
    Run the validator on the current document. Returns `True` if valid.

    :param set_cursor: Set the cursor position, if an error was found.
    """
    # A known validation state means the validator already ran for the
    # current input; reuse that outcome.
    if self.validation_state != ValidationState.UNKNOWN:
        return self.validation_state == ValidationState.VALID

    if self.validator:
        try:
            self.validator.validate(self.document)
        except ValidationError as error:
            if set_cursor:
                # Clamp the reported position into the text.
                non_negative = max(0, error.cursor_position)
                self.cursor_position = min(non_negative, len(self.text))

            self.validation_state = ValidationState.INVALID
            self.validation_error = error
            return False

    # No validator, or validation passed.
    self.validation_state = ValidationState.VALID
    self.validation_error = None
    return True
async def _validate_async(self) -> None:
    """
    Asynchronous counterpart of `validate()`.
    Unlike `validate()`, this never sets the cursor position.

    Both variants exist because handling the ENTER key has to be completely
    synchronous (otherwise type-ahead gives very weird results: people
    could type input while ENTER is still processed), while
    `validate_while_typing` needs an asynchronous version.
    """
    while True:
        # Stop once the validator has run for the current input.
        if self.validation_state != ValidationState.UNKNOWN:
            return

        failure = None
        validated_document = self.document

        if self.validator:
            try:
                await self.validator.validate_async(self.document)
            except ValidationError as e:
                failure = e

        # The document changed while validating: start over.
        if self.document != validated_document:
            continue

        self.validation_state = (
            ValidationState.INVALID if failure else ValidationState.VALID
        )
        self.validation_error = failure
        get_app().invalidate()  # Trigger redraw (display error).
def append_to_history(self) -> None:
    """
    Append the current input to the history.
    (Empty input, and input equal to the last history entry, is skipped.)
    """
    if not self.text:
        return

    history_strings = self.history.get_strings()
    already_last = bool(len(history_strings)) and history_strings[-1] == self.text
    if not already_last:
        self.history.append_string(self.text)
def _search(
    self,
    search_state: SearchState,
    include_current_position: bool = False,
    count: int = 1,
) -> tuple[int, int] | None:
    """
    Run a search without modifying the buffer.

    :param search_state: Supplies the text, direction and case sensitivity.
    :param include_current_position: Allow a match at the cursor position
        when searching forward in the current document.
    :param count: Number of successive matches to advance (must be > 0).
    :returns: `(working_index, cursor_position)` for the match, or `None`
        when the text cannot be found.
    """
    assert count > 0

    text = search_state.text
    direction = search_state.direction
    ignore_case = search_state.ignore_case()

    def search_forward(
        working_index: int, document: Document
    ) -> tuple[int, Document] | None:
        "Search forward once: current document first, then later lines."
        offset = document.find(
            text,
            include_current_position=include_current_position,
            ignore_case=ignore_case,
        )
        if offset is not None:
            return (
                working_index,
                Document(document.text, document.cursor_position + offset),
            )

        # No match: continue with the following working lines. The range
        # runs one past the end so that it wraps around to the first line.
        # (Every cursor position counts here: it's a different line.)
        for raw_index in range(working_index + 1, len(self._working_lines) + 1):
            i = raw_index % len(self._working_lines)

            candidate = Document(self._working_lines[i], 0)
            offset = candidate.find(
                text, include_current_position=True, ignore_case=ignore_case
            )
            if offset is not None:
                return (i, Document(candidate.text, offset))
        return None

    def search_backward(
        working_index: int, document: Document
    ) -> tuple[int, Document] | None:
        "Search backward once: current document first, then earlier lines."
        offset = document.find_backwards(text, ignore_case=ignore_case)
        if offset is not None:
            return (
                working_index,
                Document(document.text, document.cursor_position + offset),
            )

        # No match: continue with the preceding working lines. The range
        # runs down to -1 so that it wraps around to the last line.
        for raw_index in range(working_index - 1, -2, -1):
            i = raw_index % len(self._working_lines)

            candidate = Document(
                self._working_lines[i], len(self._working_lines[i])
            )
            offset = candidate.find_backwards(text, ignore_case=ignore_case)
            if offset is not None:
                return (
                    i,
                    Document(candidate.text, len(candidate.text) + offset),
                )
        return None

    if direction == SearchDirection.FORWARD:
        search_once = search_forward
    else:
        search_once = search_backward

    # Chain `count` searches, each starting from the previous result.
    working_index = self.working_index
    document = self.document
    for _ in range(count):
        step = search_once(working_index, document)
        if step is None:
            return None  # Nothing found.
        working_index, document = step

    return (working_index, document.cursor_position)
def document_for_search(self, search_state: SearchState) -> Document:
    """
    Return the :class:`~prompt_toolkit.document.Document` (text and cursor
    position) that would result from applying this search. Used by
    :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
    searching.

    :returns: The current document when nothing is found.
    """
    found = self._search(search_state, include_current_position=True)
    if found is None:
        return self.document

    working_index, cursor_position = found

    # The selection is only kept when the match is in the current line.
    selection = self.selection_state if working_index == self.working_index else None

    return Document(
        self._working_lines[working_index], cursor_position, selection=selection
    )
def get_search_position(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> int:
    """
    Get the cursor position for this search.
    (The `working_index` is left unchanged and the index of the match is
    ignored: Vi text objects can't span multiple items.)

    :returns: The matching cursor position, or the current cursor position
        when nothing is found.
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return self.cursor_position

    _working_index, cursor_position = found
    return cursor_position
def apply_search(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> None:
    """
    Run the search and, when something is found, move `working_index` and
    `cursor_position` to the match.
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return

    self.working_index, self.cursor_position = found
def exit_selection(self) -> None:
    """
    Clear the selection state.
    """
    self.selection_state = None
def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Simple (file) tempfile implementation.

    Creates a temporary file with `tempfile_suffix` as suffix and writes the
    current text to it, UTF-8 encoded.

    :returns: `(filename, cleanup_func)`; calling `cleanup_func` deletes
        the file.
    :raises OSError: When the file can't be created or written. The
        temporary file is removed again in that case.
    """
    suffix = to_str(self.tempfile_suffix)
    descriptor, filename = tempfile.mkstemp(suffix)

    try:
        # Wrapping the descriptor in a file object guarantees that it is
        # closed, also when writing fails, and that all bytes are written
        # (a bare `os.write` may write only part of the data).
        with os.fdopen(descriptor, "wb") as fh:
            fh.write(self.text.encode("utf-8"))
    except BaseException:
        # Don't leave a stray temp file behind; nobody will get the cleanup
        # function in this case.
        os.unlink(filename)
        raise

    def cleanup() -> None:
        os.unlink(filename)

    return filename, cleanup
def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Complex (directory) tempfile implementation.

    `self.tempfile` is interpreted as a `head/tail` path. A fresh temporary
    directory is created, `head` is recreated inside of it, and the current
    text is written (UTF-8) to a file named `tail` in there. Falls back to
    `_editor_simple_tempfile` when `self.tempfile` is empty.

    :returns: `(filename, cleanup_func)`; calling `cleanup_func` removes
        the whole temporary directory tree.
    :raises OSError: When the directories or the file can't be created. The
        temporary directory is removed again in that case.
    """
    headtail = to_str(self.tempfile)
    if not headtail:
        # Revert to simple case.
        return self._editor_simple_tempfile()
    headtail = str(headtail)

    head, tail = os.path.split(headtail)

    # Make `head` relative, so that joining it below can't escape the
    # temporary directory: drop a drive prefix and *all* leading separators
    # (`os.path.join` discards everything before an absolute component).
    head = os.path.splitdrive(head)[1].lstrip(os.sep + (os.altsep or ""))

    root = tempfile.mkdtemp()
    try:
        dirpath = os.path.join(root, head) if head else root
        # `root` exists already when there is no `head`.
        os.makedirs(dirpath, exist_ok=True)

        # Open the filename and write current text.
        filename = os.path.join(dirpath, tail)
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(self.text)
    except BaseException:
        # Nobody will get the cleanup function, so clean up right away.
        shutil.rmtree(root, ignore_errors=True)
        raise

    def cleanup() -> None:
        # Remove from the root, not only the innermost directory.
        shutil.rmtree(root)

    return filename, cleanup
def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
    """
    Edit the buffer text in an external editor.

    The text is written to a temporary file, the editor is started on that
    file (in a thread executor) and, when it reports success, the file's
    content becomes the new buffer text.

    :param validate_and_handle: When true, call `validate_and_handle()`
        after the edited text has been read back.
    :returns: The background task that runs the editor.
    :raises EditReadOnlyBuffer: When the buffer is read-only.
    """
    if self.read_only():
        raise EditReadOnlyBuffer()

    # Write current text to temporary file
    make_tempfile = (
        self._editor_complex_tempfile if self.tempfile else self._editor_simple_tempfile
    )
    filename, cleanup_func = make_tempfile()

    async def run() -> None:
        try:
            # `run_in_terminal` is needed, because not all editors go to
            # the alternate screen buffer, and some could influence the
            # cursor position.
            edited = await run_in_terminal(
                lambda: self._open_file_in_editor(filename), in_executor=True
            )

            if edited:
                with open(filename, "rb") as f:
                    content = f.read().decode("utf-8")

                    # Editors are supposed to add a trailing newline, but
                    # we don't need it.
                    text = content[:-1] if content.endswith("\n") else content

                    self.document = Document(text=text, cursor_position=len(text))

                # Accept the input.
                if validate_and_handle:
                    self.validate_and_handle()

        finally:
            # Clean up temp dir/file.
            cleanup_func()

    return get_app().create_background_task(run())
1619 def _open_file_in_editor(self, filename: str) -> bool:
1620 """
1621 Call editor executable.
1623 Return True when we received a zero return code.
1624 """
1625 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that.
1626 # Otherwise, fall back to the first available editor that we can find.
1627 visual = os.environ.get("VISUAL")
1628 editor = os.environ.get("EDITOR")
1630 editors = [
1631 visual,
1632 editor,
1633 # Order of preference.
1634 "/usr/bin/editor",
1635 "/usr/bin/nano",
1636 "/usr/bin/pico",
1637 "/usr/bin/vi",
1638 "/usr/bin/emacs",
1639 ]
1641 for e in editors:
1642 if e:
1643 try:
1644 # Use 'shlex.split()', because $VISUAL can contain spaces
1645 # and quotes.
1646 returncode = subprocess.call(shlex.split(e) + [filename])
1647 return returncode == 0
1649 except OSError:
1650 # Executable does not exist, try the next one.
1651 pass
1653 return False
def start_completion(
    self,
    select_first: bool = False,
    select_last: bool = False,
    insert_common_part: bool = False,
    complete_event: CompleteEvent | None = None,
) -> None:
    """
    Start asynchronous autocompletion of this buffer.
    (This will do nothing if a previous completion was still in progress.)

    :param select_first: Select the first completion once loaded.
    :param select_last: Select the last completion once loaded.
    :param insert_common_part: Insert the part that all completions have in
        common.
    :param complete_event: Event passed to the completer; defaults to a
        `CompleteEvent` with `completion_requested=True`.
    """
    # At most one of the three options may be set.
    assert select_first + select_last + insert_common_part <= 1

    app = get_app()
    event = complete_event or CompleteEvent(completion_requested=True)
    app.create_background_task(
        self._async_completer(
            select_first=select_first,
            select_last=select_last,
            insert_common_part=insert_common_part,
            complete_event=event,
        )
    )
def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that performs asynchronous autocompletion.

    (The returned function consumes the completer's asynchronous generator,
    which possibly runs the completion algorithm in another thread.)
    """

    def completion_does_nothing(document: Document, completion: Completion) -> bool:
        """
        Return `True` if applying this completion doesn't have any effect.
        (When the text it replaces equals the text it inserts.)
        """
        before_cursor = document.text_before_cursor
        replace_from = len(before_cursor) + completion.start_position
        return before_cursor[replace_from:] == completion.text

    @_only_one_at_a_time
    async def async_completer(
        select_first: bool = False,
        select_last: bool = False,
        insert_common_part: bool = False,
        complete_event: CompleteEvent | None = None,
    ) -> None:
        document = self.document
        complete_event = complete_event or CompleteEvent(text_inserted=True)

        # Nothing to do when completions exist already, or without a
        # completer.
        if self.complete_state or not self.completer:
            return

        # Start with an empty CompletionState.
        complete_state = CompletionState(original_document=self.document)
        self.complete_state = complete_state

        def proceed() -> bool:
            """True as long as our completion state is still the active
            one, i.e. the input did not change while loading."""
            return self.complete_state == complete_state

        refresh_needed = asyncio.Event()

        async def refresh_while_loading() -> None:
            """Background loop that fires `on_completions_changed` while
            completions are loading, waiting 0.3 seconds between refreshes.
            Firing the event for every single completion is too expensive
            when there are many of them. (Tuning
            `Application.max_render_postpone_time` and
            `Application.min_redraw_interval` would be an alternative, but
            having this here is a better approach.)
            """
            while True:
                self.on_completions_changed.fire()
                refresh_needed.clear()
                await asyncio.sleep(0.3)
                await refresh_needed.wait()

        refresh_task = asyncio.ensure_future(refresh_while_loading())
        try:
            # Load.
            async with aclosing(
                self.completer.get_completions_async(document, complete_event)
            ) as async_generator:
                async for completion in async_generator:
                    complete_state.completions.append(completion)
                    refresh_needed.set()

                    # Abort when the input text changed.
                    if not proceed():
                        break
        finally:
            refresh_task.cancel()

            # One last refresh, now that loading has ended.
            self.on_completions_changed.fire()

        completions = complete_state.completions

        # A single completion that has nothing to add is ignored.
        if len(completions) == 1 and completion_does_nothing(
            document, completions[0]
        ):
            del completions[:]

        if not proceed():
            # The text changed while loading. If the last operation was an
            # insert (not a delete), restart the completion coroutine.
            if self.document.text_before_cursor == document.text_before_cursor:
                return  # Nothing changed.

            if self.document.text_before_cursor.startswith(
                document.text_before_cursor
            ):
                raise _Retry
            return

        # When the state is gone, or the user already selected a completion
        # with the arrow keys, don't do anything.
        if (
            not self.complete_state
            or self.complete_state.complete_index is not None
        ):
            return

        # Without completions, reset the completion state anyway.
        if not completions:
            self.complete_state = None
            # Render the UI in case the completion menu was shown; needed
            # especially when the only completion was removed above.
            self.on_completions_changed.fire()
            return

        # Select first/last or insert the common part, depending on the key
        # binding. (This has to wait until all completions are loaded.)
        if select_first:
            self.go_to_completion(0)

        elif select_last:
            self.go_to_completion(len(completions) - 1)

        elif insert_common_part:
            common_part = get_common_complete_suffix(document, completions)
            if common_part:
                # Insert the common part, update completions.
                self.insert_text(common_part)
                if len(completions) > 1:
                    # (Don't call `async_completer` again, but
                    # recalculate completions. See:
                    # https://github.com/ipython/ipython/issues/9658)
                    completions[:] = [
                        c.new_completion_from_position(len(common_part))
                        for c in completions
                    ]

                    self._set_completions(completions=completions)
                else:
                    self.complete_state = None
            elif len(completions) == 1:
                # Asked to insert the "common" part, but there is none,
                # while there is still exactly one match: select it. (This
                # can be a completion which does * expansion, like '*.py',
                # with exactly one match.)
                self.go_to_completion(0)

    return async_completer
def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that performs asynchronous auto
    suggestion.
    (This can be in another thread.)
    """

    @_only_one_at_a_time
    async def async_suggestor() -> None:
        document = self.document

        # Nothing to do when a suggestion exists already, or when auto
        # suggestion is not configured.
        if self.suggestion or not self.auto_suggest:
            return

        suggestion = await self.auto_suggest.get_suggestion_async(self, document)

        # The text changed in the meantime: start over.
        if not self.document == document:
            raise _Retry

        # Set suggestion and redraw interface.
        self.suggestion = suggestion
        self.on_suggestion_set.fire()

    return async_suggestor
def _create_auto_validate_coroutine(
    self,
) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that validates the buffer asynchronously
    while typing.
    (This can be in another thread.)
    """

    @_only_one_at_a_time
    async def async_validator() -> None:
        # All the work is delegated to `_validate_async`.
        await self._validate_async()

    return async_validator
def validate_and_handle(self) -> None:
    """
    Validate the buffer and, when valid, run the accept action: call the
    accept handler (if any), append the input to the history, and reset the
    buffer unless the handler asked to keep the text.
    """
    if not self.validate(set_cursor=True):
        return

    # Without a handler, the text is never kept.
    keep_text = self.accept_handler(self) if self.accept_handler else False

    self.append_to_history()

    if not keep_text:
        self.reset()
1894_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]])
1897def _only_one_at_a_time(coroutine: _T) -> _T:
1898 """
1899 Decorator that only starts the coroutine only if the previous call has
1900 finished. (Used to make sure that we have only one autocompleter, auto
1901 suggestor and validator running at a time.)
1903 When the coroutine raises `_Retry`, it is restarted.
1904 """
1905 running = False
1907 @wraps(coroutine)
1908 async def new_coroutine(*a: Any, **kw: Any) -> Any:
1909 nonlocal running
1911 # Don't start a new function, if the previous is still in progress.
1912 if running:
1913 return
1915 running = True
1917 try:
1918 while True:
1919 try:
1920 await coroutine(*a, **kw)
1921 except _Retry:
1922 continue
1923 else:
1924 return None
1925 finally:
1926 running = False
1928 return cast(_T, new_coroutine)
1931class _Retry(Exception):
1932 "Retry in `_only_one_at_a_time`."
def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Indent lines of a :class:`.Buffer` object.

    :param from_row: First row to indent.
    :param to_row: Row to stop at (not indented itself).
    :param count: Number of indentation units to add.
    """
    cursor_row = buffer.document.cursor_position_row
    cursor_col = buffer.document.cursor_position_col

    # NOTE(review): literal reproduced as extracted; runs of spaces may have
    # been collapsed by the extraction -- confirm the indentation unit.
    prefix = " " * count

    def add_prefix(line: str) -> str:
        return prefix + line

    # Apply transformation, placing the cursor at the start of its row.
    new_text = buffer.transform_lines(range(from_row, to_row), add_prefix)
    row_start = Document(new_text).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(new_text, row_start)

    # Place cursor in the same position in text after indenting
    buffer.cursor_position += cursor_col + len(prefix)
def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Unindent lines of a :class:`.Buffer` object.

    :param from_row: First row to unindent.
    :param to_row: Row to stop at (not unindented itself).
    :param count: Number of indentation units to remove.
    """
    cursor_row = buffer.document.cursor_position_row
    cursor_col = buffer.document.cursor_position_col

    # NOTE(review): literal reproduced as extracted; runs of spaces may have
    # been collapsed by the extraction -- confirm the indentation unit.
    prefix = " " * count

    def transform(text: str) -> str:
        # Remove the exact prefix when present; otherwise strip whatever
        # leading whitespace there is.
        return text[len(prefix) :] if text.startswith(prefix) else text.lstrip()

    # Apply transformation, placing the cursor at the start of its row.
    new_text = buffer.transform_lines(range(from_row, to_row), transform)
    row_start = Document(new_text).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(new_text, row_start)

    # Place cursor in the same position in text after dedent
    buffer.cursor_position += cursor_col - len(prefix)
def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
    """
    Re-wrap the lines `from_row` up to and including `to_row`, taking the
    width into account. (Vi 'gq' operator.)

    The indentation of the first line is applied to every resulting line,
    and the cursor ends up right after the reshaped text. Does nothing when
    the row range contains no lines.
    """
    lines = buffer.text.splitlines(True)
    lines_before = lines[:from_row]
    lines_to_reformat = lines[from_row : to_row + 1]
    lines_after = lines[to_row + 1 :]

    if not lines_to_reformat:
        return

    # Take indentation from the first line.
    first_line = lines_to_reformat[0]
    match = re.search(r"^\s*", first_line)
    length = match.end() if match else 0  # `match` can't be None, actually.
    margin = first_line[:length].replace("\n", "")

    # All the 'words' of the lines to be reshaped.
    words = "".join(lines_to_reformat).split()

    # Available width per line, excluding the margin.
    width = (buffer.text_width or 80) - len(margin)

    pieces = [margin]
    line_width = 0
    for word in words:
        if line_width:
            if len(word) + line_width + 1 > width:
                # The word doesn't fit anymore: start a new line.
                pieces.extend(["\n", margin])
                line_width = 0
            else:
                pieces.append(" ")
                line_width += 1

        pieces.append(word)
        line_width += len(word)

    if pieces[-1] != "\n":
        pieces.append("\n")

    # Apply result.
    text_up_to_cursor = "".join(lines_before + pieces)
    buffer.document = Document(
        text=text_up_to_cursor + "".join(lines_after),
        cursor_position=len(text_up_to_cursor),
    )