Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/buffer.py: 17%

811 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Data structures for the Buffer. 

3It holds the text, cursor position, history, etc... 

4""" 

5from __future__ import annotations 

6 

7import asyncio 

8import logging 

9import os 

10import re 

11import shlex 

12import shutil 

13import subprocess 

14import tempfile 

15from collections import deque 

16from enum import Enum 

17from functools import wraps 

18from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast 

19 

20from .application.current import get_app 

21from .application.run_in_terminal import run_in_terminal 

22from .auto_suggest import AutoSuggest, Suggestion 

23from .cache import FastDictCache 

24from .clipboard import ClipboardData 

25from .completion import ( 

26 CompleteEvent, 

27 Completer, 

28 Completion, 

29 DummyCompleter, 

30 get_common_complete_suffix, 

31) 

32from .document import Document 

33from .eventloop import aclosing 

34from .filters import FilterOrBool, to_filter 

35from .history import History, InMemoryHistory 

36from .search import SearchDirection, SearchState 

37from .selection import PasteMode, SelectionState, SelectionType 

38from .utils import Event, to_str 

39from .validation import ValidationError, Validator 

40 

41__all__ = [ 

42 "EditReadOnlyBuffer", 

43 "Buffer", 

44 "CompletionState", 

45 "indent", 

46 "unindent", 

47 "reshape_text", 

48] 

49 

50logger = logging.getLogger(__name__) 

51 

52 

53class EditReadOnlyBuffer(Exception): 

54 "Attempt editing of read-only :class:`.Buffer`." 

55 

56 

57class ValidationState(Enum): 

58 "The validation state of a buffer. This is set after the validation." 

59 

60 VALID = "VALID" 

61 INVALID = "INVALID" 

62 UNKNOWN = "UNKNOWN" 

63 

64 

65class CompletionState: 

66 """ 

67 Immutable class that contains a completion state. 

68 """ 

69 

70 def __init__( 

71 self, 

72 original_document: Document, 

73 completions: list[Completion] | None = None, 

74 complete_index: int | None = None, 

75 ) -> None: 

76 #: Document as it was when the completion started. 

77 self.original_document = original_document 

78 

79 #: List of all the current Completion instances which are possible at 

80 #: this point. 

81 self.completions = completions or [] 

82 

83 #: Position in the `completions` array. 

84 #: This can be `None` to indicate "no completion", the original text. 

85 self.complete_index = complete_index # Position in the `_completions` array. 

86 

87 def __repr__(self) -> str: 

88 return "{}({!r}, <{!r}> completions, index={!r})".format( 

89 self.__class__.__name__, 

90 self.original_document, 

91 len(self.completions), 

92 self.complete_index, 

93 ) 

94 

95 def go_to_index(self, index: int | None) -> None: 

96 """ 

97 Create a new :class:`.CompletionState` object with the new index. 

98 

99 When `index` is `None` deselect the completion. 

100 """ 

101 if self.completions: 

102 assert index is None or 0 <= index < len(self.completions) 

103 self.complete_index = index 

104 

105 def new_text_and_position(self) -> tuple[str, int]: 

106 """ 

107 Return (new_text, new_cursor_position) for this completion. 

108 """ 

109 if self.complete_index is None: 

110 return self.original_document.text, self.original_document.cursor_position 

111 else: 

112 original_text_before_cursor = self.original_document.text_before_cursor 

113 original_text_after_cursor = self.original_document.text_after_cursor 

114 

115 c = self.completions[self.complete_index] 

116 if c.start_position == 0: 

117 before = original_text_before_cursor 

118 else: 

119 before = original_text_before_cursor[: c.start_position] 

120 

121 new_text = before + c.text + original_text_after_cursor 

122 new_cursor_position = len(before) + len(c.text) 

123 return new_text, new_cursor_position 

124 

125 @property 

126 def current_completion(self) -> Completion | None: 

127 """ 

128 Return the current completion, or return `None` when no completion is 

129 selected. 

130 """ 

131 if self.complete_index is not None: 

132 return self.completions[self.complete_index] 

133 return None 

134 

135 

136_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""") 

137 

138 

139class YankNthArgState: 

140 """ 

141 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history. 

142 """ 

143 

144 def __init__( 

145 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = "" 

146 ) -> None: 

147 self.history_position = history_position 

148 self.previous_inserted_word = previous_inserted_word 

149 self.n = n 

150 

151 def __repr__(self) -> str: 

152 return "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})".format( 

153 self.__class__.__name__, 

154 self.history_position, 

155 self.n, 

156 self.previous_inserted_word, 

157 ) 

158 

159 

160BufferEventHandler = Callable[["Buffer"], None] 

161BufferAcceptHandler = Callable[["Buffer"], bool] 

162 

163 

164class Buffer: 

165 """ 

166 The core data structure that holds the text and cursor position of the 

167 current input line and implements all text manipulations on top of it. It 

168 also implements the history, undo stack and the completion state. 

169 

170 :param completer: :class:`~prompt_toolkit.completion.Completer` instance. 

171 :param history: :class:`~prompt_toolkit.history.History` instance. 

172 :param tempfile_suffix: The tempfile suffix (extension) to be used for the 

173 "open in editor" function. For a Python REPL, this would be ".py", so 

174 that the editor knows the syntax highlighting to use. This can also be 

175 a callable that returns a string. 

176 :param tempfile: For more advanced tempfile situations where you need 

177 control over the subdirectories and filename. For a Git Commit Message, 

178 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax 

179 highlighting to use. This can also be a callable that returns a string. 

180 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly 

181 useful for key bindings where we sometimes prefer to refer to a buffer 

182 by their name instead of by reference. 

183 :param accept_handler: Called when the buffer input is accepted. (Usually 

184 when the user presses `enter`.) The accept handler receives this 

185 `Buffer` as input and should return True when the buffer text should be 

186 kept instead of calling reset. 

187 

188 In case of a `PromptSession` for instance, we want to keep the text, 

189 because we will exit the application, and only reset it during the next 

190 run. 

191 

192 Events: 

193 

194 :param on_text_changed: When the buffer text changes. (Callable or None.) 

195 :param on_text_insert: When new text is inserted. (Callable or None.) 

196 :param on_cursor_position_changed: When the cursor moves. (Callable or None.) 

197 :param on_completions_changed: When the completions were changed. (Callable or None.) 

198 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) 

199 

200 Filters: 

201 

202 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` 

203 or `bool`. Decide whether or not to do asynchronous autocompleting while 

204 typing. 

205 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` 

206 or `bool`. Decide whether or not to do asynchronous validation while 

207 typing. 

208 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or 

209 `bool` to indicate when up-arrow partial string matching is enabled. It 

210 is advised to not enable this at the same time as 

211 `complete_while_typing`, because when there is an autocompletion found, 

212 the up arrows usually browse through the completions, rather than 

213 through the history. 

214 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, 

215 changes will not be allowed. 

216 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When 

217 not set, pressing `Enter` will call the `accept_handler`. Otherwise, 

218 pressing `Esc-Enter` is required. 

219 """ 

220 

221 def __init__( 

222 self, 

223 completer: Completer | None = None, 

224 auto_suggest: AutoSuggest | None = None, 

225 history: History | None = None, 

226 validator: Validator | None = None, 

227 tempfile_suffix: str | Callable[[], str] = "", 

228 tempfile: str | Callable[[], str] = "", 

229 name: str = "", 

230 complete_while_typing: FilterOrBool = False, 

231 validate_while_typing: FilterOrBool = False, 

232 enable_history_search: FilterOrBool = False, 

233 document: Document | None = None, 

234 accept_handler: BufferAcceptHandler | None = None, 

235 read_only: FilterOrBool = False, 

236 multiline: FilterOrBool = True, 

237 on_text_changed: BufferEventHandler | None = None, 

238 on_text_insert: BufferEventHandler | None = None, 

239 on_cursor_position_changed: BufferEventHandler | None = None, 

240 on_completions_changed: BufferEventHandler | None = None, 

241 on_suggestion_set: BufferEventHandler | None = None, 

242 ): 

243 # Accept both filters and booleans as input. 

244 enable_history_search = to_filter(enable_history_search) 

245 complete_while_typing = to_filter(complete_while_typing) 

246 validate_while_typing = to_filter(validate_while_typing) 

247 read_only = to_filter(read_only) 

248 multiline = to_filter(multiline) 

249 

250 self.completer = completer or DummyCompleter() 

251 self.auto_suggest = auto_suggest 

