Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/buffer.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

818 statements  

1""" 

2Data structures for the Buffer. 

3It holds the text, cursor position, history, etc... 

4""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import logging 

10import os 

11import re 

12import shlex 

13import shutil 

14import subprocess 

15import tempfile 

16from collections import deque 

17from enum import Enum 

18from functools import wraps 

19from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast 

20 

21from .application.current import get_app 

22from .application.run_in_terminal import run_in_terminal 

23from .auto_suggest import AutoSuggest, Suggestion 

24from .cache import FastDictCache 

25from .clipboard import ClipboardData 

26from .completion import ( 

27 CompleteEvent, 

28 Completer, 

29 Completion, 

30 DummyCompleter, 

31 get_common_complete_suffix, 

32) 

33from .document import Document 

34from .eventloop import aclosing 

35from .filters import FilterOrBool, to_filter 

36from .history import History, InMemoryHistory 

37from .search import SearchDirection, SearchState 

38from .selection import PasteMode, SelectionState, SelectionType 

39from .utils import Event, to_str 

40from .validation import ValidationError, Validator 

41 

# Names exported by a star-import of this module.
# NOTE(review): `indent`, `unindent` and `reshape_text` are defined outside
# the part of the file visible here -- confirm they still exist.
__all__ = [
    "EditReadOnlyBuffer",
    "Buffer",
    "CompletionState",
    "indent",
    "unindent",
    "reshape_text",
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

52 

53 

class EditReadOnlyBuffer(Exception):
    """Raised on an attempt to edit a read-only :class:`.Buffer`."""

56 

57 

class ValidationState(Enum):
    """The validation state of a buffer. This is set after the validation."""

    VALID = "VALID"
    INVALID = "INVALID"
    # State a buffer is put in by `Buffer.reset()` and whenever its text changes.
    UNKNOWN = "UNKNOWN"

64 

65 

class CompletionState:
    """
    State of one completion session: the document the completion started
    from, the candidate completions and the index of the selected one.

    Note that :meth:`go_to_index` updates ``complete_index`` in place.
    """

    def __init__(
        self,
        original_document: Document,
        completions: list[Completion] | None = None,
        complete_index: int | None = None,
    ) -> None:
        #: Document as it was when the completion started.
        self.original_document = original_document

        #: The `Completion` instances which are possible at this point.
        #: (A falsy argument, such as `None`, is replaced by a new list.)
        self.completions = completions or []

        #: Position in the `completions` list, or `None` to indicate
        #: "no completion", the original text.
        self.complete_index = complete_index

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.original_document!r}, <{len(self.completions)!r}> completions, index={self.complete_index!r})"

    def go_to_index(self, index: int | None) -> None:
        """
        Select the completion at `index`, modifying this object in place.

        When `index` is `None` the completion is deselected. Nothing happens
        when there are no completions at all.
        """
        if not self.completions:
            return

        assert index is None or 0 <= index < len(self.completions)
        self.complete_index = index

    def new_text_and_position(self) -> tuple[str, int]:
        """
        Return (new_text, new_cursor_position) for the selected completion,
        or the original text and cursor position when nothing is selected.
        """
        document = self.original_document

        if self.complete_index is None:
            return document.text, document.cursor_position

        prefix = document.text_before_cursor
        suffix = document.text_after_cursor
        completion = self.completions[self.complete_index]

        # `start_position` is used as a slice end on the text before the
        # cursor. Zero is special-cased, because `prefix[:0]` would drop the
        # whole prefix instead of keeping it.
        if completion.start_position != 0:
            prefix = prefix[: completion.start_position]

        return (
            prefix + completion.text + suffix,
            len(prefix) + len(completion.text),
        )

    @property
    def current_completion(self) -> Completion | None:
        """
        The selected completion, or `None` when no completion is selected.
        """
        if self.complete_index is None:
            return None
        return self.completions[self.complete_index]

130 

131 

# Matches a run of whitespace, a double-quoted span or a single-quoted span
# (both non-greedy). The whole pattern is one capture group, so `split()` on
# it also returns the matched separators.
# NOTE(review): its users are outside the visible part of the file; given its
# position it presumably serves the yank-nth-arg word splitting -- confirm.
_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")

133 

134 

class YankNthArgState:
    """
    Bookkeeping for yank-last-arg/yank-nth-arg: keeps track of where we are
    in the history, together with `n` and the previously inserted word.
    """

    def __init__(
        self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
    ) -> None:
        self.n = n
        self.history_position = history_position
        self.previous_inserted_word = previous_inserted_word

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(history_position={self.history_position!r}, "
            f"n={self.n!r}, previous_inserted_word={self.previous_inserted_word!r})"
        )

149 

150 

# Type of the `on_*` event callbacks accepted by `Buffer.__init__`: called with
# the `Buffer`, returns nothing.
BufferEventHandler = Callable[["Buffer"], None]

# Type of the `accept_handler` accepted by `Buffer.__init__`: called with the
# `Buffer`, returns a bool.
BufferAcceptHandler = Callable[["Buffer"], bool]

153 

154 

155class Buffer: 

156 """ 

157 The core data structure that holds the text and cursor position of the 

158 current input line and implements all text manipulations on top of it. It 

159 also implements the history, undo stack and the completion state. 

160 

161 :param completer: :class:`~prompt_toolkit.completion.Completer` instance. 

162 :param history: :class:`~prompt_toolkit.history.History` instance. 

163 :param tempfile_suffix: The tempfile suffix (extension) to be used for the 

164 "open in editor" function. For a Python REPL, this would be ".py", so 

165 that the editor knows the syntax highlighting to use. This can also be 

166 a callable that returns a string. 

167 :param tempfile: For more advanced tempfile situations where you need 

168 control over the subdirectories and filename. For a Git Commit Message, 

169 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax 

170 highlighting to use. This can also be a callable that returns a string. 

171 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly 

172 useful for key bindings where we sometimes prefer to refer to a buffer 

173 by their name instead of by reference. 

174 :param accept_handler: Called when the buffer input is accepted. (Usually 

175 when the user presses `enter`.) The accept handler receives this 

176 `Buffer` as input and should return True when the buffer text should be 

177 kept instead of calling reset. 

178 

179 In case of a `PromptSession` for instance, we want to keep the text, 

180 because we will exit the application, and only reset it during the next 

181 run. 

182 :param max_number_of_completions: Never display more than this number of 

183 completions, even when the completer can produce more (limited by 

184 default to 10k for performance). 

185 

186 Events: 

187 

188 :param on_text_changed: When the buffer text changes. (Callable or None.) 

189 :param on_text_insert: When new text is inserted. (Callable or None.) 

190 :param on_cursor_position_changed: When the cursor moves. (Callable or None.) 

191 :param on_completions_changed: When the completions were changed. (Callable or None.) 

192 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) 

193 

194 Filters: 

195 

196 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` 

197 or `bool`. Decide whether or not to do asynchronous autocompleting while 

198 typing. 

199 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` 

200 or `bool`. Decide whether or not to do asynchronous validation while 

201 typing. 

202 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or 

203 `bool` to indicate when up-arrow partial string matching is enabled. It 

204 is advised to not enable this at the same time as 

205 `complete_while_typing`, because when there is an autocompletion found, 

206 the up arrows usually browse through the completions, rather than 

207 through the history. 

208 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, 

209 changes will not be allowed. 

210 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When 

211 not set, pressing `Enter` will call the `accept_handler`. Otherwise, 

212 pressing `Esc-Enter` is required. 

