1"""
2Data structures for the Buffer.
3It holds the text, cursor position, history, etc...
4"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10import os
11import re
12import shlex
13import shutil
14import subprocess
15import tempfile
16from collections import deque
17from enum import Enum
18from functools import wraps
19from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast
20
21from .application.current import get_app
22from .application.run_in_terminal import run_in_terminal
23from .auto_suggest import AutoSuggest, Suggestion
24from .cache import FastDictCache
25from .clipboard import ClipboardData
26from .completion import (
27 CompleteEvent,
28 Completer,
29 Completion,
30 DummyCompleter,
31 get_common_complete_suffix,
32)
33from .document import Document
34from .eventloop import aclosing
35from .filters import FilterOrBool, to_filter
36from .history import History, InMemoryHistory
37from .search import SearchDirection, SearchState
38from .selection import PasteMode, SelectionState, SelectionType
39from .utils import Event, to_str
40from .validation import ValidationError, Validator
41
42__all__ = [
43 "EditReadOnlyBuffer",
44 "Buffer",
45 "CompletionState",
46 "indent",
47 "unindent",
48 "reshape_text",
49]
50
51logger = logging.getLogger(__name__)
52
53
54class EditReadOnlyBuffer(Exception):
55 "Attempt editing of read-only :class:`.Buffer`."
56
57
58class ValidationState(Enum):
59 "The validation state of a buffer. This is set after the validation."
60
61 VALID = "VALID"
62 INVALID = "INVALID"
63 UNKNOWN = "UNKNOWN"
64
65
66class CompletionState:
67 """
68 Immutable class that contains a completion state.
69 """
70
71 def __init__(
72 self,
73 original_document: Document,
74 completions: list[Completion] | None = None,
75 complete_index: int | None = None,
76 ) -> None:
77 #: Document as it was when the completion started.
78 self.original_document = original_document
79
80 #: List of all the current Completion instances which are possible at
81 #: this point.
82 self.completions = completions or []
83
84 #: Position in the `completions` array.
85 #: This can be `None` to indicate "no completion", the original text.
86 self.complete_index = complete_index # Position in the `_completions` array.
87
88 def __repr__(self) -> str:
89 return f"{self.__class__.__name__}({self.original_document!r}, <{len(self.completions)!r}> completions, index={self.complete_index!r})"
90
91 def go_to_index(self, index: int | None) -> None:
92 """
93 Create a new :class:`.CompletionState` object with the new index.
94
95 When `index` is `None` deselect the completion.
96 """
97 if self.completions:
98 assert index is None or 0 <= index < len(self.completions)
99 self.complete_index = index
100
101 def new_text_and_position(self) -> tuple[str, int]:
102 """
103 Return (new_text, new_cursor_position) for this completion.
104 """
105 if self.complete_index is None:
106 return self.original_document.text, self.original_document.cursor_position
107 else:
108 original_text_before_cursor = self.original_document.text_before_cursor
109 original_text_after_cursor = self.original_document.text_after_cursor
110
111 c = self.completions[self.complete_index]
112 if c.start_position == 0:
113 before = original_text_before_cursor
114 else:
115 before = original_text_before_cursor[: c.start_position]
116
117 new_text = before + c.text + original_text_after_cursor
118 new_cursor_position = len(before) + len(c.text)
119 return new_text, new_cursor_position
120
121 @property
122 def current_completion(self) -> Completion | None:
123 """
124 Return the current completion, or return `None` when no completion is
125 selected.
126 """
127 if self.complete_index is not None:
128 return self.completions[self.complete_index]
129 return None
130
131
132_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")
133
134
135class YankNthArgState:
136 """
137 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history.
138 """
139
140 def __init__(
141 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
142 ) -> None:
143 self.history_position = history_position
144 self.previous_inserted_word = previous_inserted_word
145 self.n = n
146
147 def __repr__(self) -> str:
148 return f"{self.__class__.__name__}(history_position={self.history_position!r}, n={self.n!r}, previous_inserted_word={self.previous_inserted_word!r})"
149
150
151BufferEventHandler = Callable[["Buffer"], None]
152BufferAcceptHandler = Callable[["Buffer"], bool]
153
154
155class Buffer:
156 """
157 The core data structure that holds the text and cursor position of the
158 current input line and implements all text manipulations on top of it. It
159 also implements the history, undo stack and the completion state.
160
161 :param completer: :class:`~prompt_toolkit.completion.Completer` instance.
162 :param history: :class:`~prompt_toolkit.history.History` instance.
163 :param tempfile_suffix: The tempfile suffix (extension) to be used for the
164 "open in editor" function. For a Python REPL, this would be ".py", so
165 that the editor knows the syntax highlighting to use. This can also be
166 a callable that returns a string.
167 :param tempfile: For more advanced tempfile situations where you need
168 control over the subdirectories and filename. For a Git Commit Message,
169 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax
170 highlighting to use. This can also be a callable that returns a string.
171 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly
172 useful for key bindings where we sometimes prefer to refer to a buffer
173 by their name instead of by reference.
174 :param accept_handler: Called when the buffer input is accepted. (Usually
175 when the user presses `enter`.) The accept handler receives this
176 `Buffer` as input and should return True when the buffer text should be
177 kept instead of calling reset.
178
179 In case of a `PromptSession` for instance, we want to keep the text,
180 because we will exit the application, and only reset it during the next
181 run.
182 :param max_number_of_completions: Never display more than this number of
183 completions, even when the completer can produce more (limited by
184 default to 10k for performance).
185
186 Events:
187
188 :param on_text_changed: When the buffer text changes. (Callable or None.)
189 :param on_text_insert: When new text is inserted. (Callable or None.)
190 :param on_cursor_position_changed: When the cursor moves. (Callable or None.)
191 :param on_completions_changed: When the completions were changed. (Callable or None.)
192 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.)
193
194 Filters:
195
196 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter`
197 or `bool`. Decide whether or not to do asynchronous autocompleting while
198 typing.
199 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter`
200 or `bool`. Decide whether or not to do asynchronous validation while
201 typing.
202 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or
203 `bool` to indicate when up-arrow partial string matching is enabled. It
204 is advised to not enable this at the same time as
205 `complete_while_typing`, because when there is an autocompletion found,
206 the up arrows usually browse through the completions, rather than
207 through the history.
208 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True,
209 changes will not be allowed.
210 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When
211 not set, pressing `Enter` will call the `accept_handler`. Otherwise,
212 pressing `Esc-Enter` is required.
213 """
214
215 def __init__(
216 self,
217 completer: Completer | None = None,
218 auto_suggest: AutoSuggest | None = None,
219 history: History | None = None,
220 validator: Validator | None = None,
221 tempfile_suffix: str | Callable[[], str] = "",
222 tempfile: str | Callable[[], str] = "",
223 name: str = "",
224 complete_while_typing: FilterOrBool = False,
225 validate_while_typing: FilterOrBool = False,
226 enable_history_search: FilterOrBool = False,
227 document: Document | None = None,
228 accept_handler: BufferAcceptHandler | None = None,
229 read_only: FilterOrBool = False,
230 multiline: FilterOrBool = True,
231 max_number_of_completions: int = 10000,
232 on_text_changed: BufferEventHandler | None = None,
233 on_text_insert: BufferEventHandler | None = None,
234 on_cursor_position_changed: BufferEventHandler | None = None,
235 on_completions_changed: BufferEventHandler | None = None,
236 on_suggestion_set: BufferEventHandler | None = None,
237 ) -> None:
238 # Accept both filters and booleans as input.
239 enable_history_search = to_filter(enable_history_search)
240 complete_while_typing = to_filter(complete_while_typing)
241 validate_while_typing = to_filter(validate_while_typing)
242 read_only = to_filter(read_only)
243 multiline = to_filter(multiline)
244
245 self.completer = completer or DummyCompleter()
246 self.auto_suggest = auto_suggest
247 self.validator = validator
248 self.tempfile_suffix = tempfile_suffix
249 self.tempfile = tempfile
250 self.name = name
251 self.accept_handler = accept_handler
252
253 # Filters. (Usually, used by the key bindings to drive the buffer.)
254 self.complete_while_typing = complete_while_typing
255 self.validate_while_typing = validate_while_typing
256 self.enable_history_search = enable_history_search
257 self.read_only = read_only
258 self.multiline = multiline
259 self.max_number_of_completions = max_number_of_completions
260
261 # Text width. (For wrapping, used by the Vi 'gq' operator.)
262 self.text_width = 0
263
264 #: The command buffer history.
265 # Note that we shouldn't use a lazy 'or' here. bool(history) could be
266 # False when empty.
267 self.history = InMemoryHistory() if history is None else history
268
269 self.__cursor_position = 0
270
271 # Events
272 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
273 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
274 self.on_cursor_position_changed: Event[Buffer] = Event(
275 self, on_cursor_position_changed
276 )
277 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
278 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)
279
280 # Document cache. (Avoid creating new Document instances.)
281 self._document_cache: FastDictCache[
282 tuple[str, int, SelectionState | None], Document
283 ] = FastDictCache(Document, size=10)
284
285 # Create completer / auto suggestion / validation coroutines.
286 self._async_suggester = self._create_auto_suggest_coroutine()
287 self._async_completer = self._create_completer_coroutine()
288 self._async_validator = self._create_auto_validate_coroutine()
289
290 # Asyncio task for populating the history.
291 self._load_history_task: asyncio.Future[None] | None = None
292
293 # Reset other attributes.
294 self.reset(document=document)
295
296 def __repr__(self) -> str:
297 if len(self.text) < 15:
298 text = self.text
299 else:
300 text = self.text[:12] + "..."
301
302 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>"
303
304 def reset(
305 self, document: Document | None = None, append_to_history: bool = False
306 ) -> None:
307 """
308 :param append_to_history: Append current input to history first.
309 """
310 if append_to_history:
311 self.append_to_history()
312
313 document = document or Document()
314
315 self.__cursor_position = document.cursor_position
316
317 # `ValidationError` instance. (Will be set when the input is wrong.)
318 self.validation_error: ValidationError | None = None
319 self.validation_state: ValidationState | None = ValidationState.UNKNOWN
320
321 # State of the selection.
322 self.selection_state: SelectionState | None = None
323
324 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
325 # we can insert text on multiple lines at once. This is implemented by
326 # using multiple cursors.)
327 self.multiple_cursor_positions: list[int] = []
328
329 # When doing consecutive up/down movements, prefer to stay at this column.
330 self.preferred_column: int | None = None
331
332 # State of complete browser
333 # For interactive completion through Ctrl-N/Ctrl-P.
334 self.complete_state: CompletionState | None = None
335
336 # State of Emacs yank-nth-arg completion.
337 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg.
338
339 # Remember the document that we had *right before* the last paste
340 # operation. This is used for rotating through the kill ring.
341 self.document_before_paste: Document | None = None
342
343 # Current suggestion.
344 self.suggestion: Suggestion | None = None
345
346 # The history search text. (Used for filtering the history when we
347 # browse through it.)
348 self.history_search_text: str | None = None
349
350 # Undo/redo stacks (stack of `(text, cursor_position)`).
351 self._undo_stack: list[tuple[str, int]] = []
352 self._redo_stack: list[tuple[str, int]] = []
353
354 # Cancel history loader. If history loading was still ongoing.
355 # Cancel the `_load_history_task`, so that next repaint of the
356 # `BufferControl` we will repopulate it.
357 if self._load_history_task is not None:
358 self._load_history_task.cancel()
359 self._load_history_task = None
360
361 #: The working lines. Similar to history, except that this can be
362 #: modified. The user can press arrow_up and edit previous entries.
363 #: Ctrl-C should reset this, and copy the whole history back in here.
364 #: Enter should process the current command and append to the real
365 #: history.
366 self._working_lines: deque[str] = deque([document.text])
367 self.__working_index = 0
368
369 def load_history_if_not_yet_loaded(self) -> None:
370 """
371 Create task for populating the buffer history (if not yet done).
372
373 Note::
374
375 This needs to be called from within the event loop of the
376 application, because history loading is async, and we need to be
377 sure the right event loop is active. Therefor, we call this method
378 in the `BufferControl.create_content`.
379
380 There are situations where prompt_toolkit applications are created
381 in one thread, but will later run in a different thread (Ptpython
382 is one example. The REPL runs in a separate thread, in order to
383 prevent interfering with a potential different event loop in the
384 main thread. The REPL UI however is still created in the main
385 thread.) We could decide to not support creating prompt_toolkit
386 objects in one thread and running the application in a different
387 thread, but history loading is the only place where it matters, and
388 this solves it.
389 """
390 if self._load_history_task is None:
391
392 async def load_history() -> None:
393 async for item in self.history.load():
394 self._working_lines.appendleft(item)
395 self.__working_index += 1
396
397 self._load_history_task = get_app().create_background_task(load_history())
398
399 def load_history_done(f: asyncio.Future[None]) -> None:
400 """
401 Handle `load_history` result when either done, cancelled, or
402 when an exception was raised.
403 """
404 try:
405 f.result()
406 except asyncio.CancelledError:
407 # Ignore cancellation. But handle it, so that we don't get
408 # this traceback.
409 pass
410 except GeneratorExit:
411 # Probably not needed, but we had situations where
412 # `GeneratorExit` was raised in `load_history` during
413 # cancellation.
414 pass
415 except BaseException:
416 # Log error if something goes wrong. (We don't have a
417 # caller to which we can propagate this exception.)
418 logger.exception("Loading history failed")
419
420 self._load_history_task.add_done_callback(load_history_done)
421
422 # <getters/setters>
423
424 def _set_text(self, value: str) -> bool:
425 """set text at current working_index. Return whether it changed."""
426 working_index = self.working_index
427 working_lines = self._working_lines
428
429 original_value = working_lines[working_index]
430 working_lines[working_index] = value
431
432 # Return True when this text has been changed.
433 if len(value) != len(original_value):
434 # For Python 2, it seems that when two strings have a different
435 # length and one is a prefix of the other, Python still scans
436 # character by character to see whether the strings are different.
437 # (Some benchmarking showed significant differences for big
438 # documents. >100,000 of lines.)
439 return True
440 elif value != original_value:
441 return True
442 return False
443
444 def _set_cursor_position(self, value: int) -> bool:
445 """Set cursor position. Return whether it changed."""
446 original_position = self.__cursor_position
447 self.__cursor_position = max(0, value)
448
449 return self.__cursor_position != original_position
450
451 @property
452 def text(self) -> str:
453 return self._working_lines[self.working_index]
454
455 @text.setter
456 def text(self, value: str) -> None:
457 """
458 Setting text. (When doing this, make sure that the cursor_position is
459 valid for this text. text/cursor_position should be consistent at any time,
460 otherwise set a Document instead.)
461 """
462 # Ensure cursor position remains within the size of the text.
463 if self.cursor_position > len(value):
464 self.cursor_position = len(value)
465
466 # Don't allow editing of read-only buffers.
467 if self.read_only():
468 raise EditReadOnlyBuffer()
469
470 changed = self._set_text(value)
471
472 if changed:
473 self._text_changed()
474
475 # Reset history search text.
476 # (Note that this doesn't need to happen when working_index
477 # changes, which is when we traverse the history. That's why we
478 # don't do this in `self._text_changed`.)
479 self.history_search_text = None
480
481 @property
482 def cursor_position(self) -> int:
483 return self.__cursor_position
484
485 @cursor_position.setter
486 def cursor_position(self, value: int) -> None:
487 """
488 Setting cursor position.
489 """
490 assert isinstance(value, int)
491
492 # Ensure cursor position is within the size of the text.
493 if value > len(self.text):
494 value = len(self.text)
495 if value < 0:
496 value = 0
497
498 changed = self._set_cursor_position(value)
499
500 if changed:
501 self._cursor_position_changed()
502
503 @property
504 def working_index(self) -> int:
505 return self.__working_index
506
507 @working_index.setter
508 def working_index(self, value: int) -> None:
509 if self.__working_index != value:
510 self.__working_index = value
511 # Make sure to reset the cursor position, otherwise we end up in
512 # situations where the cursor position is out of the bounds of the
513 # text.
514 self.cursor_position = 0
515 self._text_changed()
516
517 def _text_changed(self) -> None:
518 # Remove any validation errors and complete state.
519 self.validation_error = None
520 self.validation_state = ValidationState.UNKNOWN
521 self.complete_state = None
522 self.yank_nth_arg_state = None
523 self.document_before_paste = None
524 self.selection_state = None
525 self.suggestion = None
526 self.preferred_column = None
527
528 # fire 'on_text_changed' event.
529 self.on_text_changed.fire()
530
531 # Input validation.
532 # (This happens on all change events, unlike auto completion, also when
533 # deleting text.)
534 if self.validator and self.validate_while_typing():
535 get_app().create_background_task(self._async_validator())
536
537 def _cursor_position_changed(self) -> None:
538 # Remove any complete state.
539 # (Input validation should only be undone when the cursor position
540 # changes.)
541 self.complete_state = None
542 self.yank_nth_arg_state = None
543 self.document_before_paste = None
544
545 # Unset preferred_column. (Will be set after the cursor movement, if
546 # required.)
547 self.preferred_column = None
548
549 # Note that the cursor position can change if we have a selection the
550 # new position of the cursor determines the end of the selection.
551
552 # fire 'on_cursor_position_changed' event.
553 self.on_cursor_position_changed.fire()
554
555 @property
556 def document(self) -> Document:
557 """
558 Return :class:`~prompt_toolkit.document.Document` instance from the
559 current text, cursor position and selection state.
560 """
561 return self._document_cache[
562 self.text, self.cursor_position, self.selection_state
563 ]
564
565 @document.setter
566 def document(self, value: Document) -> None:
567 """
568 Set :class:`~prompt_toolkit.document.Document` instance.
569
570 This will set both the text and cursor position at the same time, but
571 atomically. (Change events will be triggered only after both have been set.)
572 """
573 self.set_document(value)
574
575 def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
576 """
577 Set :class:`~prompt_toolkit.document.Document` instance. Like the
578 ``document`` property, but accept an ``bypass_readonly`` argument.
579
580 :param bypass_readonly: When True, don't raise an
581 :class:`.EditReadOnlyBuffer` exception, even
582 when the buffer is read-only.
583
584 .. warning::
585
586 When this buffer is read-only and `bypass_readonly` was not passed,
587 the `EditReadOnlyBuffer` exception will be caught by the
588 `KeyProcessor` and is silently suppressed. This is important to
589 keep in mind when writing key bindings, because it won't do what
590 you expect, and there won't be a stack trace. Use try/finally
591 around this function if you need some cleanup code.
592 """
593 # Don't allow editing of read-only buffers.
594 if not bypass_readonly and self.read_only():
595 raise EditReadOnlyBuffer()
596
597 # Set text and cursor position first.
598 text_changed = self._set_text(value.text)
599 cursor_position_changed = self._set_cursor_position(value.cursor_position)
600
601 # Now handle change events. (We do this when text/cursor position is
602 # both set and consistent.)
603 if text_changed:
604 self._text_changed()
605 self.history_search_text = None
606
607 if cursor_position_changed:
608 self._cursor_position_changed()
609
610 @property
611 def is_returnable(self) -> bool:
612 """
613 True when there is something handling accept.
614 """
615 return bool(self.accept_handler)
616
617 # End of <getters/setters>
618
619 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
620 """
621 Safe current state (input text and cursor position), so that we can
622 restore it by calling undo.
623 """
624 # Safe if the text is different from the text at the top of the stack
625 # is different. If the text is the same, just update the cursor position.
626 if self._undo_stack and self._undo_stack[-1][0] == self.text:
627 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position)
628 else:
629 self._undo_stack.append((self.text, self.cursor_position))
630
631 # Saving anything to the undo stack, clears the redo stack.
632 if clear_redo_stack:
633 self._redo_stack = []
634
635 def transform_lines(
636 self,
637 line_index_iterator: Iterable[int],
638 transform_callback: Callable[[str], str],
639 ) -> str:
640 """
641 Transforms the text on a range of lines.
642 When the iterator yield an index not in the range of lines that the
643 document contains, it skips them silently.
644
645 To uppercase some lines::
646
647 new_text = transform_lines(range(5,10), lambda text: text.upper())
648
649 :param line_index_iterator: Iterator of line numbers (int)
650 :param transform_callback: callable that takes the original text of a
651 line, and return the new text for this line.
652
653 :returns: The new text.
654 """
655 # Split lines
656 lines = self.text.split("\n")
657
658 # Apply transformation
659 for index in line_index_iterator:
660 try:
661 lines[index] = transform_callback(lines[index])
662 except IndexError:
663 pass
664
665 return "\n".join(lines)
666
667 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
668 """
669 Apply the given transformation function to the current line.
670
671 :param transform_callback: callable that takes a string and return a new string.
672 """
673 document = self.document
674 a = document.cursor_position + document.get_start_of_line_position()
675 b = document.cursor_position + document.get_end_of_line_position()
676 self.text = (
677 document.text[:a]
678 + transform_callback(document.text[a:b])
679 + document.text[b:]
680 )
681
682 def transform_region(
683 self, from_: int, to: int, transform_callback: Callable[[str], str]
684 ) -> None:
685 """
686 Transform a part of the input string.
687
688 :param from_: (int) start position.
689 :param to: (int) end position.
690 :param transform_callback: Callable which accepts a string and returns
691 the transformed string.
692 """
693 assert from_ < to
694
695 self.text = "".join(
696 [
697 self.text[:from_]
698 + transform_callback(self.text[from_:to])
699 + self.text[to:]
700 ]
701 )
702
703 def cursor_left(self, count: int = 1) -> None:
704 self.cursor_position += self.document.get_cursor_left_position(count=count)
705
706 def cursor_right(self, count: int = 1) -> None:
707 self.cursor_position += self.document.get_cursor_right_position(count=count)
708
709 def cursor_up(self, count: int = 1) -> None:
710 """(for multiline edit). Move cursor to the previous line."""
711 original_column = self.preferred_column or self.document.cursor_position_col
712 self.cursor_position += self.document.get_cursor_up_position(
713 count=count, preferred_column=original_column
714 )
715
716 # Remember the original column for the next up/down movement.
717 self.preferred_column = original_column
718
719 def cursor_down(self, count: int = 1) -> None:
720 """(for multiline edit). Move cursor to the next line."""
721 original_column = self.preferred_column or self.document.cursor_position_col
722 self.cursor_position += self.document.get_cursor_down_position(
723 count=count, preferred_column=original_column
724 )
725
726 # Remember the original column for the next up/down movement.
727 self.preferred_column = original_column
728
729 def auto_up(
730 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
731 ) -> None:
732 """
733 If we're not on the first line (of a multiline input) go a line up,
734 otherwise go back in history. (If nothing is selected.)
735 """
736 if self.complete_state:
737 self.complete_previous(count=count)
738 elif self.document.cursor_position_row > 0:
739 self.cursor_up(count=count)
740 elif not self.selection_state:
741 self.history_backward(count=count)
742
743 # Go to the start of the line?
744 if go_to_start_of_line_if_history_changes:
745 self.cursor_position += self.document.get_start_of_line_position()
746
747 def auto_down(
748 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
749 ) -> None:
750 """
751 If we're not on the last line (of a multiline input) go a line down,
752 otherwise go forward in history. (If nothing is selected.)
753 """
754 if self.complete_state:
755 self.complete_next(count=count)
756 elif self.document.cursor_position_row < self.document.line_count - 1:
757 self.cursor_down(count=count)
758 elif not self.selection_state:
759 self.history_forward(count=count)
760
761 # Go to the start of the line?
762 if go_to_start_of_line_if_history_changes:
763 self.cursor_position += self.document.get_start_of_line_position()
764
765 def delete_before_cursor(self, count: int = 1) -> str:
766 """
767 Delete specified number of characters before cursor and return the
768 deleted text.
769 """
770 assert count >= 0
771 deleted = ""
772
773 if self.cursor_position > 0:
774 deleted = self.text[self.cursor_position - count : self.cursor_position]
775
776 new_text = (
777 self.text[: self.cursor_position - count]
778 + self.text[self.cursor_position :]
779 )
780 new_cursor_position = self.cursor_position - len(deleted)
781
782 # Set new Document atomically.
783 self.document = Document(new_text, new_cursor_position)
784
785 return deleted
786
787 def delete(self, count: int = 1) -> str:
788 """
789 Delete specified number of characters and Return the deleted text.
790 """
791 if self.cursor_position < len(self.text):
792 deleted = self.document.text_after_cursor[:count]
793 self.text = (
794 self.text[: self.cursor_position]
795 + self.text[self.cursor_position + len(deleted) :]
796 )
797 return deleted
798 else:
799 return ""
800
801 def join_next_line(self, separator: str = " ") -> None:
802 """
803 Join the next line to the current one by deleting the line ending after
804 the current line.
805 """
806 if not self.document.on_last_line:
807 self.cursor_position += self.document.get_end_of_line_position()
808 self.delete()
809
810 # Remove spaces.
811 self.text = (
812 self.document.text_before_cursor
813 + separator
814 + self.document.text_after_cursor.lstrip(" ")
815 )
816
817 def join_selected_lines(self, separator: str = " ") -> None:
818 """
819 Join the selected lines.
820 """
821 assert self.selection_state
822
823 # Get lines.
824 from_, to = sorted(
825 [self.cursor_position, self.selection_state.original_cursor_position]
826 )
827
828 before = self.text[:from_]
829 lines = self.text[from_:to].splitlines()
830 after = self.text[to:]
831
832 # Replace leading spaces with just one space.
833 lines = [l.lstrip(" ") + separator for l in lines]
834
835 # Set new document.
836 self.document = Document(
837 text=before + "".join(lines) + after,
838 cursor_position=len(before + "".join(lines[:-1])) - 1,
839 )
840
841 def swap_characters_before_cursor(self) -> None:
842 """
843 Swap the last two characters before the cursor.
844 """
845 pos = self.cursor_position
846
847 if pos >= 2:
848 a = self.text[pos - 2]
849 b = self.text[pos - 1]
850
851 self.text = self.text[: pos - 2] + b + a + self.text[pos:]
852
853 def go_to_history(self, index: int) -> None:
854 """
855 Go to this item in the history.
856 """
857 if index < len(self._working_lines):
858 self.working_index = index
859 self.cursor_position = len(self.text)
860
861 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
862 """
863 Browse to the next completions.
864 (Does nothing if there are no completion.)
865 """
866 index: int | None
867
868 if self.complete_state:
869 completions_count = len(self.complete_state.completions)
870
871 if self.complete_state.complete_index is None:
872 index = 0
873 elif self.complete_state.complete_index == completions_count - 1:
874 index = None
875
876 if disable_wrap_around:
877 return
878 else:
879 index = min(
880 completions_count - 1, self.complete_state.complete_index + count
881 )
882 self.go_to_completion(index)
883
884 def complete_previous(
885 self, count: int = 1, disable_wrap_around: bool = False
886 ) -> None:
887 """
888 Browse to the previous completions.
889 (Does nothing if there are no completion.)
890 """
891 index: int | None
892
893 if self.complete_state:
894 if self.complete_state.complete_index == 0:
895 index = None
896
897 if disable_wrap_around:
898 return
899 elif self.complete_state.complete_index is None:
900 index = len(self.complete_state.completions) - 1
901 else:
902 index = max(0, self.complete_state.complete_index - count)
903
904 self.go_to_completion(index)
905
906 def cancel_completion(self) -> None:
907 """
908 Cancel completion, go back to the original text.
909 """
910 if self.complete_state:
911 self.go_to_completion(None)
912 self.complete_state = None
913
914 def _set_completions(self, completions: list[Completion]) -> CompletionState:
915 """
916 Start completions. (Generate list of completions and initialize.)
917
918 By default, no completion will be selected.
919 """
920 self.complete_state = CompletionState(
921 original_document=self.document, completions=completions
922 )
923
924 # Trigger event. This should eventually invalidate the layout.
925 self.on_completions_changed.fire()
926
927 return self.complete_state
928
929 def start_history_lines_completion(self) -> None:
930 """
931 Start a completion based on all the other lines in the document and the
932 history.
933 """
934 found_completions: set[str] = set()
935 completions = []
936
937 # For every line of the whole history, find matches with the current line.
938 current_line = self.document.current_line_before_cursor.lstrip()
939
940 for i, string in enumerate(self._working_lines):
941 for j, l in enumerate(string.split("\n")):
942 l = l.strip()
943 if l and l.startswith(current_line):
944 # When a new line has been found.
945 if l not in found_completions:
946 found_completions.add(l)
947
948 # Create completion.
949 if i == self.working_index:
950 display_meta = "Current, line %s" % (j + 1)
951 else:
952 display_meta = f"History {i + 1}, line {j + 1}"
953
954 completions.append(
955 Completion(
956 text=l,
957 start_position=-len(current_line),
958 display_meta=display_meta,
959 )
960 )
961
962 self._set_completions(completions=completions[::-1])
963 self.go_to_completion(0)
964
965 def go_to_completion(self, index: int | None) -> None:
966 """
967 Select a completion from the list of current completions.
968 """
969 assert self.complete_state
970
971 # Set new completion
972 state = self.complete_state
973 state.go_to_index(index)
974
975 # Set text/cursor position
976 new_text, new_cursor_position = state.new_text_and_position()
977 self.document = Document(new_text, new_cursor_position)
978
979 # (changing text/cursor position will unset complete_state.)
980 self.complete_state = state
981
982 def apply_completion(self, completion: Completion) -> None:
983 """
984 Insert a given completion.
985 """
986 # If there was already a completion active, cancel that one.
987 if self.complete_state:
988 self.go_to_completion(None)
989 self.complete_state = None
990
991 # Insert text from the given completion.
992 self.delete_before_cursor(-completion.start_position)
993 self.insert_text(completion.text)
994
995 def _set_history_search(self) -> None:
996 """
997 Set `history_search_text`.
998 (The text before the cursor will be used for filtering the history.)
999 """
1000 if self.enable_history_search():
1001 if self.history_search_text is None:
1002 self.history_search_text = self.document.text_before_cursor
1003 else:
1004 self.history_search_text = None
1005
1006 def _history_matches(self, i: int) -> bool:
1007 """
1008 True when the current entry matches the history search.
1009 (when we don't have history search, it's also True.)
1010 """
1011 return self.history_search_text is None or self._working_lines[i].startswith(
1012 self.history_search_text
1013 )
1014
1015 def history_forward(self, count: int = 1) -> None:
1016 """
1017 Move forwards through the history.
1018
1019 :param count: Amount of items to move forward.
1020 """
1021 self._set_history_search()
1022
1023 # Go forward in history.
1024 found_something = False
1025
1026 for i in range(self.working_index + 1, len(self._working_lines)):
1027 if self._history_matches(i):
1028 self.working_index = i
1029 count -= 1
1030 found_something = True
1031 if count == 0:
1032 break
1033
1034 # If we found an entry, move cursor to the end of the first line.
1035 if found_something:
1036 self.cursor_position = 0
1037 self.cursor_position += self.document.get_end_of_line_position()
1038
1039 def history_backward(self, count: int = 1) -> None:
1040 """
1041 Move backwards through history.
1042 """
1043 self._set_history_search()
1044
1045 # Go back in history.
1046 found_something = False
1047
1048 for i in range(self.working_index - 1, -1, -1):
1049 if self._history_matches(i):
1050 self.working_index = i
1051 count -= 1
1052 found_something = True
1053 if count == 0:
1054 break
1055
1056 # If we move to another entry, move cursor to the end of the line.
1057 if found_something:
1058 self.cursor_position = len(self.text)
1059
1060 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
1061 """
1062 Pick nth word from previous history entry (depending on current
1063 `yank_nth_arg_state`) and insert it at current position. Rotate through
1064 history if called repeatedly. If no `n` has been given, take the first
1065 argument. (The second word.)
1066
1067 :param n: (None or int), The index of the word from the previous line
1068 to take.
1069 """
1070 assert n is None or isinstance(n, int)
1071 history_strings = self.history.get_strings()
1072
1073 if not len(history_strings):
1074 return
1075
1076 # Make sure we have a `YankNthArgState`.
1077 if self.yank_nth_arg_state is None:
1078 state = YankNthArgState(n=-1 if _yank_last_arg else 1)
1079 else:
1080 state = self.yank_nth_arg_state
1081
1082 if n is not None:
1083 state.n = n
1084
1085 # Get new history position.
1086 new_pos = state.history_position - 1
1087 if -new_pos > len(history_strings):
1088 new_pos = -1
1089
1090 # Take argument from line.
1091 line = history_strings[new_pos]
1092
1093 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)]
1094 words = [w for w in words if w]
1095 try:
1096 word = words[state.n]
1097 except IndexError:
1098 word = ""
1099
1100 # Insert new argument.
1101 if state.previous_inserted_word:
1102 self.delete_before_cursor(len(state.previous_inserted_word))
1103 self.insert_text(word)
1104
1105 # Save state again for next completion. (Note that the 'insert'
1106 # operation from above clears `self.yank_nth_arg_state`.)
1107 state.previous_inserted_word = word
1108 state.history_position = new_pos
1109 self.yank_nth_arg_state = state
1110
1111 def yank_last_arg(self, n: int | None = None) -> None:
1112 """
1113 Like `yank_nth_arg`, but if no argument has been given, yank the last
1114 word by default.
1115 """
1116 self.yank_nth_arg(n=n, _yank_last_arg=True)
1117
1118 def start_selection(
1119 self, selection_type: SelectionType = SelectionType.CHARACTERS
1120 ) -> None:
1121 """
1122 Take the current cursor position as the start of this selection.
1123 """
1124 self.selection_state = SelectionState(self.cursor_position, selection_type)
1125
1126 def copy_selection(self, _cut: bool = False) -> ClipboardData:
1127 """
1128 Copy selected text and return :class:`.ClipboardData` instance.
1129
1130 Notice that this doesn't store the copied data on the clipboard yet.
1131 You can store it like this:
1132
1133 .. code:: python
1134
1135 data = buffer.copy_selection()
1136 get_app().clipboard.set_data(data)
1137 """
1138 new_document, clipboard_data = self.document.cut_selection()
1139 if _cut:
1140 self.document = new_document
1141
1142 self.selection_state = None
1143 return clipboard_data
1144
1145 def cut_selection(self) -> ClipboardData:
1146 """
1147 Delete selected text and return :class:`.ClipboardData` instance.
1148 """
1149 return self.copy_selection(_cut=True)
1150
1151 def paste_clipboard_data(
1152 self,
1153 data: ClipboardData,
1154 paste_mode: PasteMode = PasteMode.EMACS,
1155 count: int = 1,
1156 ) -> None:
1157 """
1158 Insert the data from the clipboard.
1159 """
1160 assert isinstance(data, ClipboardData)
1161 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)
1162
1163 original_document = self.document
1164 self.document = self.document.paste_clipboard_data(
1165 data, paste_mode=paste_mode, count=count
1166 )
1167
1168 # Remember original document. This assignment should come at the end,
1169 # because assigning to 'document' will erase it.
1170 self.document_before_paste = original_document
1171
1172 def newline(self, copy_margin: bool = True) -> None:
1173 """
1174 Insert a line ending at the current position.
1175 """
1176 if copy_margin:
1177 self.insert_text("\n" + self.document.leading_whitespace_in_current_line)
1178 else:
1179 self.insert_text("\n")
1180
1181 def insert_line_above(self, copy_margin: bool = True) -> None:
1182 """
1183 Insert a new line above the current one.
1184 """
1185 if copy_margin:
1186 insert = self.document.leading_whitespace_in_current_line + "\n"
1187 else:
1188 insert = "\n"
1189
1190 self.cursor_position += self.document.get_start_of_line_position()
1191 self.insert_text(insert)
1192 self.cursor_position -= 1
1193
1194 def insert_line_below(self, copy_margin: bool = True) -> None:
1195 """
1196 Insert a new line below the current one.
1197 """
1198 if copy_margin:
1199 insert = "\n" + self.document.leading_whitespace_in_current_line
1200 else:
1201 insert = "\n"
1202
1203 self.cursor_position += self.document.get_end_of_line_position()
1204 self.insert_text(insert)
1205
1206 def insert_text(
1207 self,
1208 data: str,
1209 overwrite: bool = False,
1210 move_cursor: bool = True,
1211 fire_event: bool = True,
1212 ) -> None:
1213 """
1214 Insert characters at cursor position.
1215
1216 :param fire_event: Fire `on_text_insert` event. This is mainly used to
1217 trigger autocompletion while typing.
1218 """
1219 # Original text & cursor position.
1220 otext = self.text
1221 ocpos = self.cursor_position
1222
1223 # In insert/text mode.
1224 if overwrite:
1225 # Don't overwrite the newline itself. Just before the line ending,
1226 # it should act like insert mode.
1227 overwritten_text = otext[ocpos : ocpos + len(data)]
1228 if "\n" in overwritten_text:
1229 overwritten_text = overwritten_text[: overwritten_text.find("\n")]
1230
1231 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :]
1232 else:
1233 text = otext[:ocpos] + data + otext[ocpos:]
1234
1235 if move_cursor:
1236 cpos = self.cursor_position + len(data)
1237 else:
1238 cpos = self.cursor_position
1239
1240 # Set new document.
1241 # (Set text and cursor position at the same time. Otherwise, setting
1242 # the text will fire a change event before the cursor position has been
1243 # set. It works better to have this atomic.)
1244 self.document = Document(text, cpos)
1245
1246 # Fire 'on_text_insert' event.
1247 if fire_event: # XXX: rename to `start_complete`.
1248 self.on_text_insert.fire()
1249
1250 # Only complete when "complete_while_typing" is enabled.
1251 if self.completer and self.complete_while_typing():
1252 get_app().create_background_task(self._async_completer())
1253
1254 # Call auto_suggest.
1255 if self.auto_suggest:
1256 get_app().create_background_task(self._async_suggester())
1257
1258 def undo(self) -> None:
1259 # Pop from the undo-stack until we find a text that if different from
1260 # the current text. (The current logic of `save_to_undo_stack` will
1261 # cause that the top of the undo stack is usually the same as the
1262 # current text, so in that case we have to pop twice.)
1263 while self._undo_stack:
1264 text, pos = self._undo_stack.pop()
1265
1266 if text != self.text:
1267 # Push current text to redo stack.
1268 self._redo_stack.append((self.text, self.cursor_position))
1269
1270 # Set new text/cursor_position.
1271 self.document = Document(text, cursor_position=pos)
1272 break
1273
1274 def redo(self) -> None:
1275 if self._redo_stack:
1276 # Copy current state on undo stack.
1277 self.save_to_undo_stack(clear_redo_stack=False)
1278
1279 # Pop state from redo stack.
1280 text, pos = self._redo_stack.pop()
1281 self.document = Document(text, cursor_position=pos)
1282
1283 def validate(self, set_cursor: bool = False) -> bool:
1284 """
1285 Returns `True` if valid.
1286
1287 :param set_cursor: Set the cursor position, if an error was found.
1288 """
1289 # Don't call the validator again, if it was already called for the
1290 # current input.
1291 if self.validation_state != ValidationState.UNKNOWN:
1292 return self.validation_state == ValidationState.VALID
1293
1294 # Call validator.
1295 if self.validator:
1296 try:
1297 self.validator.validate(self.document)
1298 except ValidationError as e:
1299 # Set cursor position (don't allow invalid values.)
1300 if set_cursor:
1301 self.cursor_position = min(
1302 max(0, e.cursor_position), len(self.text)
1303 )
1304
1305 self.validation_state = ValidationState.INVALID
1306 self.validation_error = e
1307 return False
1308
1309 # Handle validation result.
1310 self.validation_state = ValidationState.VALID
1311 self.validation_error = None
1312 return True
1313
1314 async def _validate_async(self) -> None:
1315 """
1316 Asynchronous version of `validate()`.
1317 This one doesn't set the cursor position.
1318
1319 We have both variants, because a synchronous version is required.
1320 Handling the ENTER key needs to be completely synchronous, otherwise
1321 stuff like type-ahead is going to give very weird results. (People
1322 could type input while the ENTER key is still processed.)
1323
1324 An asynchronous version is required if we have `validate_while_typing`
1325 enabled.
1326 """
1327 while True:
1328 # Don't call the validator again, if it was already called for the
1329 # current input.
1330 if self.validation_state != ValidationState.UNKNOWN:
1331 return
1332
1333 # Call validator.
1334 error = None
1335 document = self.document
1336
1337 if self.validator:
1338 try:
1339 await self.validator.validate_async(self.document)
1340 except ValidationError as e:
1341 error = e
1342
1343 # If the document changed during the validation, try again.
1344 if self.document != document:
1345 continue
1346
1347 # Handle validation result.
1348 if error:
1349 self.validation_state = ValidationState.INVALID
1350 else:
1351 self.validation_state = ValidationState.VALID
1352
1353 self.validation_error = error
1354 get_app().invalidate() # Trigger redraw (display error).
1355
1356 def append_to_history(self) -> None:
1357 """
1358 Append the current input to the history.
1359 """
1360 # Save at the tail of the history. (But don't if the last entry the
1361 # history is already the same.)
1362 if self.text:
1363 history_strings = self.history.get_strings()
1364 if not len(history_strings) or history_strings[-1] != self.text:
1365 self.history.append_string(self.text)
1366
1367 def _search(
1368 self,
1369 search_state: SearchState,
1370 include_current_position: bool = False,
1371 count: int = 1,
1372 ) -> tuple[int, int] | None:
1373 """
1374 Execute search. Return (working_index, cursor_position) tuple when this
1375 search is applied. Returns `None` when this text cannot be found.
1376 """
1377 assert count > 0
1378
1379 text = search_state.text
1380 direction = search_state.direction
1381 ignore_case = search_state.ignore_case()
1382
1383 def search_once(
1384 working_index: int, document: Document
1385 ) -> tuple[int, Document] | None:
1386 """
1387 Do search one time.
1388 Return (working_index, document) or `None`
1389 """
1390 if direction == SearchDirection.FORWARD:
1391 # Try find at the current input.
1392 new_index = document.find(
1393 text,
1394 include_current_position=include_current_position,
1395 ignore_case=ignore_case,
1396 )
1397
1398 if new_index is not None:
1399 return (
1400 working_index,
1401 Document(document.text, document.cursor_position + new_index),
1402 )
1403 else:
1404 # No match, go forward in the history. (Include len+1 to wrap around.)
1405 # (Here we should always include all cursor positions, because
1406 # it's a different line.)
1407 for i in range(working_index + 1, len(self._working_lines) + 1):
1408 i %= len(self._working_lines)
1409
1410 document = Document(self._working_lines[i], 0)
1411 new_index = document.find(
1412 text, include_current_position=True, ignore_case=ignore_case
1413 )
1414 if new_index is not None:
1415 return (i, Document(document.text, new_index))
1416 else:
1417 # Try find at the current input.
1418 new_index = document.find_backwards(text, ignore_case=ignore_case)
1419
1420 if new_index is not None:
1421 return (
1422 working_index,
1423 Document(document.text, document.cursor_position + new_index),
1424 )
1425 else:
1426 # No match, go back in the history. (Include -1 to wrap around.)
1427 for i in range(working_index - 1, -2, -1):
1428 i %= len(self._working_lines)
1429
1430 document = Document(
1431 self._working_lines[i], len(self._working_lines[i])
1432 )
1433 new_index = document.find_backwards(
1434 text, ignore_case=ignore_case
1435 )
1436 if new_index is not None:
1437 return (
1438 i,
1439 Document(document.text, len(document.text) + new_index),
1440 )
1441 return None
1442
1443 # Do 'count' search iterations.
1444 working_index = self.working_index
1445 document = self.document
1446 for _ in range(count):
1447 result = search_once(working_index, document)
1448 if result is None:
1449 return None # Nothing found.
1450 else:
1451 working_index, document = result
1452
1453 return (working_index, document.cursor_position)
1454
1455 def document_for_search(self, search_state: SearchState) -> Document:
1456 """
1457 Return a :class:`~prompt_toolkit.document.Document` instance that has
1458 the text/cursor position for this search, if we would apply it. This
1459 will be used in the
1460 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
1461 searching.
1462 """
1463 search_result = self._search(search_state, include_current_position=True)
1464
1465 if search_result is None:
1466 return self.document
1467 else:
1468 working_index, cursor_position = search_result
1469
1470 # Keep selection, when `working_index` was not changed.
1471 if working_index == self.working_index:
1472 selection = self.selection_state
1473 else:
1474 selection = None
1475
1476 return Document(
1477 self._working_lines[working_index], cursor_position, selection=selection
1478 )
1479
1480 def get_search_position(
1481 self,
1482 search_state: SearchState,
1483 include_current_position: bool = True,
1484 count: int = 1,
1485 ) -> int:
1486 """
1487 Get the cursor position for this search.
1488 (This operation won't change the `working_index`. It's won't go through
1489 the history. Vi text objects can't span multiple items.)
1490 """
1491 search_result = self._search(
1492 search_state, include_current_position=include_current_position, count=count
1493 )
1494
1495 if search_result is None:
1496 return self.cursor_position
1497 else:
1498 working_index, cursor_position = search_result
1499 return cursor_position
1500
1501 def apply_search(
1502 self,
1503 search_state: SearchState,
1504 include_current_position: bool = True,
1505 count: int = 1,
1506 ) -> None:
1507 """
1508 Apply search. If something is found, set `working_index` and
1509 `cursor_position`.
1510 """
1511 search_result = self._search(
1512 search_state, include_current_position=include_current_position, count=count
1513 )
1514
1515 if search_result is not None:
1516 working_index, cursor_position = search_result
1517 self.working_index = working_index
1518 self.cursor_position = cursor_position
1519
1520 def exit_selection(self) -> None:
1521 self.selection_state = None
1522
1523 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
1524 """
1525 Simple (file) tempfile implementation.
1526 Return (tempfile, cleanup_func).
1527 """
1528 suffix = to_str(self.tempfile_suffix)
1529 descriptor, filename = tempfile.mkstemp(suffix)
1530
1531 os.write(descriptor, self.text.encode("utf-8"))
1532 os.close(descriptor)
1533
1534 def cleanup() -> None:
1535 os.unlink(filename)
1536
1537 return filename, cleanup
1538
1539 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
1540 # Complex (directory) tempfile implementation.
1541 headtail = to_str(self.tempfile)
1542 if not headtail:
1543 # Revert to simple case.
1544 return self._editor_simple_tempfile()
1545 headtail = str(headtail)
1546
1547 # Try to make according to tempfile logic.
1548 head, tail = os.path.split(headtail)
1549 if os.path.isabs(head):
1550 head = head[1:]
1551
1552 dirpath = tempfile.mkdtemp()
1553 if head:
1554 dirpath = os.path.join(dirpath, head)
1555 # Assume there is no issue creating dirs in this temp dir.
1556 os.makedirs(dirpath)
1557
1558 # Open the filename and write current text.
1559 filename = os.path.join(dirpath, tail)
1560 with open(filename, "w", encoding="utf-8") as fh:
1561 fh.write(self.text)
1562
1563 def cleanup() -> None:
1564 shutil.rmtree(dirpath)
1565
1566 return filename, cleanup
1567
1568 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
1569 """
1570 Open code in editor.
1571
1572 This returns a future, and runs in a thread executor.
1573 """
1574 if self.read_only():
1575 raise EditReadOnlyBuffer()
1576
1577 # Write current text to temporary file
1578 if self.tempfile:
1579 filename, cleanup_func = self._editor_complex_tempfile()
1580 else:
1581 filename, cleanup_func = self._editor_simple_tempfile()
1582
1583 async def run() -> None:
1584 try:
1585 # Open in editor
1586 # (We need to use `run_in_terminal`, because not all editors go to
1587 # the alternate screen buffer, and some could influence the cursor
1588 # position.)
1589 success = await run_in_terminal(
1590 lambda: self._open_file_in_editor(filename), in_executor=True
1591 )
1592
1593 # Read content again.
1594 if success:
1595 with open(filename, "rb") as f:
1596 text = f.read().decode("utf-8")
1597
1598 # Drop trailing newline. (Editors are supposed to add it at the
1599 # end, but we don't need it.)
1600 if text.endswith("\n"):
1601 text = text[:-1]
1602
1603 self.document = Document(text=text, cursor_position=len(text))
1604
1605 # Accept the input.
1606 if validate_and_handle:
1607 self.validate_and_handle()
1608
1609 finally:
1610 # Clean up temp dir/file.
1611 cleanup_func()
1612
1613 return get_app().create_background_task(run())
1614
1615 def _open_file_in_editor(self, filename: str) -> bool:
1616 """
1617 Call editor executable.
1618
1619 Return True when we received a zero return code.
1620 """
1621 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that.
1622 # Otherwise, fall back to the first available editor that we can find.
1623 visual = os.environ.get("VISUAL")
1624 editor = os.environ.get("EDITOR")
1625
1626 editors = [
1627 visual,
1628 editor,
1629 # Order of preference.
1630 "/usr/bin/editor",
1631 "/usr/bin/nano",
1632 "/usr/bin/pico",
1633 "/usr/bin/vi",
1634 "/usr/bin/emacs",
1635 ]
1636
1637 for e in editors:
1638 if e:
1639 try:
1640 # Use 'shlex.split()', because $VISUAL can contain spaces
1641 # and quotes.
1642 returncode = subprocess.call(shlex.split(e) + [filename])
1643 return returncode == 0
1644
1645 except OSError:
1646 # Executable does not exist, try the next one.
1647 pass
1648
1649 return False
1650
1651 def start_completion(
1652 self,
1653 select_first: bool = False,
1654 select_last: bool = False,
1655 insert_common_part: bool = False,
1656 complete_event: CompleteEvent | None = None,
1657 ) -> None:
1658 """
1659 Start asynchronous autocompletion of this buffer.
1660 (This will do nothing if a previous completion was still in progress.)
1661 """
1662 # Only one of these options can be selected.
1663 assert select_first + select_last + insert_common_part <= 1
1664
1665 get_app().create_background_task(
1666 self._async_completer(
1667 select_first=select_first,
1668 select_last=select_last,
1669 insert_common_part=insert_common_part,
1670 complete_event=complete_event
1671 or CompleteEvent(completion_requested=True),
1672 )
1673 )
1674
1675 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
1676 """
1677 Create function for asynchronous autocompletion.
1678
1679 (This consumes the asynchronous completer generator, which possibly
1680 runs the completion algorithm in another thread.)
1681 """
1682
1683 def completion_does_nothing(document: Document, completion: Completion) -> bool:
1684 """
1685 Return `True` if applying this completion doesn't have any effect.
1686 (When it doesn't insert any new text.
1687 """
1688 text_before_cursor = document.text_before_cursor
1689 replaced_text = text_before_cursor[
1690 len(text_before_cursor) + completion.start_position :
1691 ]
1692 return replaced_text == completion.text
1693
1694 @_only_one_at_a_time
1695 async def async_completer(
1696 select_first: bool = False,
1697 select_last: bool = False,
1698 insert_common_part: bool = False,
1699 complete_event: CompleteEvent | None = None,
1700 ) -> None:
1701 document = self.document
1702 complete_event = complete_event or CompleteEvent(text_inserted=True)
1703
1704 # Don't complete when we already have completions.
1705 if self.complete_state or not self.completer:
1706 return
1707
1708 # Create an empty CompletionState.
1709 complete_state = CompletionState(original_document=self.document)
1710 self.complete_state = complete_state
1711
1712 def proceed() -> bool:
1713 """Keep retrieving completions. Input text has not yet changed
1714 while generating completions."""
1715 return self.complete_state == complete_state
1716
1717 refresh_needed = asyncio.Event()
1718
1719 async def refresh_while_loading() -> None:
1720 """Background loop to refresh the UI at most 3 times a second
1721 while the completion are loading. Calling
1722 `on_completions_changed.fire()` for every completion that we
1723 receive is too expensive when there are many completions. (We
1724 could tune `Application.max_render_postpone_time` and
1725 `Application.min_redraw_interval`, but having this here is a
1726 better approach.)
1727 """
1728 while True:
1729 self.on_completions_changed.fire()
1730 refresh_needed.clear()
1731 await asyncio.sleep(0.3)
1732 await refresh_needed.wait()
1733
1734 refresh_task = asyncio.ensure_future(refresh_while_loading())
1735 try:
1736 # Load.
1737 async with aclosing(
1738 self.completer.get_completions_async(document, complete_event)
1739 ) as async_generator:
1740 async for completion in async_generator:
1741 complete_state.completions.append(completion)
1742 refresh_needed.set()
1743
1744 # If the input text changes, abort.
1745 if not proceed():
1746 break
1747
1748 # Always stop at 10k completions.
1749 if (
1750 len(complete_state.completions)
1751 >= self.max_number_of_completions
1752 ):
1753 break
1754 finally:
1755 refresh_task.cancel()
1756
1757 # Refresh one final time after we got everything.
1758 self.on_completions_changed.fire()
1759
1760 completions = complete_state.completions
1761
1762 # When there is only one completion, which has nothing to add, ignore it.
1763 if len(completions) == 1 and completion_does_nothing(
1764 document, completions[0]
1765 ):
1766 del completions[:]
1767
1768 # Set completions if the text was not yet changed.
1769 if proceed():
1770 # When no completions were found, or when the user selected
1771 # already a completion by using the arrow keys, don't do anything.
1772 if (
1773 not self.complete_state
1774 or self.complete_state.complete_index is not None
1775 ):
1776 return
1777
1778 # When there are no completions, reset completion state anyway.
1779 if not completions:
1780 self.complete_state = None
1781 # Render the ui if the completion menu was shown
1782 # it is needed especially if there is one completion and it was deleted.
1783 self.on_completions_changed.fire()
1784 return
1785
1786 # Select first/last or insert common part, depending on the key
1787 # binding. (For this we have to wait until all completions are
1788 # loaded.)
1789
1790 if select_first:
1791 self.go_to_completion(0)
1792
1793 elif select_last:
1794 self.go_to_completion(len(completions) - 1)
1795
1796 elif insert_common_part:
1797 common_part = get_common_complete_suffix(document, completions)
1798 if common_part:
1799 # Insert the common part, update completions.
1800 self.insert_text(common_part)
1801 if len(completions) > 1:
1802 # (Don't call `async_completer` again, but
1803 # recalculate completions. See:
1804 # https://github.com/ipython/ipython/issues/9658)
1805 completions[:] = [
1806 c.new_completion_from_position(len(common_part))
1807 for c in completions
1808 ]
1809
1810 self._set_completions(completions=completions)
1811 else:
1812 self.complete_state = None
1813 else:
1814 # When we were asked to insert the "common"
1815 # prefix, but there was no common suffix but
1816 # still exactly one match, then select the
1817 # first. (It could be that we have a completion
1818 # which does * expansion, like '*.py', with
1819 # exactly one match.)
1820 if len(completions) == 1:
1821 self.go_to_completion(0)
1822
1823 else:
1824 # If the last operation was an insert, (not a delete), restart
1825 # the completion coroutine.
1826
1827 if self.document.text_before_cursor == document.text_before_cursor:
1828 return # Nothing changed.
1829
1830 if self.document.text_before_cursor.startswith(
1831 document.text_before_cursor
1832 ):
1833 raise _Retry
1834
1835 return async_completer
1836
1837 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
1838 """
1839 Create function for asynchronous auto suggestion.
1840 (This can be in another thread.)
1841 """
1842
1843 @_only_one_at_a_time
1844 async def async_suggestor() -> None:
1845 document = self.document
1846
1847 # Don't suggest when we already have a suggestion.
1848 if self.suggestion or not self.auto_suggest:
1849 return
1850
1851 suggestion = await self.auto_suggest.get_suggestion_async(self, document)
1852
1853 # Set suggestion only if the text was not yet changed.
1854 if self.document == document:
1855 # Set suggestion and redraw interface.
1856 self.suggestion = suggestion
1857 self.on_suggestion_set.fire()
1858 else:
1859 # Otherwise, restart thread.
1860 raise _Retry
1861
1862 return async_suggestor
1863
1864 def _create_auto_validate_coroutine(
1865 self,
1866 ) -> Callable[[], Coroutine[Any, Any, None]]:
1867 """
1868 Create a function for asynchronous validation while typing.
1869 (This can be in another thread.)
1870 """
1871
1872 @_only_one_at_a_time
1873 async def async_validator() -> None:
1874 await self._validate_async()
1875
1876 return async_validator
1877
1878 def validate_and_handle(self) -> None:
1879 """
1880 Validate buffer and handle the accept action.
1881 """
1882 valid = self.validate(set_cursor=True)
1883
1884 # When the validation succeeded, accept the input.
1885 if valid:
1886 if self.accept_handler:
1887 keep_text = self.accept_handler(self)
1888 else:
1889 keep_text = False
1890
1891 self.append_to_history()
1892
1893 if not keep_text:
1894 self.reset()
1895
1896
1897_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]])
1898
1899
1900def _only_one_at_a_time(coroutine: _T) -> _T:
1901 """
1902 Decorator that only starts the coroutine only if the previous call has
1903 finished. (Used to make sure that we have only one autocompleter, auto
1904 suggestor and validator running at a time.)
1905
1906 When the coroutine raises `_Retry`, it is restarted.
1907 """
1908 running = False
1909
1910 @wraps(coroutine)
1911 async def new_coroutine(*a: Any, **kw: Any) -> Any:
1912 nonlocal running
1913
1914 # Don't start a new function, if the previous is still in progress.
1915 if running:
1916 return
1917
1918 running = True
1919
1920 try:
1921 while True:
1922 try:
1923 await coroutine(*a, **kw)
1924 except _Retry:
1925 continue
1926 else:
1927 return None
1928 finally:
1929 running = False
1930
1931 return cast(_T, new_coroutine)
1932
1933
1934class _Retry(Exception):
1935 "Retry in `_only_one_at_a_time`."
1936
1937
1938def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1939 """
1940 Indent text of a :class:`.Buffer` object.
1941 """
1942 current_row = buffer.document.cursor_position_row
1943 current_col = buffer.document.cursor_position_col
1944 line_range = range(from_row, to_row)
1945
1946 # Apply transformation.
1947 indent_content = " " * count
1948 new_text = buffer.transform_lines(line_range, lambda l: indent_content + l)
1949 buffer.document = Document(
1950 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1951 )
1952
1953 # Place cursor in the same position in text after indenting
1954 buffer.cursor_position += current_col + len(indent_content)
1955
1956
1957def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1958 """
1959 Unindent text of a :class:`.Buffer` object.
1960 """
1961 current_row = buffer.document.cursor_position_row
1962 current_col = buffer.document.cursor_position_col
1963 line_range = range(from_row, to_row)
1964
1965 indent_content = " " * count
1966
1967 def transform(text: str) -> str:
1968 remove = indent_content
1969 if text.startswith(remove):
1970 return text[len(remove) :]
1971 else:
1972 return text.lstrip()
1973
1974 # Apply transformation.
1975 new_text = buffer.transform_lines(line_range, transform)
1976 buffer.document = Document(
1977 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1978 )
1979
1980 # Place cursor in the same position in text after dedent
1981 buffer.cursor_position += current_col - len(indent_content)
1982
1983
1984def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
1985 """
1986 Reformat text, taking the width into account.
1987 `to_row` is included.
1988 (Vi 'gq' operator.)
1989 """
1990 lines = buffer.text.splitlines(True)
1991 lines_before = lines[:from_row]
1992 lines_after = lines[to_row + 1 :]
1993 lines_to_reformat = lines[from_row : to_row + 1]
1994
1995 if lines_to_reformat:
1996 # Take indentation from the first line.
1997 match = re.search(r"^\s*", lines_to_reformat[0])
1998 length = match.end() if match else 0 # `match` can't be None, actually.
1999
2000 indent = lines_to_reformat[0][:length].replace("\n", "")
2001
2002 # Now, take all the 'words' from the lines to be reshaped.
2003 words = "".join(lines_to_reformat).split()
2004
2005 # And reshape.
2006 width = (buffer.text_width or 80) - len(indent)
2007 reshaped_text = [indent]
2008 current_width = 0
2009 for w in words:
2010 if current_width:
2011 if len(w) + current_width + 1 > width:
2012 reshaped_text.append("\n")
2013 reshaped_text.append(indent)
2014 current_width = 0
2015 else:
2016 reshaped_text.append(" ")
2017 current_width += 1
2018
2019 reshaped_text.append(w)
2020 current_width += len(w)
2021
2022 if reshaped_text[-1] != "\n":
2023 reshaped_text.append("\n")
2024
2025 # Apply result.
2026 buffer.document = Document(
2027 text="".join(lines_before + reshaped_text + lines_after),
2028 cursor_position=len("".join(lines_before + reshaped_text)),
2029 )