252 self.validator = validator 

253 self.tempfile_suffix = tempfile_suffix 

254 self.tempfile = tempfile 

255 self.name = name 

256 self.accept_handler = accept_handler 

257 

258 # Filters. (Usually, used by the key bindings to drive the buffer.) 

259 self.complete_while_typing = complete_while_typing 

260 self.validate_while_typing = validate_while_typing 

261 self.enable_history_search = enable_history_search 

262 self.read_only = read_only 

263 self.multiline = multiline 

264 

265 # Text width. (For wrapping, used by the Vi 'gq' operator.) 

266 self.text_width = 0 

267 

268 #: The command buffer history. 

269 # Note that we shouldn't use a lazy 'or' here. bool(history) could be 

270 # False when empty. 

271 self.history = InMemoryHistory() if history is None else history 

272 

273 self.__cursor_position = 0 

274 

275 # Events 

276 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed) 

277 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert) 

278 self.on_cursor_position_changed: Event[Buffer] = Event( 

279 self, on_cursor_position_changed 

280 ) 

281 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed) 

282 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set) 

283 

284 # Document cache. (Avoid creating new Document instances.) 

285 self._document_cache: FastDictCache[ 

286 tuple[str, int, SelectionState | None], Document 

287 ] = FastDictCache(Document, size=10) 

288 

289 # Create completer / auto suggestion / validation coroutines. 

290 self._async_suggester = self._create_auto_suggest_coroutine() 

291 self._async_completer = self._create_completer_coroutine() 

292 self._async_validator = self._create_auto_validate_coroutine() 

293 

294 # Asyncio task for populating the history. 

295 self._load_history_task: asyncio.Future[None] | None = None 

296 

297 # Reset other attributes. 

298 self.reset(document=document) 

299 

300 def __repr__(self) -> str: 

301 if len(self.text) < 15: 

302 text = self.text 

303 else: 

304 text = self.text[:12] + "..." 

305 

306 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>" 

307 

308 def reset( 

309 self, document: Document | None = None, append_to_history: bool = False 

310 ) -> None: 

311 """ 

312 :param append_to_history: Append current input to history first. 

313 """ 

314 if append_to_history: 

315 self.append_to_history() 

316 

317 document = document or Document() 

318 

319 self.__cursor_position = document.cursor_position 

320 

321 # `ValidationError` instance. (Will be set when the input is wrong.) 

322 self.validation_error: ValidationError | None = None 

323 self.validation_state: ValidationState | None = ValidationState.UNKNOWN 

324 

325 # State of the selection. 

326 self.selection_state: SelectionState | None = None 

327 

328 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode, 

329 # we can insert text on multiple lines at once. This is implemented by 

330 # using multiple cursors.) 

331 self.multiple_cursor_positions: list[int] = [] 

332 

333 # When doing consecutive up/down movements, prefer to stay at this column. 

334 self.preferred_column: int | None = None 

335 

336 # State of complete browser 

337 # For interactive completion through Ctrl-N/Ctrl-P. 

338 self.complete_state: CompletionState | None = None 

339 

340 # State of Emacs yank-nth-arg completion. 

341 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg. 

342 

343 # Remember the document that we had *right before* the last paste 

344 # operation. This is used for rotating through the kill ring. 

345 self.document_before_paste: Document | None = None 

346 

347 # Current suggestion. 

348 self.suggestion: Suggestion | None = None 

349 

350 # The history search text. (Used for filtering the history when we 

351 # browse through it.) 

352 self.history_search_text: str | None = None 

353 

354 # Undo/redo stacks (stack of `(text, cursor_position)`). 

355 self._undo_stack: list[tuple[str, int]] = [] 

356 self._redo_stack: list[tuple[str, int]] = [] 

357 

358 # Cancel history loader. If history loading was still ongoing. 

359 # Cancel the `_load_history_task`, so that next repaint of the 

360 # `BufferControl` we will repopulate it. 

361 if self._load_history_task is not None: 

362 self._load_history_task.cancel() 

363 self._load_history_task = None 

364 

365 #: The working lines. Similar to history, except that this can be 

366 #: modified. The user can press arrow_up and edit previous entries. 

367 #: Ctrl-C should reset this, and copy the whole history back in here. 

368 #: Enter should process the current command and append to the real 

369 #: history. 

370 self._working_lines: deque[str] = deque([document.text]) 

371 self.__working_index = 0 

372 

373 def load_history_if_not_yet_loaded(self) -> None: 

374 """ 

375 Create task for populating the buffer history (if not yet done). 

376 

377 Note:: 

378 

379 This needs to be called from within the event loop of the 

380 application, because history loading is async, and we need to be 

381 sure the right event loop is active. Therefor, we call this method 

382 in the `BufferControl.create_content`. 

383 

384 There are situations where prompt_toolkit applications are created 

385 in one thread, but will later run in a different thread (Ptpython 

386 is one example. The REPL runs in a separate thread, in order to 

387 prevent interfering with a potential different event loop in the 

388 main thread. The REPL UI however is still created in the main 

389 thread.) We could decide to not support creating prompt_toolkit 

390 objects in one thread and running the application in a different 

391 thread, but history loading is the only place where it matters, and 

392 this solves it. 

393 """ 

394 if self._load_history_task is None: 

395 

396 async def load_history() -> None: 

397 async for item in self.history.load(): 

398 self._working_lines.appendleft(item) 

399 self.__working_index += 1 

400 

401 self._load_history_task = get_app().create_background_task(load_history()) 

402 

403 def load_history_done(f: asyncio.Future[None]) -> None: 

404 """ 

405 Handle `load_history` result when either done, cancelled, or 

406 when an exception was raised. 

407 """ 

408 try: 

409 f.result() 

410 except asyncio.CancelledError: 

411 # Ignore cancellation. But handle it, so that we don't get 

412 # this traceback. 

413 pass 

414 except GeneratorExit: 

415 # Probably not needed, but we had situations where 

416 # `GeneratorExit` was raised in `load_history` during 

417 # cancellation. 

418 pass 

419 except BaseException: 

420 # Log error if something goes wrong. (We don't have a 

421 # caller to which we can propagate this exception.) 

422 logger.exception("Loading history failed") 

423 

424 self._load_history_task.add_done_callback(load_history_done) 

425 

426 # <getters/setters> 

427 

428 def _set_text(self, value: str) -> bool: 

429 """set text at current working_index. Return whether it changed.""" 

430 working_index = self.working_index 

431 working_lines = self._working_lines 

432 

433 original_value = working_lines[working_index] 

434 working_lines[working_index] = value 

435 

436 # Return True when this text has been changed. 

437 if len(value) != len(original_value): 

438 # For Python 2, it seems that when two strings have a different 

439 # length and one is a prefix of the other, Python still scans 

440 # character by character to see whether the strings are different. 

441 # (Some benchmarking showed significant differences for big 

442 # documents. >100,000 of lines.) 

443 return True 

444 elif value != original_value: 

445 return True 

446 return False 

447 

448 def _set_cursor_position(self, value: int) -> bool: 

449 """Set cursor position. Return whether it changed.""" 

450 original_position = self.__cursor_position 

451 self.__cursor_position = max(0, value) 

452 

453 return self.__cursor_position != original_position 

454 

455 @property 

456 def text(self) -> str: 

457 return self._working_lines[self.working_index] 

458 

459 @text.setter 

460 def text(self, value: str) -> None: 

461 """ 

462 Setting text. (When doing this, make sure that the cursor_position is 

463 valid for this text. text/cursor_position should be consistent at any time, 

464 otherwise set a Document instead.) 

465 """ 

466 # Ensure cursor position remains within the size of the text. 

467 if self.cursor_position > len(value): 

468 self.cursor_position = len(value) 

469 

470 # Don't allow editing of read-only buffers. 

471 if self.read_only(): 

472 raise EditReadOnlyBuffer() 

473 

474 changed = self._set_text(value) 

475 

476 if changed: 

477 self._text_changed() 

478 

479 # Reset history search text. 

480 # (Note that this doesn't need to happen when working_index 

481 # changes, which is when we traverse the history. That's why we 

482 # don't do this in `self._text_changed`.) 

483 self.history_search_text = None 

484 

485 @property 

486 def cursor_position(self) -> int: 

487 return self.__cursor_position 

488 

489 @cursor_position.setter 

490 def cursor_position(self, value: int) -> None: 

491 """ 

492 Setting cursor position. 

493 """ 

494 assert isinstance(value, int) 