213 """ 

214 

def __init__(
    self,
    completer: Completer | None = None,
    auto_suggest: AutoSuggest | None = None,
    history: History | None = None,
    validator: Validator | None = None,
    tempfile_suffix: str | Callable[[], str] = "",
    tempfile: str | Callable[[], str] = "",
    name: str = "",
    complete_while_typing: FilterOrBool = False,
    validate_while_typing: FilterOrBool = False,
    enable_history_search: FilterOrBool = False,
    document: Document | None = None,
    accept_handler: BufferAcceptHandler | None = None,
    read_only: FilterOrBool = False,
    multiline: FilterOrBool = True,
    max_number_of_completions: int = 10000,
    on_text_changed: BufferEventHandler | None = None,
    on_text_insert: BufferEventHandler | None = None,
    on_cursor_position_changed: BufferEventHandler | None = None,
    on_completions_changed: BufferEventHandler | None = None,
    on_suggestion_set: BufferEventHandler | None = None,
) -> None:
    """
    Store the configuration, create the events and the helper coroutines,
    and finish by calling :meth:`reset` with the given `document`.
    """
    # Normalize the filter arguments: both filters and booleans are accepted.
    history_search_filter = to_filter(enable_history_search)
    complete_filter = to_filter(complete_while_typing)
    validate_filter = to_filter(validate_while_typing)
    read_only_filter = to_filter(read_only)
    multiline_filter = to_filter(multiline)

    # Plain configuration.
    self.completer = completer or DummyCompleter()
    self.auto_suggest = auto_suggest
    self.validator = validator
    self.tempfile_suffix = tempfile_suffix
    self.tempfile = tempfile
    self.name = name
    self.accept_handler = accept_handler

    # Filters. (Usually, used by the key bindings to drive the buffer.)
    self.complete_while_typing = complete_filter
    self.validate_while_typing = validate_filter
    self.enable_history_search = history_search_filter
    self.read_only = read_only_filter
    self.multiline = multiline_filter
    self.max_number_of_completions = max_number_of_completions

    # Text width. (For wrapping, used by the Vi 'gq' operator.)
    self.text_width = 0

    #: The command buffer history.
    # Test against `None` explicitly instead of using `or`: bool(history)
    # could be False when the history is empty.
    if history is None:
        self.history = InMemoryHistory()
    else:
        self.history = history

    self.__cursor_position = 0

    # Events, each bound to this buffer and the optional handler passed in.
    self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
    self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
    self.on_cursor_position_changed: Event[Buffer] = Event(
        self, on_cursor_position_changed
    )
    self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
    self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)

    # Document cache. (Avoid creating new Document instances.)
    self._document_cache: FastDictCache[
        tuple[str, int, SelectionState | None], Document
    ] = FastDictCache(Document, size=10)

    # Create completer / auto suggestion / validation coroutines.
    self._async_suggester = self._create_auto_suggest_coroutine()
    self._async_completer = self._create_completer_coroutine()
    self._async_validator = self._create_auto_validate_coroutine()

    # Asyncio task for populating the history. Must exist before `reset()`
    # runs, because `reset()` inspects it.
    self._load_history_task: asyncio.Future[None] | None = None

    # Initialize all remaining attributes.
    self.reset(document=document)

295 

296 def __repr__(self) -> str: 

297 if len(self.text) < 15: 

298 text = self.text 

299 else: 

300 text = self.text[:12] + "..." 

301 

302 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>" 

303 

def reset(
    self, document: Document | None = None, append_to_history: bool = False
) -> None:
    """
    Bring the buffer back to a clean state, starting from `document`.

    :param document: Initial content. A falsy value is replaced by a new,
        empty ``Document()``.
    :param append_to_history: Append current input to history first.
    """
    if append_to_history:
        self.append_to_history()

    initial = document or Document()
    self.__cursor_position = initial.cursor_position

    # Validation: the `ValidationError` is set when the input is wrong.
    self.validation_error: ValidationError | None = None
    self.validation_state: ValidationState | None = ValidationState.UNKNOWN

    # No selection.
    self.selection_state: SelectionState | None = None

    # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
    # we can insert text on multiple lines at once. This is implemented by
    # using multiple cursors.)
    self.multiple_cursor_positions: list[int] = []

    # Column to stay at during consecutive up/down movements.
    self.preferred_column: int | None = None

    # Completion browser state (interactive completion through
    # Ctrl-N/Ctrl-P).
    self.complete_state: CompletionState | None = None

    # State of Emacs yank-nth-arg completion.
    self.yank_nth_arg_state: YankNthArgState | None = None

    # The document that we had *right before* the last paste operation.
    # This is used for rotating through the kill ring.
    self.document_before_paste: Document | None = None

    # Current auto suggestion.
    self.suggestion: Suggestion | None = None

    # The history search text. (Used for filtering the history when we
    # browse through it.)
    self.history_search_text: str | None = None

    # Undo/redo stacks (stack of `(text, cursor_position)`).
    self._undo_stack: list[tuple[str, int]] = []
    self._redo_stack: list[tuple[str, int]] = []

    # If history loading was still ongoing, cancel it. Clearing the task
    # makes `load_history_if_not_yet_loaded` start a fresh one next time.
    pending_load = self._load_history_task
    if pending_load is not None:
        pending_load.cancel()
    self._load_history_task = None

    #: The working lines. Similar to history, except that this can be
    #: modified. The user can press arrow_up and edit previous entries.
    #: Ctrl-C should reset this, and copy the whole history back in here.
    #: Enter should process the current command and append to the real
    #: history.
    self._working_lines: deque[str] = deque([initial.text])
    self.__working_index = 0

368 

def load_history_if_not_yet_loaded(self) -> None:
    """
    Create task for populating the buffer history (if not yet done).

    Note::

        This needs to be called from within the event loop of the
        application, because history loading is async, and we need to be
        sure the right event loop is active. Therefore, we call this method
        in the `BufferControl.create_content`.

        There are situations where prompt_toolkit applications are created
        in one thread, but will later run in a different thread (Ptpython
        is one example. The REPL runs in a separate thread, in order to
        prevent interfering with a potential different event loop in the
        main thread. The REPL UI however is still created in the main
        thread.) We could decide to not support creating prompt_toolkit
        objects in one thread and running the application in a different
        thread, but history loading is the only place where it matters, and
        this solves it.
    """
    # A task already exists: nothing to do.
    if self._load_history_task is not None:
        return

    async def load_history() -> None:
        async for item in self.history.load():
            # Every item is prepended, so the line we are working on
            # moves one position further in the deque.
            self._working_lines.appendleft(item)
            self.__working_index += 1

    def load_history_done(f: asyncio.Future[None]) -> None:
        """
        Handle `load_history` result when either done, cancelled, or
        when an exception was raised.
        """
        try:
            f.result()
        except (asyncio.CancelledError, GeneratorExit):
            # Cancellation is ignored, but handled so that no traceback is
            # shown. `GeneratorExit` is probably not needed, but we had
            # situations where it was raised in `load_history` during
            # cancellation.
            pass
        except BaseException:
            # Log error if something goes wrong. (We don't have a
            # caller to which we can propagate this exception.)
            logger.exception("Loading history failed")

    self._load_history_task = get_app().create_background_task(load_history())
    self._load_history_task.add_done_callback(load_history_done)

421 

422 # <getters/setters> 

423 

424 def _set_text(self, value: str) -> bool: 

425 """set text at current working_index. Return whether it changed.""" 

426 working_index = self.working_index 

427 working_lines = self._working_lines 

428 

429 original_value = working_lines[working_index] 

430 working_lines[working_index] = value 

431 

432 # Return True when this text has been changed. 

433 if len(value) != len(original_value): 

434 # For Python 2, it seems that when two strings have a different 

435 # length and one is a prefix of the other, Python still scans 

436 # character by character to see whether the strings are different. 

437 # (Some benchmarking showed significant differences for big 

438 # documents. >100,000 of lines.) 

439 return True 

440 elif value != original_value: 

441 return True 

442 return False 

443 

444 def _set_cursor_position(self, value: int) -> bool: 

445 """Set cursor position. Return whether it changed.""" 

446 original_position = self.__cursor_position 

447 self.__cursor_position = max(0, value) 

448 

449 return self.__cursor_position != original_position 

450 

451 @property 

452 def text(self) -> str: 

453 return self._working_lines[self.working_index] 

454 

455 @text.setter 

456 def text(self, value: str) -> None: 

457 """ 

