1"""
2Data structures for the Buffer.
3It holds the text, cursor position, history, etc...
4"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10import os
11import re
12import shlex
13import shutil
14import subprocess
15import tempfile
16from collections import deque
17from collections.abc import Callable, Coroutine, Iterable
18from enum import Enum
19from functools import wraps
20from typing import Any, TypeVar, cast
21
22from .application.current import get_app
23from .application.run_in_terminal import run_in_terminal
24from .auto_suggest import AutoSuggest, Suggestion
25from .cache import FastDictCache
26from .clipboard import ClipboardData
27from .completion import (
28 CompleteEvent,
29 Completer,
30 Completion,
31 DummyCompleter,
32 get_common_complete_suffix,
33)
34from .document import Document
35from .eventloop import aclosing
36from .filters import FilterOrBool, to_filter
37from .history import History, InMemoryHistory
38from .search import SearchDirection, SearchState
39from .selection import PasteMode, SelectionState, SelectionType
40from .utils import Event, to_str
41from .validation import ValidationError, Validator
42
43__all__ = [
44 "EditReadOnlyBuffer",
45 "Buffer",
46 "CompletionState",
47 "indent",
48 "unindent",
49 "reshape_text",
50]
51
52logger = logging.getLogger(__name__)
53
54
55class EditReadOnlyBuffer(Exception):
56 "Attempt editing of read-only :class:`.Buffer`."
57
58
59class ValidationState(Enum):
60 "The validation state of a buffer. This is set after the validation."
61
62 VALID = "VALID"
63 INVALID = "INVALID"
64 UNKNOWN = "UNKNOWN"
65
66
67class CompletionState:
68 """
69 Immutable class that contains a completion state.
70 """
71
72 def __init__(
73 self,
74 original_document: Document,
75 completions: list[Completion] | None = None,
76 complete_index: int | None = None,
77 ) -> None:
78 #: Document as it was when the completion started.
79 self.original_document = original_document
80
81 #: List of all the current Completion instances which are possible at
82 #: this point.
83 self.completions = completions or []
84
85 #: Position in the `completions` array.
86 #: This can be `None` to indicate "no completion", the original text.
87 self.complete_index = complete_index # Position in the `_completions` array.
88
89 def __repr__(self) -> str:
90 return f"{self.__class__.__name__}({self.original_document!r}, <{len(self.completions)!r}> completions, index={self.complete_index!r})"
91
92 def go_to_index(self, index: int | None) -> None:
93 """
94 Create a new :class:`.CompletionState` object with the new index.
95
96 When `index` is `None` deselect the completion.
97 """
98 if self.completions:
99 assert index is None or 0 <= index < len(self.completions)
100 self.complete_index = index
101
102 def new_text_and_position(self) -> tuple[str, int]:
103 """
104 Return (new_text, new_cursor_position) for this completion.
105 """
106 if self.complete_index is None:
107 return self.original_document.text, self.original_document.cursor_position
108 else:
109 original_text_before_cursor = self.original_document.text_before_cursor
110 original_text_after_cursor = self.original_document.text_after_cursor
111
112 c = self.completions[self.complete_index]
113 if c.start_position == 0:
114 before = original_text_before_cursor
115 else:
116 before = original_text_before_cursor[: c.start_position]
117
118 new_text = before + c.text + original_text_after_cursor
119 new_cursor_position = len(before) + len(c.text)
120 return new_text, new_cursor_position
121
122 @property
123 def current_completion(self) -> Completion | None:
124 """
125 Return the current completion, or return `None` when no completion is
126 selected.
127 """
128 if self.complete_index is not None:
129 return self.completions[self.complete_index]
130 return None
131
132
133_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")
134
135
136class YankNthArgState:
137 """
138 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history.
139 """
140
141 def __init__(
142 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
143 ) -> None:
144 self.history_position = history_position
145 self.previous_inserted_word = previous_inserted_word
146 self.n = n
147
148 def __repr__(self) -> str:
149 return f"{self.__class__.__name__}(history_position={self.history_position!r}, n={self.n!r}, previous_inserted_word={self.previous_inserted_word!r})"
150
151
152BufferEventHandler = Callable[["Buffer"], None]
153BufferAcceptHandler = Callable[["Buffer"], bool]
154
155
156class Buffer:
157 """
158 The core data structure that holds the text and cursor position of the
159 current input line and implements all text manipulations on top of it. It
160 also implements the history, undo stack and the completion state.
161
162 :param completer: :class:`~prompt_toolkit.completion.Completer` instance.
163 :param history: :class:`~prompt_toolkit.history.History` instance.
164 :param tempfile_suffix: The tempfile suffix (extension) to be used for the
165 "open in editor" function. For a Python REPL, this would be ".py", so
166 that the editor knows the syntax highlighting to use. This can also be
167 a callable that returns a string.
168 :param tempfile: For more advanced tempfile situations where you need
169 control over the subdirectories and filename. For a Git Commit Message,
170 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax
171 highlighting to use. This can also be a callable that returns a string.
172 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly
173 useful for key bindings where we sometimes prefer to refer to a buffer
174 by their name instead of by reference.
175 :param accept_handler: Called when the buffer input is accepted. (Usually
176 when the user presses `enter`.) The accept handler receives this
177 `Buffer` as input and should return True when the buffer text should be
178 kept instead of calling reset.
179
180 In case of a `PromptSession` for instance, we want to keep the text,
181 because we will exit the application, and only reset it during the next
182 run.
183 :param max_number_of_completions: Never display more than this number of
184 completions, even when the completer can produce more (limited by
185 default to 10k for performance).
186
187 Events:
188
189 :param on_text_changed: When the buffer text changes. (Callable or None.)
190 :param on_text_insert: When new text is inserted. (Callable or None.)
191 :param on_cursor_position_changed: When the cursor moves. (Callable or None.)
192 :param on_completions_changed: When the completions were changed. (Callable or None.)
193 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.)
194
195 Filters:
196
197 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter`
198 or `bool`. Decide whether or not to do asynchronous autocompleting while
199 typing.
200 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter`
201 or `bool`. Decide whether or not to do asynchronous validation while
202 typing.
203 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or
204 `bool` to indicate when up-arrow partial string matching is enabled. It
205 is advised to not enable this at the same time as
206 `complete_while_typing`, because when there is an autocompletion found,
207 the up arrows usually browse through the completions, rather than
208 through the history.
209 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True,
210 changes will not be allowed.
211 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When
212 not set, pressing `Enter` will call the `accept_handler`. Otherwise,
213 pressing `Esc-Enter` is required.
214 """
215
216 def __init__(
217 self,
218 completer: Completer | None = None,
219 auto_suggest: AutoSuggest | None = None,
220 history: History | None = None,
221 validator: Validator | None = None,
222 tempfile_suffix: str | Callable[[], str] = "",
223 tempfile: str | Callable[[], str] = "",
224 name: str = "",
225 complete_while_typing: FilterOrBool = False,
226 validate_while_typing: FilterOrBool = False,
227 enable_history_search: FilterOrBool = False,
228 document: Document | None = None,
229 accept_handler: BufferAcceptHandler | None = None,
230 read_only: FilterOrBool = False,
231 multiline: FilterOrBool = True,
232 max_number_of_completions: int = 10000,
233 on_text_changed: BufferEventHandler | None = None,
234 on_text_insert: BufferEventHandler | None = None,
235 on_cursor_position_changed: BufferEventHandler | None = None,
236 on_completions_changed: BufferEventHandler | None = None,
237 on_suggestion_set: BufferEventHandler | None = None,
238 ) -> None:
239 # Accept both filters and booleans as input.
240 enable_history_search = to_filter(enable_history_search)
241 complete_while_typing = to_filter(complete_while_typing)
242 validate_while_typing = to_filter(validate_while_typing)
243 read_only = to_filter(read_only)
244 multiline = to_filter(multiline)
245
246 self.completer = completer or DummyCompleter()
247 self.auto_suggest = auto_suggest
248 self.validator = validator
249 self.tempfile_suffix = tempfile_suffix
250 self.tempfile = tempfile
251 self.name = name
252 self.accept_handler = accept_handler
253
254 # Filters. (Usually, used by the key bindings to drive the buffer.)
255 self.complete_while_typing = complete_while_typing
256 self.validate_while_typing = validate_while_typing
257 self.enable_history_search = enable_history_search
258 self.read_only = read_only
259 self.multiline = multiline
260 self.max_number_of_completions = max_number_of_completions
261
262 # Text width. (For wrapping, used by the Vi 'gq' operator.)
263 self.text_width = 0
264
265 #: The command buffer history.
266 # Note that we shouldn't use a lazy 'or' here. bool(history) could be
267 # False when empty.
268 self.history = InMemoryHistory() if history is None else history
269
270 self.__cursor_position = 0
271
272 # Events
273 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
274 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
275 self.on_cursor_position_changed: Event[Buffer] = Event(
276 self, on_cursor_position_changed
277 )
278 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
279 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)
280
281 # Document cache. (Avoid creating new Document instances.)
282 self._document_cache: FastDictCache[
283 tuple[str, int, SelectionState | None], Document
284 ] = FastDictCache(Document, size=10)
285
286 # Create completer / auto suggestion / validation coroutines.
287 self._async_suggester = self._create_auto_suggest_coroutine()
288 self._async_completer = self._create_completer_coroutine()
289 self._async_validator = self._create_auto_validate_coroutine()
290
291 # Asyncio task for populating the history.
292 self._load_history_task: asyncio.Future[None] | None = None
293
294 # Reset other attributes.
295 self.reset(document=document)
296
297 def __repr__(self) -> str:
298 if len(self.text) < 15:
299 text = self.text
300 else:
301 text = self.text[:12] + "..."
302
303 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>"
304
305 def reset(
306 self, document: Document | None = None, append_to_history: bool = False
307 ) -> None:
308 """
309 :param append_to_history: Append current input to history first.
310 """
311 if append_to_history:
312 self.append_to_history()
313
314 document = document or Document()
315
316 self.__cursor_position = document.cursor_position
317
318 # `ValidationError` instance. (Will be set when the input is wrong.)
319 self.validation_error: ValidationError | None = None
320 self.validation_state: ValidationState | None = ValidationState.UNKNOWN
321
322 # State of the selection.
323 self.selection_state: SelectionState | None = None
324
325 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
326 # we can insert text on multiple lines at once. This is implemented by
327 # using multiple cursors.)
328 self.multiple_cursor_positions: list[int] = []
329
330 # When doing consecutive up/down movements, prefer to stay at this column.
331 self.preferred_column: int | None = None
332
333 # State of complete browser
334 # For interactive completion through Ctrl-N/Ctrl-P.
335 self.complete_state: CompletionState | None = None
336
337 # State of Emacs yank-nth-arg completion.
338 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg.
339
340 # Remember the document that we had *right before* the last paste
341 # operation. This is used for rotating through the kill ring.
342 self.document_before_paste: Document | None = None
343
344 # Current suggestion.
345 self.suggestion: Suggestion | None = None
346
347 # The history search text. (Used for filtering the history when we
348 # browse through it.)
349 self.history_search_text: str | None = None
350
351 # Undo/redo stacks (stack of `(text, cursor_position)`).
352 self._undo_stack: list[tuple[str, int]] = []
353 self._redo_stack: list[tuple[str, int]] = []
354
355 # Cancel history loader. If history loading was still ongoing.
356 # Cancel the `_load_history_task`, so that next repaint of the
357 # `BufferControl` we will repopulate it.
358 if self._load_history_task is not None:
359 self._load_history_task.cancel()
360 self._load_history_task = None
361
362 #: The working lines. Similar to history, except that this can be
363 #: modified. The user can press arrow_up and edit previous entries.
364 #: Ctrl-C should reset this, and copy the whole history back in here.
365 #: Enter should process the current command and append to the real
366 #: history.
367 self._working_lines: deque[str] = deque([document.text])
368 self.__working_index = 0
369
370 def load_history_if_not_yet_loaded(self) -> None:
371 """
372 Create task for populating the buffer history (if not yet done).
373
374 Note::
375
376 This needs to be called from within the event loop of the
377 application, because history loading is async, and we need to be
378 sure the right event loop is active. Therefor, we call this method
379 in the `BufferControl.create_content`.
380
381 There are situations where prompt_toolkit applications are created
382 in one thread, but will later run in a different thread (Ptpython
383 is one example. The REPL runs in a separate thread, in order to
384 prevent interfering with a potential different event loop in the
385 main thread. The REPL UI however is still created in the main
386 thread.) We could decide to not support creating prompt_toolkit
387 objects in one thread and running the application in a different
388 thread, but history loading is the only place where it matters, and
389 this solves it.
390 """
391 if self._load_history_task is None:
392
393 async def load_history() -> None:
394 async for item in self.history.load():
395 self._working_lines.appendleft(item)
396 self.__working_index += 1
397
398 self._load_history_task = get_app().create_background_task(load_history())
399
400 def load_history_done(f: asyncio.Future[None]) -> None:
401 """
402 Handle `load_history` result when either done, cancelled, or
403 when an exception was raised.
404 """
405 try:
406 f.result()
407 except asyncio.CancelledError:
408 # Ignore cancellation. But handle it, so that we don't get
409 # this traceback.
410 pass
411 except GeneratorExit:
412 # Probably not needed, but we had situations where
413 # `GeneratorExit` was raised in `load_history` during
414 # cancellation.
415 pass
416 except BaseException:
417 # Log error if something goes wrong. (We don't have a
418 # caller to which we can propagate this exception.)
419 logger.exception("Loading history failed")
420
421 self._load_history_task.add_done_callback(load_history_done)
422
423 # <getters/setters>
424
425 def _set_text(self, value: str) -> bool:
426 """set text at current working_index. Return whether it changed."""
427 working_index = self.working_index
428 working_lines = self._working_lines
429
430 original_value = working_lines[working_index]
431 working_lines[working_index] = value
432
433 # Return True when this text has been changed.
434 if len(value) != len(original_value):
435 # For Python 2, it seems that when two strings have a different
436 # length and one is a prefix of the other, Python still scans
437 # character by character to see whether the strings are different.
438 # (Some benchmarking showed significant differences for big
439 # documents. >100,000 of lines.)
440 return True
441 elif value != original_value:
442 return True
443 return False
444
445 def _set_cursor_position(self, value: int) -> bool:
446 """Set cursor position. Return whether it changed."""
447 original_position = self.__cursor_position
448 self.__cursor_position = max(0, value)
449
450 return self.__cursor_position != original_position
451
452 @property
453 def text(self) -> str:
454 return self._working_lines[self.working_index]
455
456 @text.setter
457 def text(self, value: str) -> None:
458 """
459 Setting text. (When doing this, make sure that the cursor_position is
460 valid for this text. text/cursor_position should be consistent at any time,
461 otherwise set a Document instead.)
462 """
463 # Ensure cursor position remains within the size of the text.
464 if self.cursor_position > len(value):
465 self.cursor_position = len(value)
466
467 # Don't allow editing of read-only buffers.
468 if self.read_only():
469 raise EditReadOnlyBuffer()
470
471 changed = self._set_text(value)
472
473 if changed:
474 self._text_changed()
475
476 # Reset history search text.
477 # (Note that this doesn't need to happen when working_index
478 # changes, which is when we traverse the history. That's why we
479 # don't do this in `self._text_changed`.)
480 self.history_search_text = None
481
482 @property
483 def cursor_position(self) -> int:
484 return self.__cursor_position
485
486 @cursor_position.setter
487 def cursor_position(self, value: int) -> None:
488 """
489 Setting cursor position.
490 """
491 assert isinstance(value, int)
492
493 # Ensure cursor position is within the size of the text.
494 if value > len(self.text):
495 value = len(self.text)
496 if value < 0:
497 value = 0
498
499 changed = self._set_cursor_position(value)
500
501 if changed:
502 self._cursor_position_changed()
503
504 @property
505 def working_index(self) -> int:
506 return self.__working_index
507
508 @working_index.setter
509 def working_index(self, value: int) -> None:
510 if self.__working_index != value:
511 self.__working_index = value
512 # Make sure to reset the cursor position, otherwise we end up in
513 # situations where the cursor position is out of the bounds of the
514 # text.
515 self.cursor_position = 0
516 self._text_changed()
517
518 def _text_changed(self) -> None:
519 # Remove any validation errors and complete state.
520 self.validation_error = None
521 self.validation_state = ValidationState.UNKNOWN
522 self.complete_state = None
523 self.yank_nth_arg_state = None
524 self.document_before_paste = None
525 self.selection_state = None
526 self.suggestion = None
527 self.preferred_column = None
528
529 # fire 'on_text_changed' event.
530 self.on_text_changed.fire()
531
532 # Input validation.
533 # (This happens on all change events, unlike auto completion, also when
534 # deleting text.)
535 if self.validator and self.validate_while_typing():
536 get_app().create_background_task(self._async_validator())
537
538 def _cursor_position_changed(self) -> None:
539 # Remove any complete state.
540 # (Input validation should only be undone when the cursor position
541 # changes.)
542 self.complete_state = None
543 self.yank_nth_arg_state = None
544 self.document_before_paste = None
545
546 # Unset preferred_column. (Will be set after the cursor movement, if
547 # required.)
548 self.preferred_column = None
549
550 # Note that the cursor position can change if we have a selection the
551 # new position of the cursor determines the end of the selection.
552
553 # fire 'on_cursor_position_changed' event.
554 self.on_cursor_position_changed.fire()
555
556 @property
557 def document(self) -> Document:
558 """
559 Return :class:`~prompt_toolkit.document.Document` instance from the
560 current text, cursor position and selection state.
561 """
562 return self._document_cache[
563 self.text, self.cursor_position, self.selection_state
564 ]
565
566 @document.setter
567 def document(self, value: Document) -> None:
568 """
569 Set :class:`~prompt_toolkit.document.Document` instance.
570
571 This will set both the text and cursor position at the same time, but
572 atomically. (Change events will be triggered only after both have been set.)
573 """
574 self.set_document(value)
575
576 def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
577 """
578 Set :class:`~prompt_toolkit.document.Document` instance. Like the
579 ``document`` property, but accept an ``bypass_readonly`` argument.
580
581 :param bypass_readonly: When True, don't raise an
582 :class:`.EditReadOnlyBuffer` exception, even
583 when the buffer is read-only.
584
585 .. warning::
586
587 When this buffer is read-only and `bypass_readonly` was not passed,
588 the `EditReadOnlyBuffer` exception will be caught by the
589 `KeyProcessor` and is silently suppressed. This is important to
590 keep in mind when writing key bindings, because it won't do what
591 you expect, and there won't be a stack trace. Use try/finally
592 around this function if you need some cleanup code.
593 """
594 # Don't allow editing of read-only buffers.
595 if not bypass_readonly and self.read_only():
596 raise EditReadOnlyBuffer()
597
598 # Set text and cursor position first.
599 text_changed = self._set_text(value.text)
600 cursor_position_changed = self._set_cursor_position(value.cursor_position)
601
602 # Now handle change events. (We do this when text/cursor position is
603 # both set and consistent.)
604 if text_changed:
605 self._text_changed()
606 self.history_search_text = None
607
608 if cursor_position_changed:
609 self._cursor_position_changed()
610
611 @property
612 def is_returnable(self) -> bool:
613 """
614 True when there is something handling accept.
615 """
616 return bool(self.accept_handler)
617
618 # End of <getters/setters>
619
620 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
621 """
622 Safe current state (input text and cursor position), so that we can
623 restore it by calling undo.
624 """
625 # Safe if the text is different from the text at the top of the stack
626 # is different. If the text is the same, just update the cursor position.
627 if self._undo_stack and self._undo_stack[-1][0] == self.text:
628 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position)
629 else:
630 self._undo_stack.append((self.text, self.cursor_position))
631
632 # Saving anything to the undo stack, clears the redo stack.
633 if clear_redo_stack:
634 self._redo_stack = []
635
636 def transform_lines(
637 self,
638 line_index_iterator: Iterable[int],
639 transform_callback: Callable[[str], str],
640 ) -> str:
641 """
642 Transforms the text on a range of lines.
643 When the iterator yield an index not in the range of lines that the
644 document contains, it skips them silently.
645
646 To uppercase some lines::
647
648 new_text = transform_lines(range(5,10), lambda text: text.upper())
649
650 :param line_index_iterator: Iterator of line numbers (int)
651 :param transform_callback: callable that takes the original text of a
652 line, and return the new text for this line.
653
654 :returns: The new text.
655 """
656 # Split lines
657 lines = self.text.split("\n")
658
659 # Apply transformation
660 for index in line_index_iterator:
661 try:
662 lines[index] = transform_callback(lines[index])
663 except IndexError:
664 pass
665
666 return "\n".join(lines)
667
668 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
669 """
670 Apply the given transformation function to the current line.
671
672 :param transform_callback: callable that takes a string and return a new string.
673 """
674 document = self.document
675 a = document.cursor_position + document.get_start_of_line_position()
676 b = document.cursor_position + document.get_end_of_line_position()
677 self.text = (
678 document.text[:a]
679 + transform_callback(document.text[a:b])
680 + document.text[b:]
681 )
682
683 def transform_region(
684 self, from_: int, to: int, transform_callback: Callable[[str], str]
685 ) -> None:
686 """
687 Transform a part of the input string.
688
689 :param from_: (int) start position.
690 :param to: (int) end position.
691 :param transform_callback: Callable which accepts a string and returns
692 the transformed string.
693 """
694 assert from_ < to
695
696 self.text = "".join(
697 [
698 self.text[:from_]
699 + transform_callback(self.text[from_:to])
700 + self.text[to:]
701 ]
702 )
703
704 def cursor_left(self, count: int = 1) -> None:
705 self.cursor_position += self.document.get_cursor_left_position(count=count)
706
707 def cursor_right(self, count: int = 1) -> None:
708 self.cursor_position += self.document.get_cursor_right_position(count=count)
709
710 def cursor_up(self, count: int = 1) -> None:
711 """(for multiline edit). Move cursor to the previous line."""
712 original_column = self.preferred_column or self.document.cursor_position_col
713 self.cursor_position += self.document.get_cursor_up_position(
714 count=count, preferred_column=original_column
715 )
716
717 # Remember the original column for the next up/down movement.
718 self.preferred_column = original_column
719
720 def cursor_down(self, count: int = 1) -> None:
721 """(for multiline edit). Move cursor to the next line."""
722 original_column = self.preferred_column or self.document.cursor_position_col
723 self.cursor_position += self.document.get_cursor_down_position(
724 count=count, preferred_column=original_column
725 )
726
727 # Remember the original column for the next up/down movement.
728 self.preferred_column = original_column
729
730 def auto_up(
731 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
732 ) -> None:
733 """
734 If we're not on the first line (of a multiline input) go a line up,
735 otherwise go back in history. (If nothing is selected.)
736 """
737 if self.complete_state:
738 self.complete_previous(count=count)
739 elif self.document.cursor_position_row > 0:
740 self.cursor_up(count=count)
741 elif not self.selection_state:
742 self.history_backward(count=count)
743
744 # Go to the start of the line?
745 if go_to_start_of_line_if_history_changes:
746 self.cursor_position += self.document.get_start_of_line_position()
747
748 def auto_down(
749 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
750 ) -> None:
751 """
752 If we're not on the last line (of a multiline input) go a line down,
753 otherwise go forward in history. (If nothing is selected.)
754 """
755 if self.complete_state:
756 self.complete_next(count=count)
757 elif self.document.cursor_position_row < self.document.line_count - 1:
758 self.cursor_down(count=count)
759 elif not self.selection_state:
760 self.history_forward(count=count)
761
762 # Go to the start of the line?
763 if go_to_start_of_line_if_history_changes:
764 self.cursor_position += self.document.get_start_of_line_position()
765
766 def delete_before_cursor(self, count: int = 1) -> str:
767 """
768 Delete specified number of characters before cursor and return the
769 deleted text.
770 """
771 assert count >= 0
772 deleted = ""
773
774 if self.cursor_position > 0:
775 deleted = self.text[self.cursor_position - count : self.cursor_position]
776
777 new_text = (
778 self.text[: self.cursor_position - count]
779 + self.text[self.cursor_position :]
780 )
781 new_cursor_position = self.cursor_position - len(deleted)
782
783 # Set new Document atomically.
784 self.document = Document(new_text, new_cursor_position)
785
786 return deleted
787
788 def delete(self, count: int = 1) -> str:
789 """
790 Delete specified number of characters and Return the deleted text.
791 """
792 if self.cursor_position < len(self.text):
793 deleted = self.document.text_after_cursor[:count]
794 self.text = (
795 self.text[: self.cursor_position]
796 + self.text[self.cursor_position + len(deleted) :]
797 )
798 return deleted
799 else:
800 return ""
801
802 def join_next_line(self, separator: str = " ") -> None:
803 """
804 Join the next line to the current one by deleting the line ending after
805 the current line.
806 """
807 if not self.document.on_last_line:
808 self.cursor_position += self.document.get_end_of_line_position()
809 self.delete()
810
811 # Remove spaces.
812 self.text = (
813 self.document.text_before_cursor
814 + separator
815 + self.document.text_after_cursor.lstrip(" ")
816 )
817
818 def join_selected_lines(self, separator: str = " ") -> None:
819 """
820 Join the selected lines.
821 """
822 assert self.selection_state
823
824 # Get lines.
825 from_, to = sorted(
826 [self.cursor_position, self.selection_state.original_cursor_position]
827 )
828
829 before = self.text[:from_]
830 lines = self.text[from_:to].splitlines()
831 after = self.text[to:]
832
833 # Replace leading spaces with just one space.
834 lines = [l.lstrip(" ") + separator for l in lines]
835
836 # Set new document.
837 self.document = Document(
838 text=before + "".join(lines) + after,
839 cursor_position=len(before + "".join(lines[:-1])) - 1,
840 )
841
842 def swap_characters_before_cursor(self) -> None:
843 """
844 Swap the last two characters before the cursor.
845 """
846 pos = self.cursor_position
847
848 if pos >= 2:
849 a = self.text[pos - 2]
850 b = self.text[pos - 1]
851
852 self.text = self.text[: pos - 2] + b + a + self.text[pos:]
853
854 def go_to_history(self, index: int) -> None:
855 """
856 Go to this item in the history.
857 """
858 if index < len(self._working_lines):
859 self.working_index = index
860 self.cursor_position = len(self.text)
861
862 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
863 """
864 Browse to the next completions.
865 (Does nothing if there are no completion.)
866 """
867 index: int | None
868
869 if self.complete_state:
870 completions_count = len(self.complete_state.completions)
871
872 if self.complete_state.complete_index is None:
873 index = 0
874 elif self.complete_state.complete_index == completions_count - 1:
875 index = None
876
877 if disable_wrap_around:
878 return
879 else:
880 index = min(
881 completions_count - 1, self.complete_state.complete_index + count
882 )
883 self.go_to_completion(index)
884
885 def complete_previous(
886 self, count: int = 1, disable_wrap_around: bool = False
887 ) -> None:
888 """
889 Browse to the previous completions.
890 (Does nothing if there are no completion.)
891 """
892 index: int | None
893
894 if self.complete_state:
895 if self.complete_state.complete_index == 0:
896 index = None
897
898 if disable_wrap_around:
899 return
900 elif self.complete_state.complete_index is None:
901 index = len(self.complete_state.completions) - 1
902 else:
903 index = max(0, self.complete_state.complete_index - count)
904
905 self.go_to_completion(index)
906
907 def cancel_completion(self) -> None:
908 """
909 Cancel completion, go back to the original text.
910 """
911 if self.complete_state:
912 self.go_to_completion(None)
913 self.complete_state = None
914
915 def _set_completions(self, completions: list[Completion]) -> CompletionState:
916 """
917 Start completions. (Generate list of completions and initialize.)
918
919 By default, no completion will be selected.
920 """
921 self.complete_state = CompletionState(
922 original_document=self.document, completions=completions
923 )
924
925 # Trigger event. This should eventually invalidate the layout.
926 self.on_completions_changed.fire()
927
928 return self.complete_state
929
930 def start_history_lines_completion(self) -> None:
931 """
932 Start a completion based on all the other lines in the document and the
933 history.
934 """
935 found_completions: set[str] = set()
936 completions = []
937
938 # For every line of the whole history, find matches with the current line.
939 current_line = self.document.current_line_before_cursor.lstrip()
940
941 for i, string in enumerate(self._working_lines):
942 for j, l in enumerate(string.split("\n")):
943 l = l.strip()
944 if l and l.startswith(current_line):
945 # When a new line has been found.
946 if l not in found_completions:
947 found_completions.add(l)
948
949 # Create completion.
950 if i == self.working_index:
951 display_meta = "Current, line %s" % (j + 1)
952 else:
953 display_meta = f"History {i + 1}, line {j + 1}"
954
955 completions.append(
956 Completion(
957 text=l,
958 start_position=-len(current_line),
959 display_meta=display_meta,
960 )
961 )
962
963 self._set_completions(completions=completions[::-1])
964 self.go_to_completion(0)
965
966 def go_to_completion(self, index: int | None) -> None:
967 """
968 Select a completion from the list of current completions.
969 """
970 assert self.complete_state
971
972 # Set new completion
973 state = self.complete_state
974 state.go_to_index(index)
975
976 # Set text/cursor position
977 new_text, new_cursor_position = state.new_text_and_position()
978 self.document = Document(new_text, new_cursor_position)
979
980 # (changing text/cursor position will unset complete_state.)
981 self.complete_state = state
982
983 def apply_completion(self, completion: Completion) -> None:
984 """
985 Insert a given completion.
986 """
987 # If there was already a completion active, cancel that one.
988 if self.complete_state:
989 self.go_to_completion(None)
990 self.complete_state = None
991
992 # Insert text from the given completion.
993 self.delete_before_cursor(-completion.start_position)
994 self.insert_text(completion.text)
995
996 def _set_history_search(self) -> None:
997 """
998 Set `history_search_text`.
999 (The text before the cursor will be used for filtering the history.)
1000 """
1001 if self.enable_history_search():
1002 if self.history_search_text is None:
1003 self.history_search_text = self.document.text_before_cursor
1004 else:
1005 self.history_search_text = None
1006
1007 def _history_matches(self, i: int) -> bool:
1008 """
1009 True when the current entry matches the history search.
1010 (when we don't have history search, it's also True.)
1011 """
1012 return self.history_search_text is None or self._working_lines[i].startswith(
1013 self.history_search_text
1014 )
1015
1016 def history_forward(self, count: int = 1) -> None:
1017 """
1018 Move forwards through the history.
1019
1020 :param count: Amount of items to move forward.
1021 """
1022 self._set_history_search()
1023
1024 # Go forward in history.
1025 found_something = False
1026
1027 for i in range(self.working_index + 1, len(self._working_lines)):
1028 if self._history_matches(i):
1029 self.working_index = i
1030 count -= 1
1031 found_something = True
1032 if count == 0:
1033 break
1034
1035 # If we found an entry, move cursor to the end of the first line.
1036 if found_something:
1037 self.cursor_position = 0
1038 self.cursor_position += self.document.get_end_of_line_position()
1039
1040 def history_backward(self, count: int = 1) -> None:
1041 """
1042 Move backwards through history.
1043 """
1044 self._set_history_search()
1045
1046 # Go back in history.
1047 found_something = False
1048
1049 for i in range(self.working_index - 1, -1, -1):
1050 if self._history_matches(i):
1051 self.working_index = i
1052 count -= 1
1053 found_something = True
1054 if count == 0:
1055 break
1056
1057 # If we move to another entry, move cursor to the end of the line.
1058 if found_something:
1059 self.cursor_position = len(self.text)
1060
1061 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
1062 """
1063 Pick nth word from previous history entry (depending on current
1064 `yank_nth_arg_state`) and insert it at current position. Rotate through
1065 history if called repeatedly. If no `n` has been given, take the first
1066 argument. (The second word.)
1067
1068 :param n: (None or int), The index of the word from the previous line
1069 to take.
1070 """
1071 assert n is None or isinstance(n, int)
1072 history_strings = self.history.get_strings()
1073
1074 if not len(history_strings):
1075 return
1076
1077 # Make sure we have a `YankNthArgState`.
1078 if self.yank_nth_arg_state is None:
1079 state = YankNthArgState(n=-1 if _yank_last_arg else 1)
1080 else:
1081 state = self.yank_nth_arg_state
1082
1083 if n is not None:
1084 state.n = n
1085
1086 # Get new history position.
1087 new_pos = state.history_position - 1
1088 if -new_pos > len(history_strings):
1089 new_pos = -1
1090
1091 # Take argument from line.
1092 line = history_strings[new_pos]
1093
1094 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)]
1095 words = [w for w in words if w]
1096 try:
1097 word = words[state.n]
1098 except IndexError:
1099 word = ""
1100
1101 # Insert new argument.
1102 if state.previous_inserted_word:
1103 self.delete_before_cursor(len(state.previous_inserted_word))
1104 self.insert_text(word)
1105
1106 # Save state again for next completion. (Note that the 'insert'
1107 # operation from above clears `self.yank_nth_arg_state`.)
1108 state.previous_inserted_word = word
1109 state.history_position = new_pos
1110 self.yank_nth_arg_state = state
1111
1112 def yank_last_arg(self, n: int | None = None) -> None:
1113 """
1114 Like `yank_nth_arg`, but if no argument has been given, yank the last
1115 word by default.
1116 """
1117 self.yank_nth_arg(n=n, _yank_last_arg=True)
1118
1119 def start_selection(
1120 self, selection_type: SelectionType = SelectionType.CHARACTERS
1121 ) -> None:
1122 """
1123 Take the current cursor position as the start of this selection.
1124 """
1125 self.selection_state = SelectionState(self.cursor_position, selection_type)
1126
1127 def copy_selection(self, _cut: bool = False) -> ClipboardData:
1128 """
1129 Copy selected text and return :class:`.ClipboardData` instance.
1130
1131 Notice that this doesn't store the copied data on the clipboard yet.
1132 You can store it like this:
1133
1134 .. code:: python
1135
1136 data = buffer.copy_selection()
1137 get_app().clipboard.set_data(data)
1138 """
1139 new_document, clipboard_data = self.document.cut_selection()
1140 if _cut:
1141 self.document = new_document
1142
1143 self.selection_state = None
1144 return clipboard_data
1145
1146 def cut_selection(self) -> ClipboardData:
1147 """
1148 Delete selected text and return :class:`.ClipboardData` instance.
1149 """
1150 return self.copy_selection(_cut=True)
1151
1152 def paste_clipboard_data(
1153 self,
1154 data: ClipboardData,
1155 paste_mode: PasteMode = PasteMode.EMACS,
1156 count: int = 1,
1157 ) -> None:
1158 """
1159 Insert the data from the clipboard.
1160 """
1161 assert isinstance(data, ClipboardData)
1162 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)
1163
1164 original_document = self.document
1165 self.document = self.document.paste_clipboard_data(
1166 data, paste_mode=paste_mode, count=count
1167 )
1168
1169 # Remember original document. This assignment should come at the end,
1170 # because assigning to 'document' will erase it.
1171 self.document_before_paste = original_document
1172
1173 def newline(self, copy_margin: bool = True) -> None:
1174 """
1175 Insert a line ending at the current position.
1176 """
1177 if copy_margin:
1178 self.insert_text("\n" + self.document.leading_whitespace_in_current_line)
1179 else:
1180 self.insert_text("\n")
1181
1182 def insert_line_above(self, copy_margin: bool = True) -> None:
1183 """
1184 Insert a new line above the current one.
1185 """
1186 if copy_margin:
1187 insert = self.document.leading_whitespace_in_current_line + "\n"
1188 else:
1189 insert = "\n"
1190
1191 self.cursor_position += self.document.get_start_of_line_position()
1192 self.insert_text(insert)
1193 self.cursor_position -= 1
1194
1195 def insert_line_below(self, copy_margin: bool = True) -> None:
1196 """
1197 Insert a new line below the current one.
1198 """
1199 if copy_margin:
1200 insert = "\n" + self.document.leading_whitespace_in_current_line
1201 else:
1202 insert = "\n"
1203
1204 self.cursor_position += self.document.get_end_of_line_position()
1205 self.insert_text(insert)
1206
1207 def insert_text(
1208 self,
1209 data: str,
1210 overwrite: bool = False,
1211 move_cursor: bool = True,
1212 fire_event: bool = True,
1213 ) -> None:
1214 """
1215 Insert characters at cursor position.
1216
1217 :param fire_event: Fire `on_text_insert` event. This is mainly used to
1218 trigger autocompletion while typing.
1219 """
1220 # Original text & cursor position.
1221 otext = self.text
1222 ocpos = self.cursor_position
1223
1224 # In insert/text mode.
1225 if overwrite:
1226 # Don't overwrite the newline itself. Just before the line ending,
1227 # it should act like insert mode.
1228 overwritten_text = otext[ocpos : ocpos + len(data)]
1229 if "\n" in overwritten_text:
1230 overwritten_text = overwritten_text[: overwritten_text.find("\n")]
1231
1232 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :]
1233 else:
1234 text = otext[:ocpos] + data + otext[ocpos:]
1235
1236 if move_cursor:
1237 cpos = self.cursor_position + len(data)
1238 else:
1239 cpos = self.cursor_position
1240
1241 # Set new document.
1242 # (Set text and cursor position at the same time. Otherwise, setting
1243 # the text will fire a change event before the cursor position has been
1244 # set. It works better to have this atomic.)
1245 self.document = Document(text, cpos)
1246
1247 # Fire 'on_text_insert' event.
1248 if fire_event: # XXX: rename to `start_complete`.
1249 self.on_text_insert.fire()
1250
1251 # Only complete when "complete_while_typing" is enabled.
1252 if self.completer and self.complete_while_typing():
1253 get_app().create_background_task(self._async_completer())
1254
1255 # Call auto_suggest.
1256 if self.auto_suggest:
1257 get_app().create_background_task(self._async_suggester())
1258
1259 def undo(self) -> None:
1260 # Pop from the undo-stack until we find a text that if different from
1261 # the current text. (The current logic of `save_to_undo_stack` will
1262 # cause that the top of the undo stack is usually the same as the
1263 # current text, so in that case we have to pop twice.)
1264 while self._undo_stack:
1265 text, pos = self._undo_stack.pop()
1266
1267 if text != self.text:
1268 # Push current text to redo stack.
1269 self._redo_stack.append((self.text, self.cursor_position))
1270
1271 # Set new text/cursor_position.
1272 self.document = Document(text, cursor_position=pos)
1273 break
1274
1275 def redo(self) -> None:
1276 if self._redo_stack:
1277 # Copy current state on undo stack.
1278 self.save_to_undo_stack(clear_redo_stack=False)
1279
1280 # Pop state from redo stack.
1281 text, pos = self._redo_stack.pop()
1282 self.document = Document(text, cursor_position=pos)
1283
1284 def validate(self, set_cursor: bool = False) -> bool:
1285 """
1286 Returns `True` if valid.
1287
1288 :param set_cursor: Set the cursor position, if an error was found.
1289 """
1290 # Don't call the validator again, if it was already called for the
1291 # current input.
1292 if self.validation_state != ValidationState.UNKNOWN:
1293 return self.validation_state == ValidationState.VALID
1294
1295 # Call validator.
1296 if self.validator:
1297 try:
1298 self.validator.validate(self.document)
1299 except ValidationError as e:
1300 # Set cursor position (don't allow invalid values.)
1301 if set_cursor:
1302 self.cursor_position = min(
1303 max(0, e.cursor_position), len(self.text)
1304 )
1305
1306 self.validation_state = ValidationState.INVALID
1307 self.validation_error = e
1308 return False
1309
1310 # Handle validation result.
1311 self.validation_state = ValidationState.VALID
1312 self.validation_error = None
1313 return True
1314
1315 async def _validate_async(self) -> None:
1316 """
1317 Asynchronous version of `validate()`.
1318 This one doesn't set the cursor position.
1319
1320 We have both variants, because a synchronous version is required.
1321 Handling the ENTER key needs to be completely synchronous, otherwise
1322 stuff like type-ahead is going to give very weird results. (People
1323 could type input while the ENTER key is still processed.)
1324
1325 An asynchronous version is required if we have `validate_while_typing`
1326 enabled.
1327 """
1328 while True:
1329 # Don't call the validator again, if it was already called for the
1330 # current input.
1331 if self.validation_state != ValidationState.UNKNOWN:
1332 return
1333
1334 # Call validator.
1335 error = None
1336 document = self.document
1337
1338 if self.validator:
1339 try:
1340 await self.validator.validate_async(self.document)
1341 except ValidationError as e:
1342 error = e
1343
1344 # If the document changed during the validation, try again.
1345 if self.document != document:
1346 continue
1347
1348 # Handle validation result.
1349 if error:
1350 self.validation_state = ValidationState.INVALID
1351 else:
1352 self.validation_state = ValidationState.VALID
1353
1354 self.validation_error = error
1355 get_app().invalidate() # Trigger redraw (display error).
1356
1357 def append_to_history(self) -> None:
1358 """
1359 Append the current input to the history.
1360 """
1361 # Save at the tail of the history. (But don't if the last entry the
1362 # history is already the same.)
1363 if self.text:
1364 history_strings = self.history.get_strings()
1365 if not len(history_strings) or history_strings[-1] != self.text:
1366 self.history.append_string(self.text)
1367
1368 def _search(
1369 self,
1370 search_state: SearchState,
1371 include_current_position: bool = False,
1372 count: int = 1,
1373 ) -> tuple[int, int] | None:
1374 """
1375 Execute search. Return (working_index, cursor_position) tuple when this
1376 search is applied. Returns `None` when this text cannot be found.
1377 """
1378 assert count > 0
1379
1380 text = search_state.text
1381 direction = search_state.direction
1382 ignore_case = search_state.ignore_case()
1383
1384 def search_once(
1385 working_index: int, document: Document
1386 ) -> tuple[int, Document] | None:
1387 """
1388 Do search one time.
1389 Return (working_index, document) or `None`
1390 """
1391 if direction == SearchDirection.FORWARD:
1392 # Try find at the current input.
1393 new_index = document.find(
1394 text,
1395 include_current_position=include_current_position,
1396 ignore_case=ignore_case,
1397 )
1398
1399 if new_index is not None:
1400 return (
1401 working_index,
1402 Document(document.text, document.cursor_position + new_index),
1403 )
1404 else:
1405 # No match, go forward in the history. (Include len+1 to wrap around.)
1406 # (Here we should always include all cursor positions, because
1407 # it's a different line.)
1408 for i in range(working_index + 1, len(self._working_lines) + 1):
1409 i %= len(self._working_lines)
1410
1411 document = Document(self._working_lines[i], 0)
1412 new_index = document.find(
1413 text, include_current_position=True, ignore_case=ignore_case
1414 )
1415 if new_index is not None:
1416 return (i, Document(document.text, new_index))
1417 else:
1418 # Try find at the current input.
1419 new_index = document.find_backwards(text, ignore_case=ignore_case)
1420
1421 if new_index is not None:
1422 return (
1423 working_index,
1424 Document(document.text, document.cursor_position + new_index),
1425 )
1426 else:
1427 # No match, go back in the history. (Include -1 to wrap around.)
1428 for i in range(working_index - 1, -2, -1):
1429 i %= len(self._working_lines)
1430
1431 document = Document(
1432 self._working_lines[i], len(self._working_lines[i])
1433 )
1434 new_index = document.find_backwards(
1435 text, ignore_case=ignore_case
1436 )
1437 if new_index is not None:
1438 return (
1439 i,
1440 Document(document.text, len(document.text) + new_index),
1441 )
1442 return None
1443
1444 # Do 'count' search iterations.
1445 working_index = self.working_index
1446 document = self.document
1447 for _ in range(count):
1448 result = search_once(working_index, document)
1449 if result is None:
1450 return None # Nothing found.
1451 else:
1452 working_index, document = result
1453
1454 return (working_index, document.cursor_position)
1455
1456 def document_for_search(self, search_state: SearchState) -> Document:
1457 """
1458 Return a :class:`~prompt_toolkit.document.Document` instance that has
1459 the text/cursor position for this search, if we would apply it. This
1460 will be used in the
1461 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
1462 searching.
1463 """
1464 search_result = self._search(search_state, include_current_position=True)
1465
1466 if search_result is None:
1467 return self.document
1468 else:
1469 working_index, cursor_position = search_result
1470
1471 # Keep selection, when `working_index` was not changed.
1472 if working_index == self.working_index:
1473 selection = self.selection_state
1474 else:
1475 selection = None
1476
1477 return Document(
1478 self._working_lines[working_index], cursor_position, selection=selection
1479 )
1480
1481 def get_search_position(
1482 self,
1483 search_state: SearchState,
1484 include_current_position: bool = True,
1485 count: int = 1,
1486 ) -> int:
1487 """
1488 Get the cursor position for this search.
1489 (This operation won't change the `working_index`. It's won't go through
1490 the history. Vi text objects can't span multiple items.)
1491 """
1492 search_result = self._search(
1493 search_state, include_current_position=include_current_position, count=count
1494 )
1495
1496 if search_result is None:
1497 return self.cursor_position
1498 else:
1499 working_index, cursor_position = search_result
1500 return cursor_position
1501
1502 def apply_search(
1503 self,
1504 search_state: SearchState,
1505 include_current_position: bool = True,
1506 count: int = 1,
1507 ) -> None:
1508 """
1509 Apply search. If something is found, set `working_index` and
1510 `cursor_position`.
1511 """
1512 search_result = self._search(
1513 search_state, include_current_position=include_current_position, count=count
1514 )
1515
1516 if search_result is not None:
1517 working_index, cursor_position = search_result
1518 self.working_index = working_index
1519 self.cursor_position = cursor_position
1520
1521 def exit_selection(self) -> None:
1522 self.selection_state = None
1523
1524 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
1525 """
1526 Simple (file) tempfile implementation.
1527 Return (tempfile, cleanup_func).
1528 """
1529 suffix = to_str(self.tempfile_suffix)
1530 descriptor, filename = tempfile.mkstemp(suffix)
1531
1532 os.write(descriptor, self.text.encode("utf-8"))
1533 os.close(descriptor)
1534
1535 def cleanup() -> None:
1536 os.unlink(filename)
1537
1538 return filename, cleanup
1539
1540 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
1541 # Complex (directory) tempfile implementation.
1542 headtail = to_str(self.tempfile)
1543 if not headtail:
1544 # Revert to simple case.
1545 return self._editor_simple_tempfile()
1546 headtail = str(headtail)
1547
1548 # Try to make according to tempfile logic.
1549 head, tail = os.path.split(headtail)
1550 if os.path.isabs(head):
1551 head = head[1:]
1552
1553 dirpath = tempfile.mkdtemp()
1554 if head:
1555 dirpath = os.path.join(dirpath, head)
1556 # Assume there is no issue creating dirs in this temp dir.
1557 os.makedirs(dirpath)
1558
1559 # Open the filename and write current text.
1560 filename = os.path.join(dirpath, tail)
1561 with open(filename, "w", encoding="utf-8") as fh:
1562 fh.write(self.text)
1563
1564 def cleanup() -> None:
1565 shutil.rmtree(dirpath)
1566
1567 return filename, cleanup
1568
1569 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
1570 """
1571 Open code in editor.
1572
1573 This returns a future, and runs in a thread executor.
1574 """
1575 if self.read_only():
1576 raise EditReadOnlyBuffer()
1577
1578 # Write current text to temporary file
1579 if self.tempfile:
1580 filename, cleanup_func = self._editor_complex_tempfile()
1581 else:
1582 filename, cleanup_func = self._editor_simple_tempfile()
1583
1584 async def run() -> None:
1585 try:
1586 # Open in editor
1587 # (We need to use `run_in_terminal`, because not all editors go to
1588 # the alternate screen buffer, and some could influence the cursor
1589 # position.)
1590 success = await run_in_terminal(
1591 lambda: self._open_file_in_editor(filename), in_executor=True
1592 )
1593
1594 # Read content again.
1595 if success:
1596 with open(filename, "rb") as f:
1597 text = f.read().decode("utf-8")
1598
1599 # Drop trailing newline. (Editors are supposed to add it at the
1600 # end, but we don't need it.)
1601 if text.endswith("\n"):
1602 text = text[:-1]
1603
1604 self.document = Document(text=text, cursor_position=len(text))
1605
1606 # Accept the input.
1607 if validate_and_handle:
1608 self.validate_and_handle()
1609
1610 finally:
1611 # Clean up temp dir/file.
1612 cleanup_func()
1613
1614 return get_app().create_background_task(run())
1615
1616 def _open_file_in_editor(self, filename: str) -> bool:
1617 """
1618 Call editor executable.
1619
1620 Return True when we received a zero return code.
1621 """
1622 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that.
1623 # Otherwise, fall back to the first available editor that we can find.
1624 visual = os.environ.get("VISUAL")
1625 editor = os.environ.get("EDITOR")
1626
1627 editors = [
1628 visual,
1629 editor,
1630 # Order of preference.
1631 "/usr/bin/editor",
1632 "/usr/bin/nano",
1633 "/usr/bin/pico",
1634 "/usr/bin/vi",
1635 "/usr/bin/emacs",
1636 ]
1637
1638 for e in editors:
1639 if e:
1640 try:
1641 # Use 'shlex.split()', because $VISUAL can contain spaces
1642 # and quotes.
1643 returncode = subprocess.call(shlex.split(e) + [filename])
1644 return returncode == 0
1645
1646 except OSError:
1647 # Executable does not exist, try the next one.
1648 pass
1649
1650 return False
1651
1652 def start_completion(
1653 self,
1654 select_first: bool = False,
1655 select_last: bool = False,
1656 insert_common_part: bool = False,
1657 complete_event: CompleteEvent | None = None,
1658 ) -> None:
1659 """
1660 Start asynchronous autocompletion of this buffer.
1661 (This will do nothing if a previous completion was still in progress.)
1662 """
1663 # Only one of these options can be selected.
1664 assert select_first + select_last + insert_common_part <= 1
1665
1666 get_app().create_background_task(
1667 self._async_completer(
1668 select_first=select_first,
1669 select_last=select_last,
1670 insert_common_part=insert_common_part,
1671 complete_event=complete_event
1672 or CompleteEvent(completion_requested=True),
1673 )
1674 )
1675
1676 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
1677 """
1678 Create function for asynchronous autocompletion.
1679
1680 (This consumes the asynchronous completer generator, which possibly
1681 runs the completion algorithm in another thread.)
1682 """
1683
1684 def completion_does_nothing(document: Document, completion: Completion) -> bool:
1685 """
1686 Return `True` if applying this completion doesn't have any effect.
1687 (When it doesn't insert any new text.
1688 """
1689 text_before_cursor = document.text_before_cursor
1690 replaced_text = text_before_cursor[
1691 len(text_before_cursor) + completion.start_position :
1692 ]
1693 return replaced_text == completion.text
1694
1695 @_only_one_at_a_time
1696 async def async_completer(
1697 select_first: bool = False,
1698 select_last: bool = False,
1699 insert_common_part: bool = False,
1700 complete_event: CompleteEvent | None = None,
1701 ) -> None:
1702 document = self.document
1703 complete_event = complete_event or CompleteEvent(text_inserted=True)
1704
1705 # Don't complete when we already have completions.
1706 if self.complete_state or not self.completer:
1707 return
1708
1709 # Create an empty CompletionState.
1710 complete_state = CompletionState(original_document=self.document)
1711 self.complete_state = complete_state
1712
1713 def proceed() -> bool:
1714 """Keep retrieving completions. Input text has not yet changed
1715 while generating completions."""
1716 return self.complete_state == complete_state
1717
1718 refresh_needed = asyncio.Event()
1719
1720 async def refresh_while_loading() -> None:
1721 """Background loop to refresh the UI at most 3 times a second
1722 while the completion are loading. Calling
1723 `on_completions_changed.fire()` for every completion that we
1724 receive is too expensive when there are many completions. (We
1725 could tune `Application.max_render_postpone_time` and
1726 `Application.min_redraw_interval`, but having this here is a
1727 better approach.)
1728 """
1729 while True:
1730 self.on_completions_changed.fire()
1731 refresh_needed.clear()
1732 await asyncio.sleep(0.3)
1733 await refresh_needed.wait()
1734
1735 refresh_task = asyncio.ensure_future(refresh_while_loading())
1736 try:
1737 # Load.
1738 async with aclosing(
1739 self.completer.get_completions_async(document, complete_event)
1740 ) as async_generator:
1741 async for completion in async_generator:
1742 complete_state.completions.append(completion)
1743 refresh_needed.set()
1744
1745 # If the input text changes, abort.
1746 if not proceed():
1747 break
1748
1749 # Always stop at 10k completions.
1750 if (
1751 len(complete_state.completions)
1752 >= self.max_number_of_completions
1753 ):
1754 break
1755 finally:
1756 refresh_task.cancel()
1757
1758 # Refresh one final time after we got everything.
1759 self.on_completions_changed.fire()
1760
1761 completions = complete_state.completions
1762
1763 # When there is only one completion, which has nothing to add, ignore it.
1764 if len(completions) == 1 and completion_does_nothing(
1765 document, completions[0]
1766 ):
1767 del completions[:]
1768
1769 # Set completions if the text was not yet changed.
1770 if proceed():
1771 # When no completions were found, or when the user selected
1772 # already a completion by using the arrow keys, don't do anything.
1773 if (
1774 not self.complete_state
1775 or self.complete_state.complete_index is not None
1776 ):
1777 return
1778
1779 # When there are no completions, reset completion state anyway.
1780 if not completions:
1781 self.complete_state = None
1782 # Render the ui if the completion menu was shown
1783 # it is needed especially if there is one completion and it was deleted.
1784 self.on_completions_changed.fire()
1785 return
1786
1787 # Select first/last or insert common part, depending on the key
1788 # binding. (For this we have to wait until all completions are
1789 # loaded.)
1790
1791 if select_first:
1792 self.go_to_completion(0)
1793
1794 elif select_last:
1795 self.go_to_completion(len(completions) - 1)
1796
1797 elif insert_common_part:
1798 common_part = get_common_complete_suffix(document, completions)
1799 if common_part:
1800 # Insert the common part, update completions.
1801 self.insert_text(common_part)
1802 if len(completions) > 1:
1803 # (Don't call `async_completer` again, but
1804 # recalculate completions. See:
1805 # https://github.com/ipython/ipython/issues/9658)
1806 completions[:] = [
1807 c.new_completion_from_position(len(common_part))
1808 for c in completions
1809 ]
1810
1811 self._set_completions(completions=completions)
1812 else:
1813 self.complete_state = None
1814 else:
1815 # When we were asked to insert the "common"
1816 # prefix, but there was no common suffix but
1817 # still exactly one match, then select the
1818 # first. (It could be that we have a completion
1819 # which does * expansion, like '*.py', with
1820 # exactly one match.)
1821 if len(completions) == 1:
1822 self.go_to_completion(0)
1823
1824 else:
1825 # If the last operation was an insert, (not a delete), restart
1826 # the completion coroutine.
1827
1828 if self.document.text_before_cursor == document.text_before_cursor:
1829 return # Nothing changed.
1830
1831 if self.document.text_before_cursor.startswith(
1832 document.text_before_cursor
1833 ):
1834 raise _Retry
1835
1836 return async_completer
1837
1838 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
1839 """
1840 Create function for asynchronous auto suggestion.
1841 (This can be in another thread.)
1842 """
1843
1844 @_only_one_at_a_time
1845 async def async_suggestor() -> None:
1846 document = self.document
1847
1848 # Don't suggest when we already have a suggestion.
1849 if self.suggestion or not self.auto_suggest:
1850 return
1851
1852 suggestion = await self.auto_suggest.get_suggestion_async(self, document)
1853
1854 # Set suggestion only if the text was not yet changed.
1855 if self.document == document:
1856 # Set suggestion and redraw interface.
1857 self.suggestion = suggestion
1858 self.on_suggestion_set.fire()
1859 else:
1860 # Otherwise, restart thread.
1861 raise _Retry
1862
1863 return async_suggestor
1864
1865 def _create_auto_validate_coroutine(
1866 self,
1867 ) -> Callable[[], Coroutine[Any, Any, None]]:
1868 """
1869 Create a function for asynchronous validation while typing.
1870 (This can be in another thread.)
1871 """
1872
1873 @_only_one_at_a_time
1874 async def async_validator() -> None:
1875 await self._validate_async()
1876
1877 return async_validator
1878
1879 def validate_and_handle(self) -> None:
1880 """
1881 Validate buffer and handle the accept action.
1882 """
1883 valid = self.validate(set_cursor=True)
1884
1885 # When the validation succeeded, accept the input.
1886 if valid:
1887 if self.accept_handler:
1888 keep_text = self.accept_handler(self)
1889 else:
1890 keep_text = False
1891
1892 self.append_to_history()
1893
1894 if not keep_text:
1895 self.reset()
1896
1897
1898_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]])
1899
1900
1901def _only_one_at_a_time(coroutine: _T) -> _T:
1902 """
1903 Decorator that only starts the coroutine only if the previous call has
1904 finished. (Used to make sure that we have only one autocompleter, auto
1905 suggestor and validator running at a time.)
1906
1907 When the coroutine raises `_Retry`, it is restarted.
1908 """
1909 running = False
1910
1911 @wraps(coroutine)
1912 async def new_coroutine(*a: Any, **kw: Any) -> Any:
1913 nonlocal running
1914
1915 # Don't start a new function, if the previous is still in progress.
1916 if running:
1917 return
1918
1919 running = True
1920
1921 try:
1922 while True:
1923 try:
1924 await coroutine(*a, **kw)
1925 except _Retry:
1926 continue
1927 else:
1928 return None
1929 finally:
1930 running = False
1931
1932 return cast(_T, new_coroutine)
1933
1934
1935class _Retry(Exception):
1936 "Retry in `_only_one_at_a_time`."
1937
1938
1939def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1940 """
1941 Indent text of a :class:`.Buffer` object.
1942 """
1943 current_row = buffer.document.cursor_position_row
1944 current_col = buffer.document.cursor_position_col
1945 line_range = range(from_row, to_row)
1946
1947 # Apply transformation.
1948 indent_content = " " * count
1949 new_text = buffer.transform_lines(line_range, lambda l: indent_content + l)
1950 buffer.document = Document(
1951 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1952 )
1953
1954 # Place cursor in the same position in text after indenting
1955 buffer.cursor_position += current_col + len(indent_content)
1956
1957
1958def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
1959 """
1960 Unindent text of a :class:`.Buffer` object.
1961 """
1962 current_row = buffer.document.cursor_position_row
1963 current_col = buffer.document.cursor_position_col
1964 line_range = range(from_row, to_row)
1965
1966 indent_content = " " * count
1967
1968 def transform(text: str) -> str:
1969 remove = indent_content
1970 if text.startswith(remove):
1971 return text[len(remove) :]
1972 else:
1973 return text.lstrip()
1974
1975 # Apply transformation.
1976 new_text = buffer.transform_lines(line_range, transform)
1977 buffer.document = Document(
1978 new_text, Document(new_text).translate_row_col_to_index(current_row, 0)
1979 )
1980
1981 # Place cursor in the same position in text after dedent
1982 buffer.cursor_position += current_col - len(indent_content)
1983
1984
1985def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
1986 """
1987 Reformat text, taking the width into account.
1988 `to_row` is included.
1989 (Vi 'gq' operator.)
1990 """
1991 lines = buffer.text.splitlines(True)
1992 lines_before = lines[:from_row]
1993 lines_after = lines[to_row + 1 :]
1994 lines_to_reformat = lines[from_row : to_row + 1]
1995
1996 if lines_to_reformat:
1997 # Take indentation from the first line.
1998 match = re.search(r"^\s*", lines_to_reformat[0])
1999 length = match.end() if match else 0 # `match` can't be None, actually.
2000
2001 indent = lines_to_reformat[0][:length].replace("\n", "")
2002
2003 # Now, take all the 'words' from the lines to be reshaped.
2004 words = "".join(lines_to_reformat).split()
2005
2006 # And reshape.
2007 width = (buffer.text_width or 80) - len(indent)
2008 reshaped_text = [indent]
2009 current_width = 0
2010 for w in words:
2011 if current_width:
2012 if len(w) + current_width + 1 > width:
2013 reshaped_text.append("\n")
2014 reshaped_text.append(indent)
2015 current_width = 0
2016 else:
2017 reshaped_text.append(" ")
2018 current_width += 1
2019
2020 reshaped_text.append(w)
2021 current_width += len(w)
2022
2023 if reshaped_text[-1] != "\n":
2024 reshaped_text.append("\n")
2025
2026 # Apply result.
2027 buffer.document = Document(
2028 text="".join(lines_before + reshaped_text + lines_after),
2029 cursor_position=len("".join(lines_before + reshaped_text)),
2030 )