495 

496 # Ensure cursor position is within the size of the text. 

497 if value > len(self.text): 

498 value = len(self.text) 

499 if value < 0: 

500 value = 0 

501 

502 changed = self._set_cursor_position(value) 

503 

504 if changed: 

505 self._cursor_position_changed() 

506 

507 @property 

508 def working_index(self) -> int: 

509 return self.__working_index 

510 

511 @working_index.setter 

512 def working_index(self, value: int) -> None: 

513 if self.__working_index != value: 

514 self.__working_index = value 

515 # Make sure to reset the cursor position, otherwise we end up in 

516 # situations where the cursor position is out of the bounds of the 

517 # text. 

518 self.cursor_position = 0 

519 self._text_changed() 

520 

521 def _text_changed(self) -> None: 

522 # Remove any validation errors and complete state. 

523 self.validation_error = None 

524 self.validation_state = ValidationState.UNKNOWN 

525 self.complete_state = None 

526 self.yank_nth_arg_state = None 

527 self.document_before_paste = None 

528 self.selection_state = None 

529 self.suggestion = None 

530 self.preferred_column = None 

531 

532 # fire 'on_text_changed' event. 

533 self.on_text_changed.fire() 

534 

535 # Input validation. 

536 # (This happens on all change events, unlike auto completion, also when 

537 # deleting text.) 

538 if self.validator and self.validate_while_typing(): 

539 get_app().create_background_task(self._async_validator()) 

540 

541 def _cursor_position_changed(self) -> None: 

542 # Remove any complete state. 

543 # (Input validation should only be undone when the cursor position 

544 # changes.) 

545 self.complete_state = None 

546 self.yank_nth_arg_state = None 

547 self.document_before_paste = None 

548 

549 # Unset preferred_column. (Will be set after the cursor movement, if 

550 # required.) 

551 self.preferred_column = None 

552 

553 # Note that the cursor position can change if we have a selection the 

554 # new position of the cursor determines the end of the selection. 

555 

556 # fire 'on_cursor_position_changed' event. 

557 self.on_cursor_position_changed.fire() 

558 

559 @property 

560 def document(self) -> Document: 

561 """ 

562 Return :class:`~prompt_toolkit.document.Document` instance from the 

563 current text, cursor position and selection state. 

564 """ 

565 return self._document_cache[ 

566 self.text, self.cursor_position, self.selection_state 

567 ] 

568 

569 @document.setter 

570 def document(self, value: Document) -> None: 

571 """ 

572 Set :class:`~prompt_toolkit.document.Document` instance. 

573 

574 This will set both the text and cursor position at the same time, but 

575 atomically. (Change events will be triggered only after both have been set.) 

576 """ 

577 self.set_document(value) 

578 

579 def set_document(self, value: Document, bypass_readonly: bool = False) -> None: 

580 """ 

581 Set :class:`~prompt_toolkit.document.Document` instance. Like the 

582 ``document`` property, but accept an ``bypass_readonly`` argument. 

583 

584 :param bypass_readonly: When True, don't raise an 

585 :class:`.EditReadOnlyBuffer` exception, even 

586 when the buffer is read-only. 

587 

588 .. warning:: 

589 

590 When this buffer is read-only and `bypass_readonly` was not passed, 

591 the `EditReadOnlyBuffer` exception will be caught by the 

592 `KeyProcessor` and is silently suppressed. This is important to 

593 keep in mind when writing key bindings, because it won't do what 

594 you expect, and there won't be a stack trace. Use try/finally 

595 around this function if you need some cleanup code. 

596 """ 

597 # Don't allow editing of read-only buffers. 

598 if not bypass_readonly and self.read_only(): 

599 raise EditReadOnlyBuffer() 

600 

601 # Set text and cursor position first. 

602 text_changed = self._set_text(value.text) 

603 cursor_position_changed = self._set_cursor_position(value.cursor_position) 

604 

605 # Now handle change events. (We do this when text/cursor position is 

606 # both set and consistent.) 

607 if text_changed: 

608 self._text_changed() 

609 self.history_search_text = None 

610 

611 if cursor_position_changed: 

612 self._cursor_position_changed() 

613 

614 @property 

615 def is_returnable(self) -> bool: 

616 """ 

617 True when there is something handling accept. 

618 """ 

619 return bool(self.accept_handler) 

620 

621 # End of <getters/setters> 

622 

623 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: 

624 """ 

625 Safe current state (input text and cursor position), so that we can 

626 restore it by calling undo. 

627 """ 

628 # Safe if the text is different from the text at the top of the stack 

629 # is different. If the text is the same, just update the cursor position. 

630 if self._undo_stack and self._undo_stack[-1][0] == self.text: 

631 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) 

632 else: 

633 self._undo_stack.append((self.text, self.cursor_position)) 

634 

635 # Saving anything to the undo stack, clears the redo stack. 

636 if clear_redo_stack: 

637 self._redo_stack = [] 

638 

639 def transform_lines( 

640 self, 

641 line_index_iterator: Iterable[int], 

642 transform_callback: Callable[[str], str], 

643 ) -> str: 

644 """ 

645 Transforms the text on a range of lines. 

646 When the iterator yield an index not in the range of lines that the 

647 document contains, it skips them silently. 

648 

649 To uppercase some lines:: 

650 

651 new_text = transform_lines(range(5,10), lambda text: text.upper()) 

652 

653 :param line_index_iterator: Iterator of line numbers (int) 

654 :param transform_callback: callable that takes the original text of a 

655 line, and return the new text for this line. 

656 

657 :returns: The new text. 

658 """ 

659 # Split lines 

660 lines = self.text.split("\n") 

661 

662 # Apply transformation 

663 for index in line_index_iterator: 

664 try: 

665 lines[index] = transform_callback(lines[index]) 

666 except IndexError: 

667 pass 

668 

669 return "\n".join(lines) 

670 

671 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None: 

672 """ 

673 Apply the given transformation function to the current line. 

674 

675 :param transform_callback: callable that takes a string and return a new string. 

676 """ 

677 document = self.document 

678 a = document.cursor_position + document.get_start_of_line_position() 

679 b = document.cursor_position + document.get_end_of_line_position() 

680 self.text = ( 

681 document.text[:a] 

682 + transform_callback(document.text[a:b]) 

683 + document.text[b:] 

684 ) 

685 

686 def transform_region( 

687 self, from_: int, to: int, transform_callback: Callable[[str], str] 

688 ) -> None: 

689 """ 

690 Transform a part of the input string. 

691 

692 :param from_: (int) start position. 

693 :param to: (int) end position. 

694 :param transform_callback: Callable which accepts a string and returns 

695 the transformed string. 

696 """ 

697 assert from_ < to 

698 

699 self.text = "".join( 

700 [ 

701 self.text[:from_] 

702 + transform_callback(self.text[from_:to]) 

703 + self.text[to:] 

704 ] 

705 ) 

706 

707 def cursor_left(self, count: int = 1) -> None: 

708 self.cursor_position += self.document.get_cursor_left_position(count=count) 

709 

710 def cursor_right(self, count: int = 1) -> None: 

711 self.cursor_position += self.document.get_cursor_right_position(count=count) 

712 

713 def cursor_up(self, count: int = 1) -> None: 

714 """(for multiline edit). Move cursor to the previous line.""" 

715 original_column = self.preferred_column or self.document.cursor_position_col 

716 self.cursor_position += self.document.get_cursor_up_position( 

717 count=count, preferred_column=original_column 

718 ) 

719 

720 # Remember the original column for the next up/down movement. 

721 self.preferred_column = original_column 

722 

723 def cursor_down(self, count: int = 1) -> None: 

724 """(for multiline edit). Move cursor to the next line.""" 

725 original_column = self.preferred_column or self.document.cursor_position_col 

726 self.cursor_position += self.document.get_cursor_down_position( 

727 count=count, preferred_column=original_column 

728 ) 

729 

730 # Remember the original column for the next up/down movement. 

731 self.preferred_column = original_column 

732 

733 def auto_up( 

734 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False 

735 ) -> None: 

736 """ 

737 If we're not on the first line (of a multiline input) go a line up, 

738 otherwise go back in history. (If nothing is selected.) 

739 """ 

740 if self.complete_state: 

741 self.complete_previous(count=count) 

742 elif self.document.cursor_position_row > 0: 

743 self.cursor_up(count=count) 

744 elif not self.selection_state: 

745 self.history_backward(count=count) 

746 

747 # Go to the start of the line? 