458 Setting text. (When doing this, make sure that the cursor_position is 

459 valid for this text. text/cursor_position should be consistent at any time, 

460 otherwise set a Document instead.) 

461 """ 

462 # Ensure cursor position remains within the size of the text. 

463 if self.cursor_position > len(value): 

464 self.cursor_position = len(value) 

465 

466 # Don't allow editing of read-only buffers. 

467 if self.read_only(): 

468 raise EditReadOnlyBuffer() 

469 

470 changed = self._set_text(value) 

471 

472 if changed: 

473 self._text_changed() 

474 

475 # Reset history search text. 

476 # (Note that this doesn't need to happen when working_index 

477 # changes, which is when we traverse the history. That's why we 

478 # don't do this in `self._text_changed`.) 

479 self.history_search_text = None 

480 

481 @property 

482 def cursor_position(self) -> int: 

483 return self.__cursor_position 

484 

485 @cursor_position.setter 

486 def cursor_position(self, value: int) -> None: 

487 """ 

488 Setting cursor position. 

489 """ 

490 assert isinstance(value, int) 

491 

492 # Ensure cursor position is within the size of the text. 

493 if value > len(self.text): 

494 value = len(self.text) 

495 if value < 0: 

496 value = 0 

497 

498 changed = self._set_cursor_position(value) 

499 

500 if changed: 

501 self._cursor_position_changed() 

502 

503 @property 

504 def working_index(self) -> int: 

505 return self.__working_index 

506 

507 @working_index.setter 

508 def working_index(self, value: int) -> None: 

509 if self.__working_index != value: 

510 self.__working_index = value 

511 # Make sure to reset the cursor position, otherwise we end up in 

512 # situations where the cursor position is out of the bounds of the 

513 # text. 

514 self.cursor_position = 0 

515 self._text_changed() 

516 

def _text_changed(self) -> None:
    """
    Bookkeeping after a text change: drop all state that depended on the
    old text, fire `on_text_changed` and schedule validation if enabled.
    """
    # Remove any validation errors.
    self.validation_error = None
    self.validation_state = ValidationState.UNKNOWN

    # Remove the complete state and everything else tied to the old text.
    self.complete_state = self.yank_nth_arg_state = None
    self.document_before_paste = self.selection_state = None
    self.suggestion = self.preferred_column = None

    # fire 'on_text_changed' event.
    self.on_text_changed.fire()

    # Input validation.
    # (This happens on all change events, unlike auto completion, also when
    # deleting text.)
    if self.validator:
        if self.validate_while_typing():
            get_app().create_background_task(self._async_validator())

536 

537 def _cursor_position_changed(self) -> None: 

538 # Remove any complete state. 

539 # (Input validation should only be undone when the cursor position 

540 # changes.) 

541 self.complete_state = None 

542 self.yank_nth_arg_state = None 

543 self.document_before_paste = None 

544 

545 # Unset preferred_column. (Will be set after the cursor movement, if 

546 # required.) 

547 self.preferred_column = None 

548 

549 # Note that the cursor position can change if we have a selection the 

550 # new position of the cursor determines the end of the selection. 

551 

552 # fire 'on_cursor_position_changed' event. 

553 self.on_cursor_position_changed.fire() 

554 

555 @property 

556 def document(self) -> Document: 

557 """ 

558 Return :class:`~prompt_toolkit.document.Document` instance from the 

559 current text, cursor position and selection state. 

560 """ 

561 return self._document_cache[ 

562 self.text, self.cursor_position, self.selection_state 

563 ] 

564 

565 @document.setter 

566 def document(self, value: Document) -> None: 

567 """ 

568 Set :class:`~prompt_toolkit.document.Document` instance. 

569 

570 This will set both the text and cursor position at the same time, but 

571 atomically. (Change events will be triggered only after both have been set.) 

572 """ 

573 self.set_document(value) 

574 

575 def set_document(self, value: Document, bypass_readonly: bool = False) -> None: 

576 """ 

577 Set :class:`~prompt_toolkit.document.Document` instance. Like the 

578 ``document`` property, but accept an ``bypass_readonly`` argument. 

579 

580 :param bypass_readonly: When True, don't raise an 

581 :class:`.EditReadOnlyBuffer` exception, even 

582 when the buffer is read-only. 

583 

584 .. warning:: 

585 

586 When this buffer is read-only and `bypass_readonly` was not passed, 

587 the `EditReadOnlyBuffer` exception will be caught by the 

588 `KeyProcessor` and is silently suppressed. This is important to 

589 keep in mind when writing key bindings, because it won't do what 

590 you expect, and there won't be a stack trace. Use try/finally 

591 around this function if you need some cleanup code. 

592 """ 

593 # Don't allow editing of read-only buffers. 

594 if not bypass_readonly and self.read_only(): 

595 raise EditReadOnlyBuffer() 

596 

597 # Set text and cursor position first. 

598 text_changed = self._set_text(value.text) 

599 cursor_position_changed = self._set_cursor_position(value.cursor_position) 

600 

601 # Now handle change events. (We do this when text/cursor position is 

602 # both set and consistent.) 

603 if text_changed: 

604 self._text_changed() 

605 self.history_search_text = None 

606 

607 if cursor_position_changed: 

608 self._cursor_position_changed() 

609 

610 @property 

611 def is_returnable(self) -> bool: 

612 """ 

613 True when there is something handling accept. 

614 """ 

615 return bool(self.accept_handler) 

616 

617 # End of <getters/setters> 

618 

619 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: 

620 """ 

621 Safe current state (input text and cursor position), so that we can 

622 restore it by calling undo. 

623 """ 

624 # Safe if the text is different from the text at the top of the stack 

625 # is different. If the text is the same, just update the cursor position. 

626 if self._undo_stack and self._undo_stack[-1][0] == self.text: 

627 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) 

628 else: 

629 self._undo_stack.append((self.text, self.cursor_position)) 

630 

631 # Saving anything to the undo stack, clears the redo stack. 

632 if clear_redo_stack: 

633 self._redo_stack = [] 

634 

635 def transform_lines( 

636 self, 

637 line_index_iterator: Iterable[int], 

638 transform_callback: Callable[[str], str], 

639 ) -> str: 

640 """ 

641 Transforms the text on a range of lines. 

642 When the iterator yield an index not in the range of lines that the 

643 document contains, it skips them silently. 

644 

645 To uppercase some lines:: 

646 

647 new_text = transform_lines(range(5,10), lambda text: text.upper()) 

648 

649 :param line_index_iterator: Iterator of line numbers (int) 

650 :param transform_callback: callable that takes the original text of a 

651 line, and return the new text for this line. 

652 

653 :returns: The new text. 

654 """ 

655 # Split lines 

656 lines = self.text.split("\n") 

657 

658 # Apply transformation 

659 for index in line_index_iterator: 

660 try: 

661 lines[index] = transform_callback(lines[index]) 

662 except IndexError: 

663 pass 

664 

665 return "\n".join(lines) 

666 

def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
    """
    Replace the line the cursor is on by the result of `transform_callback`.

    :param transform_callback: callable that takes a string and return a new string.
    """
    doc = self.document
    content = doc.text

    # Absolute offsets of the start and the end of the current line.
    line_start = doc.cursor_position + doc.get_start_of_line_position()
    line_end = doc.cursor_position + doc.get_end_of_line_position()

    transformed = transform_callback(content[line_start:line_end])
    self.text = content[:line_start] + transformed + content[line_end:]

681 

def transform_region(
    self, from_: int, to: int, transform_callback: Callable[[str], str]
) -> None:
    """
    Replace the text between two positions by its transformed version.

    :param from_: (int) start position.
    :param to: (int) end position; has to be greater than `from_`.
    :param transform_callback: Callable which accepts a string and returns
        the transformed string.
    """
    assert from_ < to

    head = self.text[:from_]
    middle = transform_callback(self.text[from_:to])
    tail = self.text[to:]

    self.text = head + middle + tail

702 

def cursor_left(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps left."""
    position = self.cursor_position
    self.cursor_position = position + self.document.get_cursor_left_position(
        count=count
    )