748 if go_to_start_of_line_if_history_changes: 

749 self.cursor_position += self.document.get_start_of_line_position() 

750 

751 def auto_down( 

752 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False 

753 ) -> None: 

754 """ 

755 If we're not on the last line (of a multiline input) go a line down, 

756 otherwise go forward in history. (If nothing is selected.) 

757 """ 

758 if self.complete_state: 

759 self.complete_next(count=count) 

760 elif self.document.cursor_position_row < self.document.line_count - 1: 

761 self.cursor_down(count=count) 

762 elif not self.selection_state: 

763 self.history_forward(count=count) 

764 

765 # Go to the start of the line? 

766 if go_to_start_of_line_if_history_changes: 

767 self.cursor_position += self.document.get_start_of_line_position() 

768 

769 def delete_before_cursor(self, count: int = 1) -> str: 

770 """ 

771 Delete specified number of characters before cursor and return the 

772 deleted text. 

773 """ 

774 assert count >= 0 

775 deleted = "" 

776 

777 if self.cursor_position > 0: 

778 deleted = self.text[self.cursor_position - count : self.cursor_position] 

779 

780 new_text = ( 

781 self.text[: self.cursor_position - count] 

782 + self.text[self.cursor_position :] 

783 ) 

784 new_cursor_position = self.cursor_position - len(deleted) 

785 

786 # Set new Document atomically. 

787 self.document = Document(new_text, new_cursor_position) 

788 

789 return deleted 

790 

791 def delete(self, count: int = 1) -> str: 

792 """ 

793 Delete specified number of characters and Return the deleted text. 

794 """ 

795 if self.cursor_position < len(self.text): 

796 deleted = self.document.text_after_cursor[:count] 

797 self.text = ( 

798 self.text[: self.cursor_position] 

799 + self.text[self.cursor_position + len(deleted) :] 

800 ) 

801 return deleted 

802 else: 

803 return "" 

804 

805 def join_next_line(self, separator: str = " ") -> None: 

806 """ 

807 Join the next line to the current one by deleting the line ending after 

808 the current line. 

809 """ 

810 if not self.document.on_last_line: 

811 self.cursor_position += self.document.get_end_of_line_position() 

812 self.delete() 

813 

814 # Remove spaces. 

815 self.text = ( 

816 self.document.text_before_cursor 

817 + separator 

818 + self.document.text_after_cursor.lstrip(" ") 

819 ) 

820 

821 def join_selected_lines(self, separator: str = " ") -> None: 

822 """ 

823 Join the selected lines. 

824 """ 

825 assert self.selection_state 

826 

827 # Get lines. 

828 from_, to = sorted( 

829 [self.cursor_position, self.selection_state.original_cursor_position] 

830 ) 

831 

832 before = self.text[:from_] 

833 lines = self.text[from_:to].splitlines() 

834 after = self.text[to:] 

835 

836 # Replace leading spaces with just one space. 

837 lines = [l.lstrip(" ") + separator for l in lines] 

838 

839 # Set new document. 

840 self.document = Document( 

841 text=before + "".join(lines) + after, 

842 cursor_position=len(before + "".join(lines[:-1])) - 1, 

843 ) 

844 

845 def swap_characters_before_cursor(self) -> None: 

846 """ 

847 Swap the last two characters before the cursor. 

848 """ 

849 pos = self.cursor_position 

850 

851 if pos >= 2: 

852 a = self.text[pos - 2] 

853 b = self.text[pos - 1] 

854 

855 self.text = self.text[: pos - 2] + b + a + self.text[pos:] 

856 

857 def go_to_history(self, index: int) -> None: 

858 """ 

859 Go to this item in the history. 

860 """ 

861 if index < len(self._working_lines): 

862 self.working_index = index 

863 self.cursor_position = len(self.text) 

864 

865 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None: 

866 """ 

867 Browse to the next completions. 

868 (Does nothing if there are no completion.) 

869 """ 

870 index: int | None 

871 

872 if self.complete_state: 

873 completions_count = len(self.complete_state.completions) 

874 

875 if self.complete_state.complete_index is None: 

876 index = 0 

877 elif self.complete_state.complete_index == completions_count - 1: 

878 index = None 

879 

880 if disable_wrap_around: 

881 return 

882 else: 

883 index = min( 

884 completions_count - 1, self.complete_state.complete_index + count 

885 ) 

886 self.go_to_completion(index) 

887 

888 def complete_previous( 

889 self, count: int = 1, disable_wrap_around: bool = False 

890 ) -> None: 

891 """ 

892 Browse to the previous completions. 

893 (Does nothing if there are no completion.) 

894 """ 

895 index: int | None 

896 

897 if self.complete_state: 

898 if self.complete_state.complete_index == 0: 

899 index = None 

900 

901 if disable_wrap_around: 

902 return 

903 elif self.complete_state.complete_index is None: 

904 index = len(self.complete_state.completions) - 1 

905 else: 

906 index = max(0, self.complete_state.complete_index - count) 

907 

908 self.go_to_completion(index) 

909 

910 def cancel_completion(self) -> None: 

911 """ 

912 Cancel completion, go back to the original text. 

913 """ 

914 if self.complete_state: 

915 self.go_to_completion(None) 

916 self.complete_state = None 

917 

918 def _set_completions(self, completions: list[Completion]) -> CompletionState: 

919 """ 

920 Start completions. (Generate list of completions and initialize.) 

921 

922 By default, no completion will be selected. 

923 """ 

924 self.complete_state = CompletionState( 

925 original_document=self.document, completions=completions 

926 ) 

927 

928 # Trigger event. This should eventually invalidate the layout. 

929 self.on_completions_changed.fire() 

930 

931 return self.complete_state 

932 

933 def start_history_lines_completion(self) -> None: 

934 """ 

935 Start a completion based on all the other lines in the document and the 

936 history. 

937 """ 

938 found_completions: set[str] = set() 

939 completions = [] 

940 

941 # For every line of the whole history, find matches with the current line. 

942 current_line = self.document.current_line_before_cursor.lstrip() 

943 

944 for i, string in enumerate(self._working_lines): 

945 for j, l in enumerate(string.split("\n")): 

946 l = l.strip() 

947 if l and l.startswith(current_line): 

948 # When a new line has been found. 

949 if l not in found_completions: 

950 found_completions.add(l) 

951 

952 # Create completion. 

953 if i == self.working_index: 

954 display_meta = "Current, line %s" % (j + 1) 

955 else: 

956 display_meta = f"History {i + 1}, line {j + 1}" 

957 

958 completions.append( 

959 Completion( 

960 text=l, 

961 start_position=-len(current_line), 

962 display_meta=display_meta, 

963 ) 

964 ) 

965 

966 self._set_completions(completions=completions[::-1]) 

967 self.go_to_completion(0) 

968 

969 def go_to_completion(self, index: int | None) -> None: 

970 """ 

971 Select a completion from the list of current completions. 

972 """ 

973 assert self.complete_state 

974 

975 # Set new completion 

976 state = self.complete_state 

977 state.go_to_index(index) 

978 

979 # Set text/cursor position 

980 new_text, new_cursor_position = state.new_text_and_position() 

981 self.document = Document(new_text, new_cursor_position) 

982 

983 # (changing text/cursor position will unset complete_state.) 

984 self.complete_state = state 

985 

986 def apply_completion(self, completion: Completion) -> None: 

987 """ 

988 Insert a given completion. 

989 """ 

990 # If there was already a completion active, cancel that one. 

991 if self.complete_state: 

992 self.go_to_completion(None) 

993 self.complete_state = None 

994 

995 # Insert text from the given completion. 

996 self.delete_before_cursor(-completion.start_position) 

997 self.insert_text(completion.text) 

998 

999 def _set_history_search(self) -> None: 

1000 """ 

1001 Set `history_search_text`. 

1002 (The text before the cursor will be used for filtering the history.) 

1003 """ 

1004 if self.enable_history_search(): 

1005 if self.history_search_text is None: 

1006 self.history_search_text = self.document.text_before_cursor 

1007 else: 

1008 self.history_search_text = None 

1009 

1010 def _history_matches(self, i: int) -> bool: 

1011 """ 

1012 True when the current entry matches the history search. 

1013 (when we don't have history search, it's also True.) 

1014 """ 

1015 return self.history_search_text is None or self._working_lines[i].startswith( 

1016 self.history_search_text 

1017 ) 

1018 

1019 def history_forward(self, count: int = 1) -> None: 

1020 """ 

1021 Move forwards through the history. 

1022 

1023 :param count: Amount of items to move forward. 

1024 """ 

1025 self._set_history_search() 

1026 

1027 # Go forward in history. 

1028 found_something = False 

1029 

1030 for i in range(self.working_index + 1, len(self._working_lines)): 

1031 if self._history_matches(i): 

1032 self.working_index = i 

1033 count -= 1 

1034 found_something = True 

1035 if count == 0: 

1036 break 

1037 

1038 # If we found an entry, move cursor to the end of the first line. 

1039 if found_something: 

1040 self.cursor_position = 0 

1041 self.cursor_position += self.document.get_end_of_line_position() 

1042 

1043 def history_backward(self, count: int = 1) -> None: 

1044 """ 

1045 Move backwards through history. 

1046 """ 

1047 self._set_history_search() 

1048 

1049 # Go back in history. 

1050 found_something = False 

1051 

1052 for i in range(self.working_index - 1, -1, -1): 

1053 if self._history_matches(i): 

1054 self.working_index = i 

1055 count -= 1 

1056 found_something = True 

1057 if count == 0: 

1058 break 

1059 

1060 # If we move to another entry, move cursor to the end of the line. 

1061 if found_something: 

1062 self.cursor_position = len(self.text) 

1063 

1064 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None: 

1065 """ 

1066 Pick nth word from previous history entry (depending on current 

1067 `yank_nth_arg_state`) and insert it at current position. Rotate through 

1068 history if called repeatedly. If no `n` has been given, take the first 

1069 argument. (The second word.) 

1070 

1071 :param n: (None or int), The index of the word from the previous line 

1072 to take. 

1073 """ 

1074 assert n is None or isinstance(n, int) 

1075 history_strings = self.history.get_strings() 

1076 

1077 if not len(history_strings): 

1078 return 

1079 

1080 # Make sure we have a `YankNthArgState`. 

1081 if self.yank_nth_arg_state is None: 

1082 state = YankNthArgState(n=-1 if _yank_last_arg else 1) 

1083 else: 

1084 state = self.yank_nth_arg_state 

1085 

1086 if n is not None: 

1087 state.n = n 

1088 

1089 # Get new history position. 

1090 new_pos = state.history_position - 1 

1091 if -new_pos > len(history_strings): 

1092 new_pos = -1 

1093 

1094 # Take argument from line. 

1095 line = history_strings[new_pos] 

1096 

1097 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)] 

1098 words = [w for w in words if w] 

1099 try: 

1100 word = words[state.n] 

1101 except IndexError: 

1102 word = "" 

1103 

1104 # Insert new argument. 

1105 if state.previous_inserted_word: 

1106 self.delete_before_cursor(len(state.previous_inserted_word)) 

1107 self.insert_text(word) 

1108 

1109 # Save state again for next completion. (Note that the 'insert' 

1110 # operation from above clears `self.yank_nth_arg_state`.) 

1111 state.previous_inserted_word = word 

1112 state.history_position = new_pos 

1113 self.yank_nth_arg_state = state 

1114 

1115 def yank_last_arg(self, n: int | None = None) -> None: 

1116 """ 

1117 Like `yank_nth_arg`, but if no argument has been given, yank the last 

1118 word by default. 

1119 """ 

1120 self.yank_nth_arg(n=n, _yank_last_arg=True) 

1121 

1122 def start_selection( 

1123 self, selection_type: SelectionType = SelectionType.CHARACTERS 

1124 ) -> None: 

1125 """ 

1126 Take the current cursor position as the start of this selection. 

1127 """ 

1128 self.selection_state = SelectionState(self.cursor_position, selection_type) 

1129 

1130 def copy_selection(self, _cut: bool = False) -> ClipboardData: 

1131 """ 

1132 Copy selected text and return :class:`.ClipboardData` instance. 

1133 

1134 Notice that this doesn't store the copied data on the clipboard yet. 

1135 You can store it like this: 

1136 

1137 .. code:: python 

1138 

1139 data = buffer.copy_selection() 

1140 get_app().clipboard.set_data(data) 

1141 """ 

1142 new_document, clipboard_data = self.document.cut_selection() 

1143 if _cut: 

1144 self.document = new_document 

1145 

1146 self.selection_state = None 

1147 return clipboard_data 

1148 

1149 def cut_selection(self) -> ClipboardData: 

1150 """ 

1151 Delete selected text and return :class:`.ClipboardData` instance. 

1152 """ 

1153 return self.copy_selection(_cut=True) 

1154 

1155 def paste_clipboard_data( 

1156 self, 

1157 data: ClipboardData, 

1158 paste_mode: PasteMode = PasteMode.EMACS, 

1159 count: int = 1, 

1160 ) -> None: 

1161 """ 

1162 Insert the data from the clipboard. 

1163 """ 

1164 assert isinstance(data, ClipboardData) 

1165 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS) 

1166 

1167 original_document = self.document 

1168 self.document = self.document.paste_clipboard_data( 

1169 data, paste_mode=paste_mode, count=count 

1170 ) 

1171 

1172 # Remember original document. This assignment should come at the end, 

1173 # because assigning to 'document' will erase it. 

1174 self.document_before_paste = original_document 

1175 

1176 def newline(self, copy_margin: bool = True) -> None: 

1177 """ 

1178 Insert a line ending at the current position. 

1179 """ 

1180 if copy_margin: 

1181 self.insert_text("\n" + self.document.leading_whitespace_in_current_line) 

1182 else: 

1183 self.insert_text("\n") 

1184 

1185 def insert_line_above(self, copy_margin: bool = True) -> None: 

1186 """ 

1187 Insert a new line above the current one. 

1188 """ 

1189 if copy_margin: 

1190 insert = self.document.leading_whitespace_in_current_line + "\n" 

1191 else: 

1192 insert = "\n" 

1193 

1194 self.cursor_position += self.document.get_start_of_line_position() 

1195 self.insert_text(insert) 

1196 self.cursor_position -= 1 

1197 

1198 def insert_line_below(self, copy_margin: bool = True) -> None: 

1199 """ 

1200 Insert a new line below the current one. 

1201 """ 

1202 if copy_margin: 

1203 insert = "\n" + self.document.leading_whitespace_in_current_line 

1204 else: 

1205 insert = "\n" 

1206 

1207 self.cursor_position += self.document.get_end_of_line_position() 

1208 self.insert_text(insert) 

1209 

1210 def insert_text( 

1211 self, 

1212 data: str, 

1213 overwrite: bool = False, 

1214 move_cursor: bool = True, 

1215 fire_event: bool = True, 

1216 ) -> None: 

1217 """ 

1218 Insert characters at cursor position. 

1219 

1220 :param fire_event: Fire `on_text_insert` event. This is mainly used to 

1221 trigger autocompletion while typing. 

1222 """ 

1223 # Original text & cursor position. 

1224 otext = self.text 

1225 ocpos = self.cursor_position 

1226 

1227 # In insert/text mode. 

1228 if overwrite: 

1229 # Don't overwrite the newline itself. Just before the line ending, 

1230 # it should act like insert mode. 

1231 overwritten_text = otext[ocpos : ocpos + len(data)] 

1232 if "\n" in overwritten_text: 

1233 overwritten_text = overwritten_text[: overwritten_text.find("\n")] 

1234 

1235 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :] 

1236 else: 

1237 text = otext[:ocpos] + data + otext[ocpos:] 

1238 

1239 if move_cursor: 

1240 cpos = self.cursor_position + len(data) 

1241 else: 

1242 cpos = self.cursor_position 

1243 

1244 # Set new document. 

1245 # (Set text and cursor position at the same time. Otherwise, setting 

1246 # the text will fire a change event before the cursor position has been 

1247 # set. It works better to have this atomic.) 

1248 self.document = Document(text, cpos) 

1249 

1250 # Fire 'on_text_insert' event. 

1251 if fire_event: # XXX: rename to `start_complete`. 

1252 self.on_text_insert.fire() 

1253 

1254 # Only complete when "complete_while_typing" is enabled. 

1255 if self.completer and self.complete_while_typing(): 

1256 get_app().create_background_task(self._async_completer()) 

1257 

1258 # Call auto_suggest. 

1259 if self.auto_suggest: 

1260 get_app().create_background_task(self._async_suggester()) 

1261 

1262 def undo(self) -> None: 

1263 # Pop from the undo-stack until we find a text that if different from 

1264 # the current text. (The current logic of `save_to_undo_stack` will 