705 

def cursor_right(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps right."""
    position = self.cursor_position
    self.cursor_position = position + self.document.get_cursor_right_position(
        count=count
    )

708 

def cursor_up(self, count: int = 1) -> None:
    """(for multiline edit). Move cursor to the previous line."""
    column = self.preferred_column or self.document.cursor_position_col
    offset = self.document.get_cursor_up_position(
        count=count, preferred_column=column
    )
    self.cursor_position += offset

    # Moving the cursor unsets `preferred_column`, so store it (again)
    # afterwards for the next up/down movement.
    self.preferred_column = column

718 

def cursor_down(self, count: int = 1) -> None:
    """(for multiline edit). Move cursor to the next line."""
    column = self.preferred_column or self.document.cursor_position_col
    offset = self.document.get_cursor_down_position(
        count=count, preferred_column=column
    )
    self.cursor_position += offset

    # Moving the cursor unsets `preferred_column`, so store it (again)
    # afterwards for the next up/down movement.
    self.preferred_column = column

728 

def auto_up(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If we're not on the first line (of a multiline input) go a line up,
    otherwise go back in history. (If nothing is selected.)

    While a completion state is active, browse to the previous completion
    instead.
    """
    if self.complete_state:
        self.complete_previous(count=count)
        return

    if self.document.cursor_position_row > 0:
        self.cursor_up(count=count)
        return

    if self.selection_state:
        return

    self.history_backward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

746 

def auto_down(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If we're not on the last line (of a multiline input) go a line down,
    otherwise go forward in history. (If nothing is selected.)

    While a completion state is active, browse to the next completion
    instead.
    """
    if self.complete_state:
        self.complete_next(count=count)
        return

    if self.document.cursor_position_row < self.document.line_count - 1:
        self.cursor_down(count=count)
        return

    if self.selection_state:
        return

    self.history_forward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

764 

def delete_before_cursor(self, count: int = 1) -> str:
    """
    Delete specified number of characters before cursor and return the
    deleted text.

    When `count` is larger than the number of characters before the
    cursor, everything before the cursor is deleted.

    :param count: Number of characters to delete; must not be negative.
    :returns: The deleted text ("" when the cursor is at position 0).
    """
    assert count >= 0
    deleted = ""

    cursor = self.cursor_position
    if cursor > 0:
        # Never reach past the start of the text: a negative slice start
        # would be interpreted relative to the *end* of the string.
        start = cursor - min(count, cursor)

        deleted = self.text[start:cursor]
        new_text = self.text[:start] + self.text[cursor:]

        # Set new Document atomically.
        self.document = Document(new_text, start)

    return deleted

786 

def delete(self, count: int = 1) -> str:
    """
    Delete up to `count` characters after the cursor and return the deleted
    text. Returns "" when the cursor is at the end of the text.
    """
    if self.cursor_position >= len(self.text):
        return ""

    removed = self.document.text_after_cursor[:count]
    resume_at = self.cursor_position + len(removed)
    self.text = self.text[: self.cursor_position] + self.text[resume_at:]
    return removed

800 

def join_next_line(self, separator: str = " ") -> None:
    """
    Join the next line to the current one by deleting the line ending after
    the current line. Does nothing on the last line.

    :param separator: Text that is put between the two joined parts.
    """
    if self.document.on_last_line:
        return

    # Go to the end of the current line and delete the line ending.
    self.cursor_position += self.document.get_end_of_line_position()
    self.delete()

    # Strip the leading spaces of what used to be the next line and glue
    # both parts together with the separator.
    head = self.document.text_before_cursor
    tail = self.document.text_after_cursor.lstrip(" ")
    self.text = head + separator + tail

816 

def join_selected_lines(self, separator: str = " ") -> None:
    """
    Join the selected lines.

    The selected range runs between the cursor and the position where the
    selection started. Every line in it loses its leading spaces and gets
    `separator` appended.
    """
    assert self.selection_state

    # Boundaries of the selection, in ascending order.
    start, end = sorted(
        [self.cursor_position, self.selection_state.original_cursor_position]
    )

    head = self.text[:start]
    selected = self.text[start:end]
    tail = self.text[end:]

    pieces = [line.lstrip(" ") + separator for line in selected.splitlines()]

    # Set new document. The cursor position is derived from the text before
    # the selection plus all the joined pieces except the last one.
    self.document = Document(
        text=head + "".join(pieces) + tail,
        cursor_position=len(head + "".join(pieces[:-1])) - 1,
    )

840 

def swap_characters_before_cursor(self) -> None:
    """
    Swap the last two characters before the cursor. Does nothing when
    there are fewer than two characters before the cursor.
    """
    cursor = self.cursor_position
    if cursor < 2:
        return

    first = self.text[cursor - 2]
    second = self.text[cursor - 1]
    self.text = "".join([self.text[: cursor - 2], second, first, self.text[cursor:]])

852 

def go_to_history(self, index: int) -> None:
    """
    Go to this item in the history, and put the cursor at the end of its
    text. An index beyond the last working line is ignored.
    """
    if index >= len(self._working_lines):
        return

    self.working_index = index
    self.cursor_position = len(self.text)

860 

def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
    """
    Move the selection `count` completions forward.
    (Does nothing if there is no completion state.)

    When the last completion is selected, the selection wraps around to
    "nothing selected", unless `disable_wrap_around` is set.
    """
    state = self.complete_state
    if not state:
        return

    last = len(state.completions) - 1
    current = state.complete_index

    if current is None:
        # Nothing selected yet: start at the first completion.
        target = 0
    elif current == last:
        if disable_wrap_around:
            return
        target = None
    else:
        target = min(last, current + count)

    self.go_to_completion(target)

883 

def complete_previous(
    self, count: int = 1, disable_wrap_around: bool = False
) -> None:
    """
    Move the selection `count` completions backward.
    (Does nothing if there is no completion state.)

    When the first completion is selected, the selection wraps around to
    "nothing selected", unless `disable_wrap_around` is set.
    """
    state = self.complete_state
    if not state:
        return

    current = state.complete_index

    if current == 0:
        if disable_wrap_around:
            return
        target = None
    elif current is None:
        # Nothing selected yet: start at the last completion.
        target = len(state.completions) - 1
    else:
        target = max(0, current - count)

    self.go_to_completion(target)

905 

def cancel_completion(self) -> None:
    """
    Abort the active completion (if any): deselect the current completion and
    drop the completion state.
    """
    if not self.complete_state:
        return

    self.go_to_completion(None)
    self.complete_state = None

913 

def _set_completions(self, completions: list[Completion]) -> CompletionState:
    """
    Install a new completion state holding `completions` and return it.

    The state is created without an explicitly selected completion.
    """
    new_state = CompletionState(
        original_document=self.document, completions=completions
    )
    self.complete_state = new_state

    # Notify listeners; this should eventually invalidate the layout.
    self.on_completions_changed.fire()

    return self.complete_state

928 

def start_history_lines_completion(self) -> None:
    """
    Offer every line from the working lines (history and current input) that
    starts with the text before the cursor as a completion, and select the
    first one.
    """
    seen: set[str] = set()
    matches = []

    # The part of the current line that candidates have to start with.
    prefix = self.document.current_line_before_cursor.lstrip()

    for entry_index, entry in enumerate(self._working_lines):
        for line_index, raw_line in enumerate(entry.split("\n")):
            candidate = raw_line.strip()

            # Skip empty lines, non-matching lines and duplicates.
            if not candidate or not candidate.startswith(prefix):
                continue
            if candidate in seen:
                continue
            seen.add(candidate)

            # Describe where this line came from.
            if entry_index == self.working_index:
                meta = f"Current, line {line_index + 1}"
            else:
                meta = f"History {entry_index + 1}, line {line_index + 1}"

            matches.append(
                Completion(
                    text=candidate,
                    start_position=-len(prefix),
                    display_meta=meta,
                )
            )

    # Matches found last are offered first.
    self._set_completions(completions=matches[::-1])
    self.go_to_completion(0)

964 

def go_to_completion(self, index: int | None) -> None:
    """
    Select the completion at `index` from the current completion state and
    update text and cursor accordingly. Requires an active completion state.
    """
    assert self.complete_state

    # Move the completion state to the requested index.
    active_state = self.complete_state
    active_state.go_to_index(index)

    # Apply the text/cursor position that belongs to this selection.
    text, cursor = active_state.new_text_and_position()
    self.document = Document(text, cursor)

    # (Changing text/cursor position unsets `complete_state`; restore it.)
    self.complete_state = active_state

981 

def apply_completion(self, completion: Completion) -> None:
    """
    Insert `completion` into the buffer, replacing the text that it covers
    before the cursor.
    """
    # Cancel a completion that may still be active.
    if self.complete_state:
        self.go_to_completion(None)
    self.complete_state = None

    # Remove the text this completion replaces, then insert its text.
    chars_to_replace = -completion.start_position
    self.delete_before_cursor(chars_to_replace)
    self.insert_text(completion.text)

994 

def _set_history_search(self) -> None:
    """
    Update `history_search_text`.

    When history search is enabled and no search text is set yet, the text
    before the cursor becomes the search text. When it is disabled, the
    search text is cleared.
    """
    if not self.enable_history_search():
        self.history_search_text = None
    elif self.history_search_text is None:
        self.history_search_text = self.document.text_before_cursor

1005 

def _history_matches(self, i: int) -> bool:
    """
    Tell whether working line `i` starts with the history search text.
    (Always True when no history search text is set.)
    """
    prefix = self.history_search_text
    if prefix is None:
        return True
    return self._working_lines[i].startswith(prefix)

1014 

def history_forward(self, count: int = 1) -> None:
    """
    Step forwards through the history, visiting only matching entries.

    :param count: Amount of items to move forward.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk towards the newest entry, stopping after `count` matches.
    for candidate in range(self.working_index + 1, len(self._working_lines)):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving, place the cursor at the end of the first line.
    if moved:
        self.cursor_position = 0
        self.cursor_position += self.document.get_end_of_line_position()

1038 

def history_backward(self, count: int = 1) -> None:
    """
    Step backwards through the history, visiting only matching entries.

    :param count: Amount of items to move backward.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk towards the oldest entry, stopping after `count` matches.
    for candidate in range(self.working_index - 1, -1, -1):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving, place the cursor at the end of the text.
    if moved:
        self.cursor_position = len(self.text)

1059 

def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
    """
    Insert the nth word of a previous history entry at the cursor. Which
    entry is used depends on the current `yank_nth_arg_state`; repeated calls
    rotate further back through the history and replace the word inserted by
    the previous call. Without `n`, the first argument (the second word) is
    taken.

    :param n: (None or int), The index of the word from the previous line
        to take.
    """
    assert n is None or isinstance(n, int)

    entries = self.history.get_strings()
    if len(entries) == 0:
        return

    # Reuse the existing `YankNthArgState`, or create a new one.
    state = self.yank_nth_arg_state
    if state is None:
        state = YankNthArgState(n=-1 if _yank_last_arg else 1)

    if n is not None:
        state.n = n

    # Go one entry further back; wrap to the most recent entry when we run
    # past the oldest one.
    position = state.history_position - 1
    if -position > len(entries):
        position = -1

    # Split the chosen entry into non-empty words.
    stripped = (part.strip() for part in _QUOTED_WORDS_RE.split(entries[position]))
    words = [part for part in stripped if part]

    try:
        word = words[state.n]
    except IndexError:
        word = ""

    # Replace the previously inserted word with the new one.
    if state.previous_inserted_word:
        self.delete_before_cursor(len(state.previous_inserted_word))
    self.insert_text(word)

    # Store the state for the next call. (The insert above clears
    # `self.yank_nth_arg_state`.)
    state.previous_inserted_word = word
    state.history_position = position
    self.yank_nth_arg_state = state

1110 

1111 def yank_last_arg(self, n: int | None = None) -> None: 

1112 """ 

1113 Like `yank_nth_arg`, but if no argument has been given, yank the last 

1114 word by default. 

1115 """ 

1116 self.yank_nth_arg(n=n, _yank_last_arg=True) 

1117 

def start_selection(
    self, selection_type: SelectionType = SelectionType.CHARACTERS
) -> None:
    """
    Begin a selection of the given type, anchored at the current cursor
    position.
    """
    anchor = self.cursor_position
    self.selection_state = SelectionState(anchor, selection_type)

1125 

def copy_selection(self, _cut: bool = False) -> ClipboardData:
    """
    Return the selected text as a :class:`.ClipboardData` instance and clear
    the selection. With `_cut`, the selected text is removed as well.

    The data is not placed on the clipboard by this method. To store it:

    .. code:: python

        data = buffer.copy_selection()
        get_app().clipboard.set_data(data)
    """
    document_without_selection, clipboard_data = self.document.cut_selection()

    if _cut:
        self.document = document_without_selection

    self.selection_state = None
    return clipboard_data

1144 

def cut_selection(self) -> ClipboardData:
    """
    Remove the selected text and return it as a :class:`.ClipboardData`
    instance.
    """
    return self.copy_selection(_cut=True)

1150 

def paste_clipboard_data(
    self,
    data: ClipboardData,
    paste_mode: PasteMode = PasteMode.EMACS,
    count: int = 1,
) -> None:
    """
    Paste the given clipboard data into the buffer and remember the document
    as it was before pasting in `document_before_paste`.
    """
    assert isinstance(data, ClipboardData)
    assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)

    document_before = self.document
    self.document = document_before.paste_clipboard_data(
        data, paste_mode=paste_mode, count=count
    )

    # This has to happen last: assigning to `document` erases the
    # remembered original document.
    self.document_before_paste = document_before

1171 

def newline(self, copy_margin: bool = True) -> None:
    """
    Insert a line ending at the cursor. With `copy_margin`, the leading
    whitespace of the current line is repeated on the new line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    self.insert_text("\n" + margin)

1180 

def insert_line_above(self, copy_margin: bool = True) -> None:
    """
    Open a new line above the current one. With `copy_margin`, the new line
    gets the leading whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""

    # Insert at the start of the current line, then step back over the
    # inserted line ending.
    self.cursor_position += self.document.get_start_of_line_position()
    self.insert_text(margin + "\n")
    self.cursor_position -= 1

1193 

def insert_line_below(self, copy_margin: bool = True) -> None:
    """
    Open a new line below the current one. With `copy_margin`, the new line
    gets the leading whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""

    # Insert at the end of the current line.
    self.cursor_position += self.document.get_end_of_line_position()
    self.insert_text("\n" + margin)

1205 

def insert_text(
    self,
    data: str,
    overwrite: bool = False,
    move_cursor: bool = True,
    fire_event: bool = True,
) -> None:
    """
    Insert `data` at the cursor position.

    :param overwrite: Replace the characters after the cursor instead of
        shifting them, without ever overwriting a line ending.
    :param move_cursor: Place the cursor after the inserted text.
    :param fire_event: Fire `on_text_insert` event. This is mainly used to
        trigger autocompletion while typing.
    """
    original_text = self.text
    cursor = self.cursor_position

    text_after = original_text[cursor:]
    if overwrite:
        # Overwrite at most up to the line ending; right before a newline
        # this behaves like insert mode.
        replaced = original_text[cursor : cursor + len(data)].partition("\n")[0]
        text_after = original_text[cursor + len(replaced) :]

    new_text = original_text[:cursor] + data + text_after

    if move_cursor:
        new_cursor = self.cursor_position + len(data)
    else:
        new_cursor = self.cursor_position

    # Set text and cursor position together. Otherwise, setting the text
    # would fire a change event before the cursor position has been set.
    self.document = Document(new_text, new_cursor)

    if not fire_event:  # XXX: rename to `start_complete`.
        return

    self.on_text_insert.fire()

    # Only complete when "complete_while_typing" is enabled.
    if self.completer and self.complete_while_typing():
        get_app().create_background_task(self._async_completer())

    # Call auto_suggest.
    if self.auto_suggest:
        get_app().create_background_task(self._async_suggester())

1257 

def undo(self) -> None:
    """
    Restore the most recent undo-stack entry whose text differs from the
    current text, pushing the current state onto the redo stack.
    """
    # The top of the undo stack is usually identical to the current text
    # (because of how `save_to_undo_stack` works), so keep popping until an
    # entry with different text shows up.
    while self._undo_stack:
        previous_text, previous_cursor = self._undo_stack.pop()

        if previous_text == self.text:
            continue

        # Remember the current state for `redo`, then restore.
        self._redo_stack.append((self.text, self.cursor_position))
        self.document = Document(previous_text, cursor_position=previous_cursor)
        break

1273 

def redo(self) -> None:
    """
    Re-apply the most recently undone state, if there is one.
    """
    if not self._redo_stack:
        return

    # Keep the current state on the undo stack (without clearing redo).
    self.save_to_undo_stack(clear_redo_stack=False)

    # Restore the state on top of the redo stack.
    redo_text, redo_cursor = self._redo_stack.pop()
    self.document = Document(redo_text, cursor_position=redo_cursor)

1282 

def validate(self, set_cursor: bool = False) -> bool:
    """
    Validate the current document and return `True` if it is valid. The
    outcome is stored in `validation_state` and `validation_error`.

    :param set_cursor: Set the cursor position, if an error was found.
    """
    # Reuse the earlier outcome when the current input was validated already.
    if self.validation_state != ValidationState.UNKNOWN:
        return self.validation_state == ValidationState.VALID

    if self.validator:
        try:
            self.validator.validate(self.document)
        except ValidationError as error:
            if set_cursor:
                # Clamp the reported position to the valid range.
                clamped = min(max(0, error.cursor_position), len(self.text))
                self.cursor_position = clamped

            self.validation_state = ValidationState.INVALID
            self.validation_error = error
            return False

    # No validator, or the validator accepted the input.
    self.validation_state = ValidationState.VALID
    self.validation_error = None
    return True

1313 

async def _validate_async(self) -> None:
    """
    Asynchronous counterpart of `validate()`, which never moves the cursor.

    Both variants exist because handling the ENTER key has to be completely
    synchronous; otherwise type-ahead would give very weird results. (People
    could type input while the ENTER key is still processed.)

    The asynchronous variant is needed when `validate_while_typing` is
    enabled.
    """
    while True:
        # Stop as soon as the current input has a validation outcome.
        if self.validation_state != ValidationState.UNKNOWN:
            return

        failure = None
        validated_document = self.document

        if self.validator:
            try:
                await self.validator.validate_async(self.document)
            except ValidationError as exc:
                failure = exc

            # The document changed while validating: start over.
            if self.document != validated_document:
                continue

        # Store the outcome.
        self.validation_state = (
            ValidationState.INVALID if failure else ValidationState.VALID
        )
        self.validation_error = failure
        get_app().invalidate()  # Trigger redraw (display error).

1355 

def append_to_history(self) -> None:
    """
    Add the current text to the end of the history. Empty text is skipped,
    and so is text equal to the most recent history entry.
    """
    current = self.text
    if not current:
        return

    entries = self.history.get_strings()
    if len(entries) == 0 or entries[-1] != current:
        self.history.append_string(current)

1366 

def _search(
    self,
    search_state: SearchState,
    include_current_position: bool = False,
    count: int = 1,
) -> tuple[int, int] | None:
    """
    Run the search `count` times. Return a (working_index, cursor_position)
    tuple for the resulting match, or `None` when the text cannot be found.
    """
    assert count > 0

    needle = search_state.text
    forward = search_state.direction == SearchDirection.FORWARD
    ignore_case = search_state.ignore_case()

    def search_once(
        working_index: int, document: Document
    ) -> tuple[int, Document] | None:
        """
        Perform a single search step.
        Return (working_index, document) or `None`
        """
        if forward:
            # First look in the given document.
            offset = document.find(
                needle,
                include_current_position=include_current_position,
                ignore_case=ignore_case,
            )
            if offset is not None:
                return (
                    working_index,
                    Document(document.text, document.cursor_position + offset),
                )

            # No match: continue in the following working lines. (The range
            # runs to len+1, so that it wraps around.) Every cursor position
            # is included here, because it is a different line.
            for i in range(working_index + 1, len(self._working_lines) + 1):
                i %= len(self._working_lines)

                candidate = Document(self._working_lines[i], 0)
                offset = candidate.find(
                    needle, include_current_position=True, ignore_case=ignore_case
                )
                if offset is not None:
                    return (i, Document(candidate.text, offset))
            return None

        # Backward search: first look in the given document.
        offset = document.find_backwards(needle, ignore_case=ignore_case)
        if offset is not None:
            return (
                working_index,
                Document(document.text, document.cursor_position + offset),
            )

        # No match: continue in the preceding working lines. (The range runs
        # to -1, so that it wraps around.)
        for i in range(working_index - 1, -2, -1):
            i %= len(self._working_lines)

            candidate = Document(self._working_lines[i], len(self._working_lines[i]))
            offset = candidate.find_backwards(needle, ignore_case=ignore_case)
            if offset is not None:
                return (
                    i,
                    Document(candidate.text, len(candidate.text) + offset),
                )
        return None

    # Chain `count` search steps, each one starting from the previous match.
    working_index = self.working_index
    document = self.document
    for _ in range(count):
        step = search_once(working_index, document)
        if step is None:
            return None  # Nothing found.
        working_index, document = step

    return (working_index, document.cursor_position)

1454 

def document_for_search(self, search_state: SearchState) -> Document:
    """
    Return the :class:`~prompt_toolkit.document.Document` (text and cursor
    position) that would result from applying this search. When nothing is
    found, the current document is returned. This will be used in the
    :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
    searching.
    """
    found = self._search(search_state, include_current_position=True)
    if found is None:
        return self.document

    working_index, cursor_position = found

    # The selection is kept only when the match is in the current entry.
    selection = self.selection_state if working_index == self.working_index else None

    return Document(
        self._working_lines[working_index], cursor_position, selection=selection
    )

1479 

def get_search_position(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> int:
    """
    Return the cursor position for this search, or the current cursor
    position when nothing is found.
    (This operation won't change the `working_index`. It won't go through
    the history. Vi text objects can't span multiple items.)
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return self.cursor_position

    _working_index, cursor_position = found
    return cursor_position

1500 

def apply_search(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> None:
    """
    Run the search and, when there is a match, move to it by setting
    `working_index` and `cursor_position`.
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return

    self.working_index, self.cursor_position = found

1519 

def exit_selection(self) -> None:
    """
    Drop the current selection state.
    """
    self.selection_state = None

1522 

def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Simple (file) tempfile implementation.

    Write the current text, UTF-8 encoded, to a new temporary file whose
    suffix is taken from `tempfile_suffix`.

    Return (tempfile, cleanup_func); calling `cleanup_func` removes the file.
    """
    suffix = to_str(self.tempfile_suffix)
    descriptor, filename = tempfile.mkstemp(suffix)

    try:
        # Wrap the descriptor in a file object: it is closed even when
        # writing fails, and the whole payload gets written (a raw
        # `os.write` is allowed to write fewer bytes than requested).
        with os.fdopen(descriptor, "wb") as fh:
            fh.write(self.text.encode("utf-8"))
    except BaseException:
        # Don't leave a stray temp file behind when writing failed.
        os.unlink(filename)
        raise

    def cleanup() -> None:
        os.unlink(filename)

    return filename, cleanup

1538 

def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Complex (directory) tempfile implementation.

    Create a fresh temporary directory, recreate the directory part of
    `self.tempfile` inside it, and write the current text to a file with the
    requested name. Falls back to `_editor_simple_tempfile` when
    `self.tempfile` resolves to an empty value.

    Return (tempfile, cleanup_func); calling `cleanup_func` removes the whole
    temporary directory tree.
    """
    headtail = to_str(self.tempfile)
    if not headtail:
        # Revert to simple case.
        return self._editor_simple_tempfile()
    headtail = str(headtail)

    # Try to make according to tempfile logic.
    head, tail = os.path.split(headtail)
    if os.path.isabs(head):
        head = head[1:]

    # Remember the root of the temporary tree. Cleanup has to remove this
    # root; removing only the nested directory would leave the root (and any
    # intermediate directories) behind.
    root = tempfile.mkdtemp()
    dirpath = root

    try:
        if head:
            dirpath = os.path.join(root, head)
            # Assume there is no issue creating dirs in this temp dir.
            os.makedirs(dirpath)

        # Open the filename and write current text.
        filename = os.path.join(dirpath, tail)
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(self.text)
    except BaseException:
        # Nothing will call the cleanup function when we fail here.
        shutil.rmtree(root, ignore_errors=True)
        raise

    def cleanup() -> None:
        shutil.rmtree(root)

    return filename, cleanup

1567 

def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
    """
    Open the buffer text in an external editor.

    The text is written to a temporary file. A background task runs the
    editor and, when the editor reports success, reads the file back into
    the buffer. The task is returned.

    :param validate_and_handle: Call `validate_and_handle()` after the text
        has been read back.
    :raises EditReadOnlyBuffer: when the buffer is read-only.
    """
    if self.read_only():
        raise EditReadOnlyBuffer()

    # Write current text to temporary file.
    make_tempfile = (
        self._editor_complex_tempfile if self.tempfile else self._editor_simple_tempfile
    )
    filename, cleanup_func = make_tempfile()

    async def run() -> None:
        try:
            # The editor has to go through `run_in_terminal`, because not
            # all editors use the alternate screen buffer, and some could
            # influence the cursor position.
            succeeded = await run_in_terminal(
                lambda: self._open_file_in_editor(filename), in_executor=True
            )

            if succeeded:
                # Read content again.
                with open(filename, "rb") as f:
                    new_text = f.read().decode("utf-8")

                # Editors are supposed to end the file with a newline; we
                # don't need it.
                if new_text.endswith("\n"):
                    new_text = new_text[:-1]

                self.document = Document(text=new_text, cursor_position=len(new_text))

                # Accept the input.
                if validate_and_handle:
                    self.validate_and_handle()
        finally:
            # Clean up temp dir/file.
            cleanup_func()

    return get_app().create_background_task(run())

1614 

def _open_file_in_editor(self, filename: str) -> bool:
    """
    Call editor executable.

    The 'VISUAL' and 'EDITOR' environment variables are tried first (in that
    order), followed by a list of well-known editor paths. The first command
    that can be started decides the result.

    :param filename: Path that is passed to the editor as its last argument.

    Return True when we received a zero return code, False on a non-zero
    return code or when no editor could be started.
    """
    candidates = [
        os.environ.get("VISUAL"),
        os.environ.get("EDITOR"),
        # Order of preference.
        "/usr/bin/editor",
        "/usr/bin/nano",
        "/usr/bin/pico",
        "/usr/bin/vi",
        "/usr/bin/emacs",
    ]

    for candidate in candidates:
        if not candidate:
            continue

        try:
            # Use 'shlex.split()', because $VISUAL can contain spaces
            # and quotes.
            argv = shlex.split(candidate)
        except ValueError:
            # Malformed quoting (e.g. an unterminated quote in $VISUAL):
            # treat it like an unusable editor and try the next one.
            continue

        if not argv:
            # Whitespace-only value: without this check, the file itself
            # would be executed as the command.
            continue

        try:
            returncode = subprocess.call(argv + [filename])
        except OSError:
            # Executable does not exist, try the next one.
            continue

        return returncode == 0

    return False

1650 

def start_completion(
    self,
    select_first: bool = False,
    select_last: bool = False,
    insert_common_part: bool = False,
    complete_event: CompleteEvent | None = None,
) -> None:
    """
    Start asynchronous autocompletion of this buffer, as a background task.
    (This will do nothing if a previous completion was still in progress.)

    At most one of `select_first`, `select_last` and `insert_common_part`
    may be set.
    """
    # Only one of these options can be selected.
    assert select_first + select_last + insert_common_part <= 1

    event = complete_event or CompleteEvent(completion_requested=True)
    completer_coroutine = self._async_completer(
        select_first=select_first,
        select_last=select_last,
        insert_common_part=insert_common_part,
        complete_event=event,
    )
    get_app().create_background_task(completer_coroutine)

1674 

def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that performs asynchronous autocompletion.

    (It consumes the asynchronous completer generator, which possibly runs
    the completion algorithm in another thread.)
    """

    def completion_does_nothing(document: Document, completion: Completion) -> bool:
        """
        Return `True` if applying this completion doesn't have any effect.
        (When it doesn't insert any new text.)
        """
        before_cursor = document.text_before_cursor
        replace_from = len(before_cursor) + completion.start_position
        return before_cursor[replace_from:] == completion.text

    @_only_one_at_a_time
    async def async_completer(
        select_first: bool = False,
        select_last: bool = False,
        insert_common_part: bool = False,
        complete_event: CompleteEvent | None = None,
    ) -> None:
        document = self.document
        complete_event = complete_event or CompleteEvent(text_inserted=True)

        # Don't complete when we already have completions.
        if self.complete_state or not self.completer:
            return

        # Start from an empty CompletionState.
        complete_state = CompletionState(original_document=self.document)
        self.complete_state = complete_state

        def proceed() -> bool:
            """Keep retrieving completions. Input text has not yet changed
            while generating completions."""
            return self.complete_state == complete_state

        refresh_needed = asyncio.Event()

        async def refresh_while_loading() -> None:
            """Background loop that refreshes the UI while the completions
            are loading, sleeping 0.3 seconds between refreshes. Calling
            `on_completions_changed.fire()` for every completion that we
            receive is too expensive when there are many completions. (We
            could tune `Application.max_render_postpone_time` and
            `Application.min_redraw_interval`, but having this here is a
            better approach.)
            """
            while True:
                self.on_completions_changed.fire()
                refresh_needed.clear()
                await asyncio.sleep(0.3)
                await refresh_needed.wait()

        refresh_task = asyncio.ensure_future(refresh_while_loading())
        try:
            # Load.
            async with aclosing(
                self.completer.get_completions_async(document, complete_event)
            ) as async_generator:
                async for completion in async_generator:
                    complete_state.completions.append(completion)
                    refresh_needed.set()

                    # If the input text changes, abort.
                    if not proceed():
                        break

                    # Never collect more than `max_number_of_completions`.
                    if (
                        len(complete_state.completions)
                        >= self.max_number_of_completions
                    ):
                        break
        finally:
            refresh_task.cancel()

            # Refresh one final time after we got everything.
            self.on_completions_changed.fire()

        completions = complete_state.completions

        # When there is only one completion, which has nothing to add, ignore it.
        if len(completions) == 1 and completion_does_nothing(
            document, completions[0]
        ):
            del completions[:]

        if not proceed():
            # The text changed in the meantime. If the last operation was an
            # insert, (not a delete), restart the completion coroutine.
            if self.document.text_before_cursor == document.text_before_cursor:
                return  # Nothing changed.

            if self.document.text_before_cursor.startswith(
                document.text_before_cursor
            ):
                raise _Retry
            return

        # The text was not changed: set the completions.

        # When no completions were found, or when the user selected
        # already a completion by using the arrow keys, don't do anything.
        if (
            not self.complete_state
            or self.complete_state.complete_index is not None
        ):
            return

        # When there are no completions, reset completion state anyway.
        if not completions:
            self.complete_state = None
            # Render the ui if the completion menu was shown
            # it is needed especially if there is one completion and it was deleted.
            self.on_completions_changed.fire()
            return

        # Select first/last or insert common part, depending on the key
        # binding. (For this we have to wait until all completions are
        # loaded.)
        if select_first:
            self.go_to_completion(0)

        elif select_last:
            self.go_to_completion(len(completions) - 1)

        elif insert_common_part:
            common_part = get_common_complete_suffix(document, completions)

            if not common_part:
                # When we were asked to insert the "common" prefix, but
                # there was no common suffix but still exactly one match,
                # then select the first. (It could be that we have a
                # completion which does * expansion, like '*.py', with
                # exactly one match.)
                if len(completions) == 1:
                    self.go_to_completion(0)
                return

            # Insert the common part, update completions.
            self.insert_text(common_part)
            if len(completions) > 1:
                # (Don't call `async_completer` again, but
                # recalculate completions. See:
                # https://github.com/ipython/ipython/issues/9658)
                completions[:] = [
                    c.new_completion_from_position(len(common_part))
                    for c in completions
                ]

                self._set_completions(completions=completions)
            else:
                self.complete_state = None

    return async_completer

1836 

def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that performs asynchronous auto suggestion.
    (This can be in another thread.)

    The returned callable is wrapped in `_only_one_at_a_time`; raising
    `_Retry` from inside it makes that wrapper run it again.
    """

    async def async_suggestor() -> None:
        # Snapshot the document, so we can detect edits made while waiting.
        snapshot = self.document

        # Nothing to do when a suggestion is already present, or when no
        # auto suggester is configured.
        if self.suggestion or not self.auto_suggest:
            return

        proposed = await self.auto_suggest.get_suggestion_async(self, snapshot)

        # The text changed while we were waiting: start over.
        if not (self.document == snapshot):
            raise _Retry

        # Text is unchanged: store the suggestion and redraw the interface.
        self.suggestion = proposed
        self.on_suggestion_set.fire()

    return _only_one_at_a_time(async_suggestor)

1863 

def _create_auto_validate_coroutine(
    self,
) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function used for asynchronous validation while
    typing.
    (This can be in another thread.)

    The returned callable awaits `self._validate_async()` and is wrapped in
    `_only_one_at_a_time`.
    """

    async def async_validator() -> None:
        await self._validate_async()

    return _only_one_at_a_time(async_validator)

1877 

def validate_and_handle(self) -> None:
    """
    Validate the buffer and, when valid, run the accept action.

    On successful validation the accept handler (if any) is called with this
    buffer, the input is appended to the history, and the buffer is reset
    unless the handler returned a true value ("keep text").
    """
    # Invalid input: nothing is accepted.
    if not self.validate(set_cursor=True):
        return

    # Without an accept handler, the text is never kept.
    keep_text = self.accept_handler(self) if self.accept_handler else False

    self.append_to_history()

    if not keep_text:
        self.reset()

1895 

1896 

# Type variable bound to coroutine functions that return `None`. It lets
# `_only_one_at_a_time` be annotated as returning the same callable type that
# it receives.
_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]])

1898 

1899 

def _only_one_at_a_time(coroutine: _T) -> _T:
    """
    Decorator that starts the coroutine only if the previous call has
    finished. (Used to make sure that we have only one autocompleter, auto
    suggestor and validator running at a time.)

    A call made while a previous call is still in progress returns
    immediately without doing anything.

    When the coroutine raises `_Retry`, it is restarted with the same
    arguments.
    """
    # Shared between all calls of the wrapped coroutine function.
    busy = False

    @wraps(coroutine)
    async def guarded(*a: Any, **kw: Any) -> Any:
        nonlocal busy

        # A previous call is still in progress: don't start a new one.
        if busy:
            return

        busy = True

        try:
            while True:
                try:
                    await coroutine(*a, **kw)
                except _Retry:
                    # The coroutine asked to be run again.
                    continue
                break
            return None
        finally:
            # Always release the flag, also when the coroutine failed.
            busy = False

    return cast(_T, guarded)

1932 

1933 

class _Retry(Exception):
    """
    Retry in `_only_one_at_a_time`.

    Raised from inside a decorated coroutine to make the decorator run that
    coroutine again.
    """

1936 

1937 

def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Indent text of a :class:`.Buffer` object.

    Rows in ``range(from_row, to_row)`` (`to_row` excluded) get `count`
    indent units prepended. Afterwards the cursor is placed on its original
    row, shifted right by the width of the inserted indentation.
    """
    cursor_row = buffer.document.cursor_position_row
    cursor_col = buffer.document.cursor_position_col
    rows = range(from_row, to_row)

    # NOTE(review): the indent unit reads as a single space in this listing;
    # confirm the literal against the original file in case whitespace was
    # collapsed by the rendering.
    prefix = " " * count

    def add_prefix(line: str) -> str:
        return prefix + line

    # Apply transformation.
    indented = buffer.transform_lines(rows, add_prefix)
    row_start = Document(indented).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(indented, row_start)

    # Place cursor in the same position in text after indenting.
    buffer.cursor_position += cursor_col + len(prefix)

1955 

1956 

def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Unindent text of a :class:`.Buffer` object.

    Rows in ``range(from_row, to_row)`` (`to_row` excluded) lose `count`
    indent units when they start with them; otherwise all of their leading
    whitespace is stripped. Afterwards the cursor is placed on its original
    row, shifted left by the width of the removed indentation.
    """
    cursor_row = buffer.document.cursor_position_row
    cursor_col = buffer.document.cursor_position_col
    rows = range(from_row, to_row)

    # NOTE(review): the indent unit reads as a single space in this listing;
    # confirm the literal against the original file in case whitespace was
    # collapsed by the rendering.
    prefix = " " * count

    def strip_prefix(line: str) -> str:
        # Less indentation than requested: drop whatever leading
        # whitespace there is.
        if not line.startswith(prefix):
            return line.lstrip()
        return line[len(prefix) :]

    # Apply transformation.
    dedented = buffer.transform_lines(rows, strip_prefix)
    row_start = Document(dedented).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(dedented, row_start)

    # Place cursor in the same position in text after dedent.
    buffer.cursor_position += cursor_col - len(prefix)

1982 

1983 

def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
    """
    Reformat text, taking the width into account.
    `to_row` is included.
    (Vi 'gq' operator.)

    The words of the selected rows are re-flowed into lines that fit in
    ``buffer.text_width`` (80 when that is falsy), each line starting with
    the leading whitespace of the first selected row. The cursor ends up
    right after the reshaped text. Nothing happens when no rows are selected.
    """
    all_lines = buffer.text.splitlines(True)
    head = all_lines[:from_row]
    selected = all_lines[from_row : to_row + 1]
    tail = all_lines[to_row + 1 :]

    if not selected:
        return

    # Take indentation from the first line.
    first_line = selected[0]
    leading = re.search(r"^\s*", first_line)
    prefix_len = 0 if leading is None else leading.end()  # Always matches.
    prefix = first_line[:prefix_len].replace("\n", "")

    # Room left for words once the indentation is accounted for.
    available = (buffer.text_width or 80) - len(prefix)

    # Re-flow all the 'words' of the selected lines.
    pieces = [prefix]
    used = 0
    for word in "".join(selected).split():
        if used:
            if used + 1 + len(word) > available:
                # Word doesn't fit anymore: continue on a new line.
                pieces += ["\n", prefix]
                used = 0
            else:
                pieces.append(" ")
                used += 1

        pieces.append(word)
        used += len(word)

    if pieces[-1] != "\n":
        pieces.append("\n")

    # Apply result; the cursor goes right behind the reshaped part.
    up_to_cursor = "".join(head + pieces)
    buffer.document = Document(
        text=up_to_cursor + "".join(tail),
        cursor_position=len(up_to_cursor),
    )

2029 )