1265 # cause that the top of the undo stack is usually the same as the 

1266 # current text, so in that case we have to pop twice.) 

1267 while self._undo_stack: 

1268 text, pos = self._undo_stack.pop() 

1269 

1270 if text != self.text: 

1271 # Push current text to redo stack. 

1272 self._redo_stack.append((self.text, self.cursor_position)) 

1273 

1274 # Set new text/cursor_position. 

1275 self.document = Document(text, cursor_position=pos) 

1276 break 

1277 

1278 def redo(self) -> None: 

1279 if self._redo_stack: 

1280 # Copy current state on undo stack. 

1281 self.save_to_undo_stack(clear_redo_stack=False) 

1282 

1283 # Pop state from redo stack. 

1284 text, pos = self._redo_stack.pop() 

1285 self.document = Document(text, cursor_position=pos) 

1286 

1287 def validate(self, set_cursor: bool = False) -> bool: 

1288 """ 

1289 Returns `True` if valid. 

1290 

1291 :param set_cursor: Set the cursor position, if an error was found. 

1292 """ 

1293 # Don't call the validator again, if it was already called for the 

1294 # current input. 

1295 if self.validation_state != ValidationState.UNKNOWN: 

1296 return self.validation_state == ValidationState.VALID 

1297 

1298 # Call validator. 

1299 if self.validator: 

1300 try: 

1301 self.validator.validate(self.document) 

1302 except ValidationError as e: 

1303 # Set cursor position (don't allow invalid values.) 

1304 if set_cursor: 

1305 self.cursor_position = min( 

1306 max(0, e.cursor_position), len(self.text) 

1307 ) 

1308 

1309 self.validation_state = ValidationState.INVALID 

1310 self.validation_error = e 

1311 return False 

1312 

1313 # Handle validation result. 

1314 self.validation_state = ValidationState.VALID 

1315 self.validation_error = None 

1316 return True 

1317 

1318 async def _validate_async(self) -> None: 

1319 """ 

1320 Asynchronous version of `validate()`. 

1321 This one doesn't set the cursor position. 

1322 

1323 We have both variants, because a synchronous version is required. 

1324 Handling the ENTER key needs to be completely synchronous, otherwise 

1325 stuff like type-ahead is going to give very weird results. (People 

1326 could type input while the ENTER key is still processed.) 

1327 

1328 An asynchronous version is required if we have `validate_while_typing` 

1329 enabled. 

1330 """ 

1331 while True: 

1332 # Don't call the validator again, if it was already called for the 

1333 # current input. 

1334 if self.validation_state != ValidationState.UNKNOWN: 

1335 return 

1336 

1337 # Call validator. 

1338 error = None 

1339 document = self.document 

1340 

1341 if self.validator: 

1342 try: 

1343 await self.validator.validate_async(self.document) 

1344 except ValidationError as e: 

1345 error = e 

1346 

1347 # If the document changed during the validation, try again. 

1348 if self.document != document: 

1349 continue 

1350 

1351 # Handle validation result. 

1352 if error: 

1353 self.validation_state = ValidationState.INVALID 

1354 else: 

1355 self.validation_state = ValidationState.VALID 

1356 

1357 self.validation_error = error 

1358 get_app().invalidate() # Trigger redraw (display error). 

1359 

1360 def append_to_history(self) -> None: 

1361 """ 

1362 Append the current input to the history. 

1363 """ 

1364 # Save at the tail of the history. (But don't if the last entry the 

1365 # history is already the same.) 

1366 if self.text: 

1367 history_strings = self.history.get_strings() 

1368 if not len(history_strings) or history_strings[-1] != self.text: 

1369 self.history.append_string(self.text) 

1370 

1371 def _search( 

1372 self, 

1373 search_state: SearchState, 

1374 include_current_position: bool = False, 

1375 count: int = 1, 

1376 ) -> tuple[int, int] | None: 

1377 """ 

1378 Execute search. Return (working_index, cursor_position) tuple when this 

1379 search is applied. Returns `None` when this text cannot be found. 

1380 """ 

1381 assert count > 0 

1382 

1383 text = search_state.text 

1384 direction = search_state.direction 

1385 ignore_case = search_state.ignore_case() 

1386 

1387 def search_once( 

1388 working_index: int, document: Document 

1389 ) -> tuple[int, Document] | None: 

1390 """ 

1391 Do search one time. 

1392 Return (working_index, document) or `None` 

1393 """ 

1394 if direction == SearchDirection.FORWARD: 

1395 # Try find at the current input. 

1396 new_index = document.find( 

1397 text, 

1398 include_current_position=include_current_position, 

1399 ignore_case=ignore_case, 

1400 ) 

1401 

1402 if new_index is not None: 

1403 return ( 

1404 working_index, 

1405 Document(document.text, document.cursor_position + new_index), 

1406 ) 

1407 else: 

1408 # No match, go forward in the history. (Include len+1 to wrap around.) 

1409 # (Here we should always include all cursor positions, because 

1410 # it's a different line.) 

1411 for i in range(working_index + 1, len(self._working_lines) + 1): 

1412 i %= len(self._working_lines) 

1413 

1414 document = Document(self._working_lines[i], 0) 

1415 new_index = document.find( 

1416 text, include_current_position=True, ignore_case=ignore_case 

1417 ) 

1418 if new_index is not None: 

1419 return (i, Document(document.text, new_index)) 

1420 else: 

1421 # Try find at the current input. 

1422 new_index = document.find_backwards(text, ignore_case=ignore_case) 

1423 

1424 if new_index is not None: 

1425 return ( 

1426 working_index, 

1427 Document(document.text, document.cursor_position + new_index), 

1428 ) 

1429 else: 

1430 # No match, go back in the history. (Include -1 to wrap around.) 

1431 for i in range(working_index - 1, -2, -1): 

1432 i %= len(self._working_lines) 

1433 

1434 document = Document( 

1435 self._working_lines[i], len(self._working_lines[i]) 

1436 ) 

1437 new_index = document.find_backwards( 

1438 text, ignore_case=ignore_case 

1439 ) 

1440 if new_index is not None: 

1441 return ( 

1442 i, 

1443 Document(document.text, len(document.text) + new_index), 

1444 ) 

1445 return None 

1446 

1447 # Do 'count' search iterations. 

1448 working_index = self.working_index 

1449 document = self.document 

1450 for _ in range(count): 

1451 result = search_once(working_index, document) 

1452 if result is None: 

1453 return None # Nothing found. 

1454 else: 

1455 working_index, document = result 

1456 

1457 return (working_index, document.cursor_position) 

1458 

1459 def document_for_search(self, search_state: SearchState) -> Document: 

1460 """ 

1461 Return a :class:`~prompt_toolkit.document.Document` instance that has 

1462 the text/cursor position for this search, if we would apply it. This 

1463 will be used in the 

1464 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while 

1465 searching. 

1466 """ 

1467 search_result = self._search(search_state, include_current_position=True) 

1468 

1469 if search_result is None: 

1470 return self.document 

1471 else: 

1472 working_index, cursor_position = search_result 

1473 

1474 # Keep selection, when `working_index` was not changed. 

1475 if working_index == self.working_index: 

1476 selection = self.selection_state 

1477 else: 

1478 selection = None 

1479 

1480 return Document( 

1481 self._working_lines[working_index], cursor_position, selection=selection 

1482 ) 

1483 

1484 def get_search_position( 

1485 self, 

1486 search_state: SearchState, 

1487 include_current_position: bool = True, 

1488 count: int = 1, 

1489 ) -> int: 

1490 """ 

1491 Get the cursor position for this search. 

1492 (This operation won't change the `working_index`. It's won't go through 

1493 the history. Vi text objects can't span multiple items.) 

1494 """ 

1495 search_result = self._search( 

1496 search_state, include_current_position=include_current_position, count=count 

1497 ) 

1498 

1499 if search_result is None: 

1500 return self.cursor_position 

1501 else: 

1502 working_index, cursor_position = search_result 

1503 return cursor_position 

1504 

1505 def apply_search( 

1506 self, 

1507 search_state: SearchState, 

1508 include_current_position: bool = True, 

1509 count: int = 1, 

1510 ) -> None: 

1511 """ 

1512 Apply search. If something is found, set `working_index` and 

1513 `cursor_position`. 

1514 """ 

1515 search_result = self._search( 

1516 search_state, include_current_position=include_current_position, count=count 

1517 ) 

1518 

1519 if search_result is not None: 

1520 working_index, cursor_position = search_result 

1521 self.working_index = working_index 

1522 self.cursor_position = cursor_position 

1523 

1524 def exit_selection(self) -> None: 

1525 self.selection_state = None 

1526 

1527 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]: 

1528 """ 

1529 Simple (file) tempfile implementation. 

1530 Return (tempfile, cleanup_func). 

1531 """ 

1532 suffix = to_str(self.tempfile_suffix) 

1533 descriptor, filename = tempfile.mkstemp(suffix) 

1534 

1535 os.write(descriptor, self.text.encode("utf-8")) 

1536 os.close(descriptor) 

1537 

1538 def cleanup() -> None: 

1539 os.unlink(filename) 

1540 

1541 return filename, cleanup 

1542 

1543 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]: 

1544 # Complex (directory) tempfile implementation. 

1545 headtail = to_str(self.tempfile) 

1546 if not headtail: 

1547 # Revert to simple case. 

1548 return self._editor_simple_tempfile() 

1549 headtail = str(headtail) 

1550 

1551 # Try to make according to tempfile logic. 

1552 head, tail = os.path.split(headtail) 

1553 if os.path.isabs(head): 

1554 head = head[1:] 

1555 

1556 dirpath = tempfile.mkdtemp() 

1557 if head: 

1558 dirpath = os.path.join(dirpath, head) 

1559 # Assume there is no issue creating dirs in this temp dir. 

1560 os.makedirs(dirpath) 

1561 

1562 # Open the filename and write current text. 

1563 filename = os.path.join(dirpath, tail) 

1564 with open(filename, "w", encoding="utf-8") as fh: 

1565 fh.write(self.text) 

1566 

1567 def cleanup() -> None: 

1568 shutil.rmtree(dirpath) 

1569 

1570 return filename, cleanup 

1571 

1572 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]: 

1573 """ 

1574 Open code in editor. 

1575 

1576 This returns a future, and runs in a thread executor. 

1577 """ 

1578 if self.read_only(): 

1579 raise EditReadOnlyBuffer() 

1580 

1581 # Write current text to temporary file 

1582 if self.tempfile: 

1583 filename, cleanup_func = self._editor_complex_tempfile() 

1584 else: 

1585 filename, cleanup_func = self._editor_simple_tempfile() 

1586 

1587 async def run() -> None: 

1588 try: 

1589 # Open in editor 

1590 # (We need to use `run_in_terminal`, because not all editors go to 

1591 # the alternate screen buffer, and some could influence the cursor 

1592 # position.) 

1593 success = await run_in_terminal( 

1594 lambda: self._open_file_in_editor(filename), in_executor=True 

1595 ) 

1596 

1597 # Read content again. 

1598 if success: 

1599 with open(filename, "rb") as f: 

1600 text = f.read().decode("utf-8") 

1601 

1602 # Drop trailing newline. (Editors are supposed to add it at the 

1603 # end, but we don't need it.) 

1604 if text.endswith("\n"): 

1605 text = text[:-1] 

1606 

1607 self.document = Document(text=text, cursor_position=len(text)) 

1608 

1609 # Accept the input. 

1610 if validate_and_handle: 

1611 self.validate_and_handle() 

1612 

1613 finally: 

1614 # Clean up temp dir/file. 

1615 cleanup_func() 

1616 

1617 return get_app().create_background_task(run()) 

1618 

1619 def _open_file_in_editor(self, filename: str) -> bool: 

1620 """ 

1621 Call editor executable. 

1622 

1623 Return True when we received a zero return code. 

1624 """ 

1625 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that. 

1626 # Otherwise, fall back to the first available editor that we can find. 

1627 visual = os.environ.get("VISUAL") 

1628 editor = os.environ.get("EDITOR") 

1629 

1630 editors = [ 

1631 visual, 

1632 editor, 

1633 # Order of preference. 

1634 "/usr/bin/editor", 

1635 "/usr/bin/nano", 

1636 "/usr/bin/pico", 

1637 "/usr/bin/vi", 

1638 "/usr/bin/emacs", 

1639 ] 

1640 

1641 for e in editors: 

1642 if e: 

1643 try: 

1644 # Use 'shlex.split()', because $VISUAL can contain spaces 

1645 # and quotes. 

1646 returncode = subprocess.call(shlex.split(e) + [filename]) 

1647 return returncode == 0 

1648 

1649 except OSError: 

1650 # Executable does not exist, try the next one. 

1651 pass 

1652 

1653 return False 

1654 

1655 def start_completion( 

1656 self, 

1657 select_first: bool = False, 

1658 select_last: bool = False, 

1659 insert_common_part: bool = False, 

1660 complete_event: CompleteEvent | None = None, 

1661 ) -> None: 

1662 """ 

1663 Start asynchronous autocompletion of this buffer. 

1664 (This will do nothing if a previous completion was still in progress.) 

1665 """ 

1666 # Only one of these options can be selected. 

1667 assert select_first + select_last + insert_common_part <= 1 

1668 

1669 get_app().create_background_task( 

1670 self._async_completer( 

1671 select_first=select_first, 

1672 select_last=select_last, 

1673 insert_common_part=insert_common_part, 

1674 complete_event=complete_event 

1675 or CompleteEvent(completion_requested=True), 

1676 ) 

1677 ) 

1678 

1679 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]: 

1680 """ 

1681 Create function for asynchronous autocompletion. 

1682 

1683 (This consumes the asynchronous completer generator, which possibly 

1684 runs the completion algorithm in another thread.) 

1685 """ 

1686 

1687 def completion_does_nothing(document: Document, completion: Completion) -> bool: 

1688 """ 

1689 Return `True` if applying this completion doesn't have any effect. 

1690 (When it doesn't insert any new text. 

1691 """ 

1692 text_before_cursor = document.text_before_cursor 

1693 replaced_text = text_before_cursor[ 

1694 len(text_before_cursor) + completion.start_position : 

1695 ] 

1696 return replaced_text == completion.text 

1697 

1698 @_only_one_at_a_time 

1699 async def async_completer( 

1700 select_first: bool = False, 

1701 select_last: bool = False, 

1702 insert_common_part: bool = False, 

1703 complete_event: CompleteEvent | None = None, 

1704 ) -> None: 

1705 document = self.document 

1706 complete_event = complete_event or CompleteEvent(text_inserted=True) 

1707 

1708 # Don't complete when we already have completions. 

1709 if self.complete_state or not self.completer: 

1710 return 

1711 

1712 # Create an empty CompletionState. 

1713 complete_state = CompletionState(original_document=self.document) 

1714 self.complete_state = complete_state 

1715 

1716 def proceed() -> bool: 

1717 """Keep retrieving completions. Input text has not yet changed 

1718 while generating completions.""" 

1719 return self.complete_state == complete_state 

1720 

1721 refresh_needed = asyncio.Event() 

1722 

1723 async def refresh_while_loading() -> None: 

1724 """Background loop to refresh the UI at most 3 times a second 

1725 while the completion are loading. Calling 

1726 `on_completions_changed.fire()` for every completion that we 

1727 receive is too expensive when there are many completions. (We 

1728 could tune `Application.max_render_postpone_time` and 

1729 `Application.min_redraw_interval`, but having this here is a 

1730 better approach.) 

1731 """ 

1732 while True: 

1733 self.on_completions_changed.fire() 

1734 refresh_needed.clear() 

1735 await asyncio.sleep(0.3) 

1736 await refresh_needed.wait() 

1737 

1738 refresh_task = asyncio.ensure_future(refresh_while_loading()) 

1739 try: 

1740 # Load. 

1741 async with aclosing( 

1742 self.completer.get_completions_async(document, complete_event) 

1743 ) as async_generator: 

1744 async for completion in async_generator: 

1745 complete_state.completions.append(completion) 

1746 refresh_needed.set() 

1747 

1748 # If the input text changes, abort. 

1749 if not proceed(): 

1750 break 

1751 finally: 

1752 refresh_task.cancel() 

1753 

1754 # Refresh one final time after we got everything. 

1755 self.on_completions_changed.fire() 

1756 

1757 completions = complete_state.completions 

1758 

1759 # When there is only one completion, which has nothing to add, ignore it. 

1760 if len(completions) == 1 and completion_does_nothing( 

1761 document, completions[0] 

1762 ): 

1763 del completions[:] 

1764 

1765 # Set completions if the text was not yet changed. 

1766 if proceed(): 

1767 # When no completions were found, or when the user selected 

1768 # already a completion by using the arrow keys, don't do anything. 

1769 if ( 

1770 not self.complete_state 

1771 or self.complete_state.complete_index is not None 

1772 ): 

1773 return 

1774 

1775 # When there are no completions, reset completion state anyway. 

1776 if not completions: 

1777 self.complete_state = None 

1778 # Render the ui if the completion menu was shown 

1779 # it is needed especially if there is one completion and it was deleted. 

1780 self.on_completions_changed.fire() 

1781 return 

1782 

1783 # Select first/last or insert common part, depending on the key 

1784 # binding. (For this we have to wait until all completions are 

1785 # loaded.) 

1786 

1787 if select_first: 

1788 self.go_to_completion(0) 

1789 

1790 elif select_last: 

1791 self.go_to_completion(len(completions) - 1) 

1792 

1793 elif insert_common_part: 

1794 common_part = get_common_complete_suffix(document, completions) 

1795 if common_part: 

1796 # Insert the common part, update completions. 

1797 self.insert_text(common_part) 

1798 if len(completions) > 1: 

1799 # (Don't call `async_completer` again, but 

1800 # recalculate completions. See: 

1801 # https://github.com/ipython/ipython/issues/9658) 

1802 completions[:] = [ 

1803 c.new_completion_from_position(len(common_part)) 

1804 for c in completions 

1805 ] 

1806 

1807 self._set_completions(completions=completions) 

1808 else: 

1809 self.complete_state = None 

1810 else: 

1811 # When we were asked to insert the "common" 

1812 # prefix, but there was no common suffix but 

1813 # still exactly one match, then select the 

1814 # first. (It could be that we have a completion 

1815 # which does * expansion, like '*.py', with 

1816 # exactly one match.) 

1817 if len(completions) == 1: 

1818 self.go_to_completion(0) 

1819 

1820 else: 

1821 # If the last operation was an insert, (not a delete), restart 

1822 # the completion coroutine. 

1823 

1824 if self.document.text_before_cursor == document.text_before_cursor: 

1825 return # Nothing changed. 

1826 

1827 if self.document.text_before_cursor.startswith( 

1828 document.text_before_cursor 

1829 ): 

1830 raise _Retry 

1831 

1832 return async_completer 

1833 

1834 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]: 

1835 """ 

1836 Create function for asynchronous auto suggestion. 

1837 (This can be in another thread.) 

1838 """ 

1839 

1840 @_only_one_at_a_time 

1841 async def async_suggestor() -> None: 

1842 document = self.document 

1843 

1844 # Don't suggest when we already have a suggestion. 

1845 if self.suggestion or not self.auto_suggest: 

1846 return 

1847 

1848 suggestion = await self.auto_suggest.get_suggestion_async(self, document) 

1849 

1850 # Set suggestion only if the text was not yet changed. 

1851 if self.document == document: 

1852 # Set suggestion and redraw interface. 

1853 self.suggestion = suggestion 

1854 self.on_suggestion_set.fire() 

1855 else: 

1856 # Otherwise, restart thread. 

1857 raise _Retry 

1858 

1859 return async_suggestor 

1860 

1861 def _create_auto_validate_coroutine( 

1862 self, 

1863 ) -> Callable[[], Coroutine[Any, Any, None]]: 

1864 """ 

1865 Create a function for asynchronous validation while typing. 

1866 (This can be in another thread.) 

1867 """ 

1868 

1869 @_only_one_at_a_time 

1870 async def async_validator() -> None: 

1871 await self._validate_async() 

1872 

1873 return async_validator 

1874 

1875 def validate_and_handle(self) -> None: 

1876 """ 

1877 Validate buffer and handle the accept action. 

1878 """ 

1879 valid = self.validate(set_cursor=True) 

1880 

1881 # When the validation succeeded, accept the input. 

1882 if valid: 

1883 if self.accept_handler: 

1884 keep_text = self.accept_handler(self) 

1885 else: 

1886 keep_text = False 

1887 

1888 self.append_to_history() 

1889 

1890 if not keep_text: 

1891 self.reset() 

1892 

1893 

1894_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]]) 

1895 

1896 

1897def _only_one_at_a_time(coroutine: _T) -> _T: 

1898 """ 

1899 Decorator that only starts the coroutine only if the previous call has 

1900 finished. (Used to make sure that we have only one autocompleter, auto 

1901 suggestor and validator running at a time.) 

1902 

1903 When the coroutine raises `_Retry`, it is restarted. 

1904 """ 

1905 running = False 

1906 

1907 @wraps(coroutine) 

1908 async def new_coroutine(*a: Any, **kw: Any) -> Any: 

1909 nonlocal running 

1910 

1911 # Don't start a new function, if the previous is still in progress. 

1912 if running: 

1913 return 

1914 

1915 running = True 

1916 

1917 try: 

1918 while True: 

1919 try: 

1920 await coroutine(*a, **kw) 

1921 except _Retry: 

1922 continue 

1923 else: 

1924 return None 

1925 finally: 

1926 running = False 

1927 

1928 return cast(_T, new_coroutine) 

1929 

1930 

1931class _Retry(Exception): 

1932 "Retry in `_only_one_at_a_time`." 

1933 

1934 

1935def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: 

1936 """ 

1937 Indent text of a :class:`.Buffer` object. 

1938 """ 

1939 current_row = buffer.document.cursor_position_row 

1940 current_col = buffer.document.cursor_position_col 

1941 line_range = range(from_row, to_row) 

1942 

1943 # Apply transformation. 

1944 indent_content = " " * count 

1945 new_text = buffer.transform_lines(line_range, lambda l: indent_content + l) 

1946 buffer.document = Document( 

1947 new_text, Document(new_text).translate_row_col_to_index(current_row, 0) 

1948 ) 

1949 

1950 # Place cursor in the same position in text after indenting 

1951 buffer.cursor_position += current_col + len(indent_content) 

1952 

1953 

1954def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: 

1955 """ 

1956 Unindent text of a :class:`.Buffer` object. 

1957 """ 

1958 current_row = buffer.document.cursor_position_row 

1959 current_col = buffer.document.cursor_position_col 

1960 line_range = range(from_row, to_row) 

1961 

1962 indent_content = " " * count 

1963 

1964 def transform(text: str) -> str: 

1965 remove = indent_content 

1966 if text.startswith(remove): 

1967 return text[len(remove) :] 

1968 else: 

1969 return text.lstrip() 

1970 

1971 # Apply transformation. 

1972 new_text = buffer.transform_lines(line_range, transform) 

1973 buffer.document = Document( 

1974 new_text, Document(new_text).translate_row_col_to_index(current_row, 0) 

1975 ) 

1976 

1977 # Place cursor in the same position in text after dedent 

1978 buffer.cursor_position += current_col - len(indent_content) 

1979 

1980 

1981def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None: 

1982 """ 

1983 Reformat text, taking the width into account. 

1984 `to_row` is included. 

1985 (Vi 'gq' operator.) 

1986 """ 

1987 lines = buffer.text.splitlines(True) 

1988 lines_before = lines[:from_row] 

1989 lines_after = lines[to_row + 1 :] 

1990 lines_to_reformat = lines[from_row : to_row + 1] 

1991 

1992 if lines_to_reformat: 

1993 # Take indentation from the first line. 

1994 match = re.search(r"^\s*", lines_to_reformat[0]) 

1995 length = match.end() if match else 0 # `match` can't be None, actually. 

1996 

1997 indent = lines_to_reformat[0][:length].replace("\n", "") 

1998 

1999 # Now, take all the 'words' from the lines to be reshaped. 

2000 words = "".join(lines_to_reformat).split() 

2001 

2002 # And reshape. 

2003 width = (buffer.text_width or 80) - len(indent) 

2004 reshaped_text = [indent] 

2005 current_width = 0 

2006 for w in words: 

2007 if current_width: 

2008 if len(w) + current_width + 1 > width: 

2009 reshaped_text.append("\n") 

2010 reshaped_text.append(indent) 

2011 current_width = 0 

2012 else: 

2013 reshaped_text.append(" ") 

2014 current_width += 1 

2015 

2016 reshaped_text.append(w) 

2017 current_width += len(w) 

2018 

2019 if reshaped_text[-1] != "\n": 

2020 reshaped_text.append("\n") 

2021 

2022 # Apply result. 

2023 buffer.document = Document( 

2024 text="".join(lines_before + reshaped_text + lines_after), 

2025 cursor_position=len("".join(lines_before + reshaped_text)), 

2026 )