Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/buffer.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

819 statements  

1""" 

2Data structures for the Buffer. 

3It holds the text, cursor position, history, etc... 

4""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import logging 

10import os 

11import re 

12import shlex 

13import shutil 

14import subprocess 

15import tempfile 

16from collections import deque 

17from collections.abc import Callable, Coroutine, Iterable 

18from enum import Enum 

19from functools import wraps 

20from typing import Any, TypeVar, cast 

21 

22from .application.current import get_app 

23from .application.run_in_terminal import run_in_terminal 

24from .auto_suggest import AutoSuggest, Suggestion 

25from .cache import FastDictCache 

26from .clipboard import ClipboardData 

27from .completion import ( 

28 CompleteEvent, 

29 Completer, 

30 Completion, 

31 DummyCompleter, 

32 get_common_complete_suffix, 

33) 

34from .document import Document 

35from .eventloop import aclosing 

36from .filters import FilterOrBool, to_filter 

37from .history import History, InMemoryHistory 

38from .search import SearchDirection, SearchState 

39from .selection import PasteMode, SelectionState, SelectionType 

40from .utils import Event, to_str 

41from .validation import ValidationError, Validator 

42 

43__all__ = [ 

44 "EditReadOnlyBuffer", 

45 "Buffer", 

46 "CompletionState", 

47 "indent", 

48 "unindent", 

49 "reshape_text", 

50] 

51 

52logger = logging.getLogger(__name__) 

53 

54 

class EditReadOnlyBuffer(Exception):
    """
    Raised on an attempt to edit a read-only :class:`.Buffer`.
    """

57 

58 

class ValidationState(Enum):
    """
    Validation state of a buffer; assigned once validation has taken place.
    """

    VALID = "VALID"
    INVALID = "INVALID"
    # State a buffer is in before any validation result is known.
    UNKNOWN = "UNKNOWN"

65 

66 

class CompletionState:
    """
    State of an ongoing completion: the document the completion started
    from, the candidate completions, and which candidate is selected.

    Note that :meth:`go_to_index` updates the selected index in place.
    """

    def __init__(
        self,
        original_document: Document,
        completions: list[Completion] | None = None,
        complete_index: int | None = None,
    ) -> None:
        #: Document as it was when the completion started.
        self.original_document = original_document

        #: All the Completion instances that are possible at this point.
        self.completions = completions if completions else []

        #: Index into `completions`. `None` means "no completion selected",
        #: i.e. the original text.
        self.complete_index = complete_index

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return (
            f"{class_name}({self.original_document!r}, "
            f"<{len(self.completions)!r}> completions, "
            f"index={self.complete_index!r})"
        )

    def go_to_index(self, index: int | None) -> None:
        """
        Select the completion at `index` (in place).

        When `index` is `None` deselect the completion. Nothing happens when
        there are no completions at all.
        """
        if not self.completions:
            return

        assert index is None or 0 <= index < len(self.completions)
        self.complete_index = index

    def new_text_and_position(self) -> tuple[str, int]:
        """
        Return (new_text, new_cursor_position) for the selected completion,
        or the original text and cursor position when nothing is selected.
        """
        if self.complete_index is None:
            source = self.original_document
            return source.text, source.cursor_position

        prefix = self.original_document.text_before_cursor
        suffix = self.original_document.text_after_cursor
        chosen = self.completions[self.complete_index]

        # A start position of 0 keeps the whole prefix; slicing with `[:0]`
        # would throw it away instead.
        if chosen.start_position != 0:
            prefix = prefix[: chosen.start_position]

        return prefix + chosen.text + suffix, len(prefix) + len(chosen.text)

    @property
    def current_completion(self) -> Completion | None:
        """
        The selected completion, or `None` when no completion is selected.
        """
        selected = self.complete_index
        return None if selected is None else self.completions[selected]

131 

132 

# Single capturing group that matches either a run of whitespace, a
# double-quoted span or a single-quoted span (both non-greedy).
_QUOTED_WORDS_RE = re.compile(
    r"""(\s+|".*?"|'.*?')"""
)

134 

135 

class YankNthArgState:
    """
    Bookkeeping for yank-last-arg/yank-nth-arg: remembers where we are in
    the history.
    """

    def __init__(
        self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
    ) -> None:
        self.history_position = history_position
        self.n = n
        self.previous_inserted_word = previous_inserted_word

    def __repr__(self) -> str:
        fields = (
            f"history_position={self.history_position!r}",
            f"n={self.n!r}",
            f"previous_inserted_word={self.previous_inserted_word!r}",
        )
        return f"{self.__class__.__name__}({', '.join(fields)})"

150 

151 

# Signature of the callables accepted for the Buffer `on_*` event parameters:
# they receive the buffer and return nothing.
BufferEventHandler = Callable[["Buffer"], None]
# Signature of `Buffer.accept_handler`: receives the buffer and returns a
# bool. Per the Buffer docstring, True means the text should be kept instead
# of calling reset.
BufferAcceptHandler = Callable[["Buffer"], bool]

154 

155 

156class Buffer: 

157 """ 

158 The core data structure that holds the text and cursor position of the 

159 current input line and implements all text manipulations on top of it. It 

160 also implements the history, undo stack and the completion state. 

161 

162 :param completer: :class:`~prompt_toolkit.completion.Completer` instance. 

163 :param history: :class:`~prompt_toolkit.history.History` instance. 

164 :param tempfile_suffix: The tempfile suffix (extension) to be used for the 

165 "open in editor" function. For a Python REPL, this would be ".py", so 

166 that the editor knows the syntax highlighting to use. This can also be 

167 a callable that returns a string. 

168 :param tempfile: For more advanced tempfile situations where you need 

169 control over the subdirectories and filename. For a Git Commit Message, 

170 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax 

171 highlighting to use. This can also be a callable that returns a string. 

172 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly 

173 useful for key bindings where we sometimes prefer to refer to a buffer 

174 by their name instead of by reference. 

175 :param accept_handler: Called when the buffer input is accepted. (Usually 

176 when the user presses `enter`.) The accept handler receives this 

177 `Buffer` as input and should return True when the buffer text should be 

178 kept instead of calling reset. 

179 

180 In case of a `PromptSession` for instance, we want to keep the text, 

181 because we will exit the application, and only reset it during the next 

182 run. 

183 :param max_number_of_completions: Never display more than this number of 

184 completions, even when the completer can produce more (limited by 

185 default to 10k for performance). 

186 

187 Events: 

188 

189 :param on_text_changed: When the buffer text changes. (Callable or None.) 

190 :param on_text_insert: When new text is inserted. (Callable or None.) 

191 :param on_cursor_position_changed: When the cursor moves. (Callable or None.) 

192 :param on_completions_changed: When the completions were changed. (Callable or None.) 

193 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) 

194 

195 Filters: 

196 

197 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` 

198 or `bool`. Decide whether or not to do asynchronous autocompleting while 

199 typing. 

200 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` 

201 or `bool`. Decide whether or not to do asynchronous validation while 

202 typing. 

203 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or 

204 `bool` to indicate when up-arrow partial string matching is enabled. It 

205 is advised to not enable this at the same time as 

206 `complete_while_typing`, because when there is an autocompletion found, 

207 the up arrows usually browse through the completions, rather than 

208 through the history. 

209 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, 

210 changes will not be allowed. 

211 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When 

212 not set, pressing `Enter` will call the `accept_handler`. Otherwise, 

213 pressing `Esc-Enter` is required. 

214 """ 

215 

def __init__(
    self,
    completer: Completer | None = None,
    auto_suggest: AutoSuggest | None = None,
    history: History | None = None,
    validator: Validator | None = None,
    tempfile_suffix: str | Callable[[], str] = "",
    tempfile: str | Callable[[], str] = "",
    name: str = "",
    complete_while_typing: FilterOrBool = False,
    validate_while_typing: FilterOrBool = False,
    enable_history_search: FilterOrBool = False,
    document: Document | None = None,
    accept_handler: BufferAcceptHandler | None = None,
    read_only: FilterOrBool = False,
    multiline: FilterOrBool = True,
    max_number_of_completions: int = 10000,
    on_text_changed: BufferEventHandler | None = None,
    on_text_insert: BufferEventHandler | None = None,
    on_cursor_position_changed: BufferEventHandler | None = None,
    on_completions_changed: BufferEventHandler | None = None,
    on_suggestion_set: BufferEventHandler | None = None,
) -> None:
    """
    Store the configuration, create the event objects, the document cache
    and the async helpers, then initialise all remaining state through
    :meth:`reset`.
    """
    # Plain configuration.
    self.completer = completer or DummyCompleter()
    self.auto_suggest = auto_suggest
    self.validator = validator
    self.tempfile_suffix = tempfile_suffix
    self.tempfile = tempfile
    self.name = name
    self.accept_handler = accept_handler

    # Filters. (Usually, used by the key bindings to drive the buffer.)
    # Both filters and booleans are accepted; `to_filter` normalises them.
    self.complete_while_typing = to_filter(complete_while_typing)
    self.validate_while_typing = to_filter(validate_while_typing)
    self.enable_history_search = to_filter(enable_history_search)
    self.read_only = to_filter(read_only)
    self.multiline = to_filter(multiline)
    self.max_number_of_completions = max_number_of_completions

    # Text width. (For wrapping, used by the Vi 'gq' operator.)
    self.text_width = 0

    #: The command buffer history.
    # Test explicitly against None: an empty history object could be falsy,
    # so a lazy 'or' is not safe here.
    if history is None:
        self.history = InMemoryHistory()
    else:
        self.history = history

    self.__cursor_position = 0

    # Events
    self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
    self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
    self.on_cursor_position_changed: Event[Buffer] = Event(
        self, on_cursor_position_changed
    )
    self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
    self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)

    # Document cache. (Avoid creating new Document instances.)
    self._document_cache: FastDictCache[
        tuple[str, int, SelectionState | None], Document
    ] = FastDictCache(Document, size=10)

    # Create completer / auto suggestion / validation coroutines.
    self._async_suggester = self._create_auto_suggest_coroutine()
    self._async_completer = self._create_completer_coroutine()
    self._async_validator = self._create_auto_validate_coroutine()

    # Asyncio task for populating the history. Must exist before `reset`
    # runs, because `reset` inspects it.
    self._load_history_task: asyncio.Future[None] | None = None

    # Reset other attributes.
    self.reset(document=document)

296 

def __repr__(self) -> str:
    """
    Debug representation. Text of 15 characters or more is shown as its
    first 12 characters followed by "...".
    """
    full_text = self.text
    shown = full_text if len(full_text) < 15 else full_text[:12] + "..."
    return f"<Buffer(name={self.name!r}, text={shown!r}) at {id(self)!r}>"

304 

def reset(
    self, document: Document | None = None, append_to_history: bool = False
) -> None:
    """
    Re-initialise the editing state of the buffer.

    :param document: Document supplying the initial text and cursor
        position. A new empty `Document()` is used when omitted.
    :param append_to_history: Append current input to history first.
    """
    if append_to_history:
        self.append_to_history()

    if not document:
        document = Document()

    self.__cursor_position = document.cursor_position

    # Validation: the `ValidationError` instance (set when the input is
    # wrong) and the validation state.
    self.validation_error: ValidationError | None = None
    self.validation_state: ValidationState | None = ValidationState.UNKNOWN

    # State of the selection.
    self.selection_state: SelectionState | None = None

    # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
    # we can insert text on multiple lines at once. This is implemented by
    # using multiple cursors.)
    self.multiple_cursor_positions: list[int] = []

    # Column to stay at when doing consecutive up/down movements.
    self.preferred_column: int | None = None

    # State of complete browser
    # For interactive completion through Ctrl-N/Ctrl-P.
    self.complete_state: CompletionState | None = None

    # State of Emacs yank-nth-arg completion.
    self.yank_nth_arg_state: YankNthArgState | None = None

    # The document that we had *right before* the last paste operation.
    # This is used for rotating through the kill ring.
    self.document_before_paste: Document | None = None

    # Current suggestion.
    self.suggestion: Suggestion | None = None

    # The history search text. (Used for filtering the history when we
    # browse through it.)
    self.history_search_text: str | None = None

    # Undo/redo stacks (stack of `(text, cursor_position)`).
    self._undo_stack: list[tuple[str, int]] = []
    self._redo_stack: list[tuple[str, int]] = []

    # If history loading was still ongoing, cancel it and forget the task,
    # so that `load_history_if_not_yet_loaded` starts a new one next time.
    pending_load = self._load_history_task
    if pending_load is not None:
        pending_load.cancel()
    self._load_history_task = None

    #: The working lines. Similar to history, except that this can be
    #: modified. The user can press arrow_up and edit previous entries.
    #: Ctrl-C should reset this, and copy the whole history back in here.
    #: Enter should process the current command and append to the real
    #: history.
    self._working_lines: deque[str] = deque([document.text])
    self.__working_index = 0

369 

def load_history_if_not_yet_loaded(self) -> None:
    """
    Create task for populating the buffer history (if not yet done).

    Note::

        This needs to be called from within the event loop of the
        application, because history loading is async, and we need to be
        sure the right event loop is active. Therefore, we call this method
        in the `BufferControl.create_content`.

        There are situations where prompt_toolkit applications are created
        in one thread, but will later run in a different thread (Ptpython
        is one example. The REPL runs in a separate thread, in order to
        prevent interfering with a potential different event loop in the
        main thread. The REPL UI however is still created in the main
        thread.) We could decide to not support creating prompt_toolkit
        objects in one thread and running the application in a different
        thread, but history loading is the only place where it matters, and
        this solves it.
    """
    if self._load_history_task is not None:
        # A loading task already exists; nothing to do.
        return

    async def load_history() -> None:
        # Every loaded item is put in front of the working lines, so the
        # working index has to shift along with it.
        async for item in self.history.load():
            self._working_lines.appendleft(item)
            self.__working_index += 1

    def load_history_done(f: asyncio.Future[None]) -> None:
        """
        Handle `load_history` result when either done, cancelled, or
        when an exception was raised.
        """
        try:
            f.result()
        except (asyncio.CancelledError, GeneratorExit):
            # Cancellation is ignored, but handled so that we don't get a
            # traceback. `GeneratorExit` is probably not needed, but we had
            # situations where it was raised in `load_history` during
            # cancellation.
            pass
        except BaseException:
            # Log error if something goes wrong. (We don't have a
            # caller to which we can propagate this exception.)
            logger.exception("Loading history failed")

    task = get_app().create_background_task(load_history())
    self._load_history_task = task
    task.add_done_callback(load_history_done)

422 

423 # <getters/setters> 

424 

def _set_text(self, value: str) -> bool:
    """Store `value` at the current working index. Return whether it changed."""
    index = self.working_index
    lines = self._working_lines

    previous = lines[index]
    lines[index] = value

    # The length comparison comes first on purpose: it is a cheap shortcut
    # that avoids a full string comparison for big documents.
    return len(value) != len(previous) or value != previous

444 

def _set_cursor_position(self, value: int) -> bool:
    """
    Store the cursor position (negative values become 0) without firing
    any event. Return whether it changed.
    """
    previous = self.__cursor_position
    updated = max(0, value)
    self.__cursor_position = updated
    return updated != previous

451 

@property
def text(self) -> str:
    """The text of the working line at the current working index."""
    return self._working_lines[self.working_index]

@text.setter
def text(self, value: str) -> None:
    """
    Setting text. (When doing this, make sure that the cursor_position is
    valid for this text. text/cursor_position should be consistent at any time,
    otherwise set a Document instead.)

    :raises EditReadOnlyBuffer: When the buffer is read-only. In that case
        nothing is modified, the cursor position included.
    """
    # Don't allow editing of read-only buffers. This check comes first, so
    # that a rejected edit cannot move the cursor or fire cursor events.
    if self.read_only():
        raise EditReadOnlyBuffer()

    # Ensure cursor position remains within the size of the text.
    if self.cursor_position > len(value):
        self.cursor_position = len(value)

    changed = self._set_text(value)

    if changed:
        self._text_changed()

        # Reset history search text.
        # (Note that this doesn't need to happen when working_index
        # changes, which is when we traverse the history. That's why we
        # don't do this in `self._text_changed`.)
        self.history_search_text = None

481 

@property
def cursor_position(self) -> int:
    """The current cursor position (index into the text)."""
    return self.__cursor_position

@cursor_position.setter
def cursor_position(self, value: int) -> None:
    """
    Setting cursor position. The value is clamped so that it lies between
    0 and the length of the text.
    """
    assert isinstance(value, int)

    # Upper bound first, then lower bound.
    clamped = min(value, len(self.text))
    clamped = max(clamped, 0)

    # Only fire the change handler when the position really moved.
    if self._set_cursor_position(clamped):
        self._cursor_position_changed()

503 

@property
def working_index(self) -> int:
    """Index of the working line that is currently being edited."""
    return self.__working_index

@working_index.setter
def working_index(self, value: int) -> None:
    """Switch to another working line. Does nothing when the index is unchanged."""
    if self.__working_index == value:
        return

    self.__working_index = value
    # Make sure to reset the cursor position, otherwise we end up in
    # situations where the cursor position is out of the bounds of the
    # text.
    self.cursor_position = 0
    self._text_changed()

517 

def _text_changed(self) -> None:
    """
    Clear the state that was tied to the previous text, fire
    `on_text_changed` and, when enabled, schedule validation.
    """
    # Remove any validation errors.
    self.validation_error = None
    self.validation_state = ValidationState.UNKNOWN

    # Remove the complete state and everything else bound to the old text.
    self.complete_state = None
    self.yank_nth_arg_state = None
    self.document_before_paste = None
    self.selection_state = None
    self.suggestion = None
    self.preferred_column = None

    # fire 'on_text_changed' event.
    self.on_text_changed.fire()

    # Input validation.
    # (This happens on all change events, unlike auto completion, also when
    # deleting text.)
    validate_now = self.validator and self.validate_while_typing()
    if validate_now:
        get_app().create_background_task(self._async_validator())

537 

def _cursor_position_changed(self) -> None:
    """
    Clear the state that was tied to the previous cursor position and fire
    `on_cursor_position_changed`.
    """
    # Remove any complete state, yank-nth-arg state and paste bookkeeping.
    # (The validation state is left alone here.)
    self.complete_state = None
    self.yank_nth_arg_state = None
    self.document_before_paste = None

    # Unset preferred_column. (Will be set after the cursor movement, if
    # required.)
    self.preferred_column = None

    # The selection state is deliberately kept: when there is a selection,
    # the new position of the cursor determines the end of the selection.

    # fire 'on_cursor_position_changed' event.
    self.on_cursor_position_changed.fire()

555 

@property
def document(self) -> Document:
    """
    The :class:`~prompt_toolkit.document.Document` for the current text,
    cursor position and selection state, looked up in the document cache.
    """
    cache_key = (self.text, self.cursor_position, self.selection_state)
    return self._document_cache[cache_key]

@document.setter
def document(self, value: Document) -> None:
    """
    Set :class:`~prompt_toolkit.document.Document` instance.

    Text and cursor position are both set before any change event is
    triggered. This delegates to :meth:`set_document`.
    """
    self.set_document(value)

575 

def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
    """
    Set :class:`~prompt_toolkit.document.Document` instance. Like the
    ``document`` property, but accept an ``bypass_readonly`` argument.

    :param bypass_readonly: When True, don't raise an
                            :class:`.EditReadOnlyBuffer` exception, even
                            when the buffer is read-only.

    .. warning::

        When this buffer is read-only and `bypass_readonly` was not passed,
        the `EditReadOnlyBuffer` exception will be caught by the
        `KeyProcessor` and is silently suppressed. This is important to
        keep in mind when writing key bindings, because it won't do what
        you expect, and there won't be a stack trace. Use try/finally
        around this function if you need some cleanup code.
    """
    # Don't allow editing of read-only buffers, unless explicitly bypassed.
    if not bypass_readonly:
        if self.read_only():
            raise EditReadOnlyBuffer()

    # Store both values before any handler runs, so that handlers always
    # observe a consistent text/cursor pair.
    text_was_changed = self._set_text(value.text)
    cursor_was_moved = self._set_cursor_position(value.cursor_position)

    if text_was_changed:
        self._text_changed()
        self.history_search_text = None

    if cursor_was_moved:
        self._cursor_position_changed()

610 

@property
def is_returnable(self) -> bool:
    """
    True when an accept handler is set, i.e. when there is something
    handling accept.
    """
    handler = self.accept_handler
    return bool(handler)

617 

618 # End of <getters/setters> 

619 

def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
    """
    Save current state (input text and cursor position), so that we can
    restore it by calling undo.

    :param clear_redo_stack: When True (the default), empty the redo stack.
    """
    stack = self._undo_stack
    current_text = self.text

    # Only push a new entry when the text differs from the text on top of
    # the stack. If the text is the same, just update the cursor position.
    if stack and stack[-1][0] == current_text:
        stack[-1] = (stack[-1][0], self.cursor_position)
    else:
        stack.append((current_text, self.cursor_position))

    # Saving anything to the undo stack, clears the redo stack.
    if clear_redo_stack:
        self._redo_stack = []

635 

def transform_lines(
    self,
    line_index_iterator: Iterable[int],
    transform_callback: Callable[[str], str],
) -> str:
    """
    Transforms the text on a range of lines.
    When the iterator yields an index not in the range of lines that the
    document contains (negative indexes included), it skips them silently.

    To uppercase some lines::

        new_text = transform_lines(range(5,10), lambda text: text.upper())

    :param line_index_iterator: Iterator of line numbers (int)
    :param transform_callback: callable that takes the original text of a
        line, and return the new text for this line. Exceptions raised by
        the callback propagate to the caller.

    :returns: The new text. (The buffer itself is not modified.)
    """
    # Split lines
    lines = self.text.split("\n")
    line_count = len(lines)

    # Apply transformation
    for index in line_index_iterator:
        # Explicit bounds check instead of catching `IndexError`: a negative
        # index would not raise but silently address a line counted from the
        # end, and a broad `try` would also hide an `IndexError` raised
        # inside the callback itself.
        if 0 <= index < line_count:
            lines[index] = transform_callback(lines[index])

    return "\n".join(lines)

667 

def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
    """
    Replace the line the cursor is on by the result of the given
    transformation function.

    :param transform_callback: callable that takes a string and return a new string.
    """
    doc = self.document
    cursor = doc.cursor_position
    full_text = doc.text

    # Absolute positions of the start and the end of the current line.
    line_start = cursor + doc.get_start_of_line_position()
    line_end = cursor + doc.get_end_of_line_position()

    head = full_text[:line_start]
    line = full_text[line_start:line_end]
    tail = full_text[line_end:]
    self.text = head + transform_callback(line) + tail

682 

def transform_region(
    self, from_: int, to: int, transform_callback: Callable[[str], str]
) -> None:
    """
    Transform a part of the input string.

    :param from_: (int) start position.
    :param to: (int) end position. Must be greater than `from_`.
    :param transform_callback: Callable which accepts a string and returns
        the transformed string.
    """
    assert from_ < to

    current = self.text
    replacement = transform_callback(current[from_:to])
    self.text = current[:from_] + replacement + current[to:]

703 

def cursor_left(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps left."""
    offset = self.document.get_cursor_left_position(count=count)
    self.cursor_position = self.cursor_position + offset

706 

def cursor_right(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps right."""
    offset = self.document.get_cursor_right_position(count=count)
    self.cursor_position = self.cursor_position + offset

709 

def cursor_up(self, count: int = 1) -> None:
    """
    (for multiline edit). Move cursor to the previous line.

    :param count: Number of lines to move up.
    """
    # `preferred_column` is `None` when unset. Compare against `None`
    # explicitly: column 0 is a valid preference, and a plain `or` would
    # discard it in favour of the current column.
    original_column = self.preferred_column
    if original_column is None:
        original_column = self.document.cursor_position_col

    self.cursor_position += self.document.get_cursor_up_position(
        count=count, preferred_column=original_column
    )

    # Remember the original column for the next up/down movement. (This is
    # assigned last, after the cursor position has been updated.)
    self.preferred_column = original_column

719 

def cursor_down(self, count: int = 1) -> None:
    """
    (for multiline edit). Move cursor to the next line.

    :param count: Number of lines to move down.
    """
    # `preferred_column` is `None` when unset. Compare against `None`
    # explicitly: column 0 is a valid preference, and a plain `or` would
    # discard it in favour of the current column.
    original_column = self.preferred_column
    if original_column is None:
        original_column = self.document.cursor_position_col

    self.cursor_position += self.document.get_cursor_down_position(
        count=count, preferred_column=original_column
    )

    # Remember the original column for the next up/down movement. (This is
    # assigned last, after the cursor position has been updated.)
    self.preferred_column = original_column

729 

def auto_up(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If a completion is in progress, browse to the previous completion.
    Otherwise, if we're not on the first line (of a multiline input) go a
    line up, otherwise go back in history. (If nothing is selected.)

    :param go_to_start_of_line_if_history_changes: After going back in
        history, move the cursor to the start of the line.
    """
    if self.complete_state:
        self.complete_previous(count=count)
        return

    if self.document.cursor_position_row > 0:
        self.cursor_up(count=count)
        return

    if self.selection_state:
        # Don't browse the history while something is selected.
        return

    self.history_backward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

747 

def auto_down(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If a completion is in progress, browse to the next completion.
    Otherwise, if we're not on the last line (of a multiline input) go a
    line down, otherwise go forward in history. (If nothing is selected.)

    :param go_to_start_of_line_if_history_changes: After going forward in
        history, move the cursor to the start of the line.
    """
    if self.complete_state:
        self.complete_next(count=count)
        return

    last_row = self.document.line_count - 1
    if self.document.cursor_position_row < last_row:
        self.cursor_down(count=count)
        return

    if self.selection_state:
        # Don't browse the history while something is selected.
        return

    self.history_forward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

765 

def delete_before_cursor(self, count: int = 1) -> str:
    """
    Delete specified number of characters before cursor and return the
    deleted text.

    :param count: Number of characters to delete; must not be negative.
        When it exceeds the number of characters before the cursor,
        everything before the cursor is deleted.
    :returns: The deleted text ("" when the cursor is at position 0).
    """
    assert count >= 0
    deleted = ""
    cursor = self.cursor_position

    if cursor > 0:
        # Clamp the start of the deleted range at 0. A negative start would
        # be interpreted by slicing as "counted from the end of the text"
        # and corrupt both the new text and the returned string.
        start = max(0, cursor - count)
        text = self.text

        deleted = text[start:cursor]
        new_text = text[:start] + text[cursor:]
        new_cursor_position = cursor - len(deleted)

        # Set new Document atomically.
        self.document = Document(new_text, new_cursor_position)

    return deleted

787 

def delete(self, count: int = 1) -> str:
    """
    Delete specified number of characters after the cursor and return the
    deleted text. Returns "" when the cursor is at the end of the text.
    """
    if self.cursor_position >= len(self.text):
        return ""

    removed = self.document.text_after_cursor[:count]
    cursor = self.cursor_position
    current = self.text
    self.text = current[:cursor] + current[cursor + len(removed) :]
    return removed

801 

def join_next_line(self, separator: str = " ") -> None:
    """
    Join the next line to the current one by deleting the line ending after
    the current line. Does nothing on the last line.

    :param separator: Text put between the two joined lines; it replaces
        the leading spaces of the joined line.
    """
    if self.document.on_last_line:
        return

    # Move to the end of the current line and delete the line ending.
    self.cursor_position += self.document.get_end_of_line_position()
    self.delete()

    # Remove spaces.
    joined = self.document
    self.text = (
        joined.text_before_cursor + separator + joined.text_after_cursor.lstrip(" ")
    )

817 

def join_selected_lines(self, separator: str = " ") -> None:
    """
    Join the selected lines.

    Requires an active selection. The selected range lies between the
    cursor position and the position where the selection was started.

    :param separator: Text appended to every selected line; it takes the
        place of the line endings.
    """
    assert self.selection_state

    # Boundaries of the selection, lowest position first.
    start, end = sorted(
        (self.cursor_position, self.selection_state.original_cursor_position)
    )

    text = self.text
    head = text[:start]
    tail = text[end:]

    # Strip the leading spaces of every selected line and append the
    # separator to it.
    pieces = [line.lstrip(" ") + separator for line in text[start:end].splitlines()]

    # Set new document.
    self.document = Document(
        text=head + "".join(pieces) + tail,
        cursor_position=len(head + "".join(pieces[:-1])) - 1,
    )

841 

def swap_characters_before_cursor(self) -> None:
    """
    Swap the last two characters before the cursor. Does nothing when
    there are fewer than two characters before the cursor.
    """
    pos = self.cursor_position
    if pos < 2:
        return

    current = self.text
    first, second = current[pos - 2], current[pos - 1]
    self.text = current[: pos - 2] + second + first + current[pos:]

853 

def go_to_history(self, index: int) -> None:
    """
    Go to this item in the history and put the cursor at the end of its
    text. Does nothing when `index` is past the last working line.
    """
    if index >= len(self._working_lines):
        return

    self.working_index = index
    self.cursor_position = len(self.text)

861 

def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
    """
    Move the selection `count` completions forward.
    (Does nothing if there are no completions.)

    From "nothing selected" the first completion is chosen. From the last
    completion the selection wraps back to "nothing selected", unless
    `disable_wrap_around` is set, in which case the selection stays put.
    """
    state = self.complete_state
    if not state:
        return

    last = len(state.completions) - 1
    current = state.complete_index
    target: int | None

    if current is None:
        target = 0
    elif current == last:
        if disable_wrap_around:
            return
        target = None
    else:
        # Never jump past the final completion.
        target = min(last, current + count)

    self.go_to_completion(target)

884 

def complete_previous(
    self, count: int = 1, disable_wrap_around: bool = False
) -> None:
    """
    Move the selection `count` completions backward.
    (Does nothing if there are no completions.)

    From "nothing selected" the last completion is chosen. From the first
    completion the selection wraps back to "nothing selected", unless
    `disable_wrap_around` is set, in which case the selection stays put.
    """
    state = self.complete_state
    if not state:
        return

    current = state.complete_index
    target: int | None

    if current is None:
        target = len(state.completions) - 1
    elif current == 0:
        if disable_wrap_around:
            return
        target = None
    else:
        # Never jump before the first completion.
        target = max(0, current - count)

    self.go_to_completion(target)

906 

def cancel_completion(self) -> None:
    """
    Abort the active completion (if any): deselect the current completion
    through `go_to_completion(None)` and drop the completion state.
    """
    if not self.complete_state:
        return

    self.go_to_completion(None)
    self.complete_state = None

914 

def _set_completions(self, completions: list[Completion]) -> CompletionState:
    """
    Install a fresh completion state for the given completions.

    The state remembers the current document as the original one. The
    `on_completions_changed` event is fired afterwards and the state that
    is then stored on the buffer is returned.
    """
    fresh_state = CompletionState(
        original_document=self.document, completions=completions
    )
    self.complete_state = fresh_state

    # Notify listeners. This should eventually invalidate the layout.
    self.on_completions_changed.fire()

    return self.complete_state

929 

def start_history_lines_completion(self) -> None:
    """
    Offer every distinct line of the working lines (history entries plus
    the current input) that starts with the text before the cursor on the
    current line as a completion, and select the first one.
    """
    # Text that candidate lines have to start with.
    prefix = self.document.current_line_before_cursor.lstrip()

    seen: set[str] = set()
    matches = []

    for entry_index, entry in enumerate(self._working_lines):
        for line_index, raw_line in enumerate(entry.split("\n")):
            candidate = raw_line.strip()

            # Skip empty lines, non-matching lines and duplicates.
            if not candidate or not candidate.startswith(prefix):
                continue
            if candidate in seen:
                continue
            seen.add(candidate)

            # Describe where the line comes from.
            if entry_index == self.working_index:
                meta = "Current, line %s" % (line_index + 1)
            else:
                meta = f"History {entry_index + 1}, line {line_index + 1}"

            matches.append(
                Completion(
                    text=candidate,
                    start_position=-len(prefix),
                    display_meta=meta,
                )
            )

    # The list is reversed, so matches from later entries come first.
    self._set_completions(completions=matches[::-1])
    self.go_to_completion(0)

965 

def go_to_completion(self, index: int | None) -> None:
    """
    Make the completion at `index` the selected one and update the text
    and cursor position accordingly.
    """
    assert self.complete_state

    # Point the completion state at the requested entry.
    active_state = self.complete_state
    active_state.go_to_index(index)

    # Apply the text/cursor position that belong to this selection.
    self.document = Document(*active_state.new_text_and_position())

    # (Changing text/cursor position unsets complete_state; restore it.)
    self.complete_state = active_state

982 

def apply_completion(self, completion: Completion) -> None:
    """
    Insert the given completion into the buffer.

    An active completion is cancelled first. Then the text that the
    completion replaces is removed in front of the cursor and the
    completion text is inserted.
    """
    # Cancel a completion that was still active.
    if self.complete_state:
        self.go_to_completion(None)
    self.complete_state = None

    # `start_position` is negated to get the number of characters to drop.
    chars_to_replace = -completion.start_position
    self.delete_before_cursor(chars_to_replace)
    self.insert_text(completion.text)

995 

def _set_history_search(self) -> None:
    """
    Update `history_search_text`.

    With history search enabled, the text before the cursor becomes the
    search text unless one was already set. With history search
    disabled, the search text is cleared.
    """
    if not self.enable_history_search():
        self.history_search_text = None
    elif self.history_search_text is None:
        self.history_search_text = self.document.text_before_cursor

1006 

def _history_matches(self, i: int) -> bool:
    """
    Tell whether working line `i` starts with the history search text.
    (Without a history search text, every entry matches.)
    """
    search_text = self.history_search_text
    if search_text is None:
        return True
    return self._working_lines[i].startswith(search_text)

1015 

def history_forward(self, count: int = 1) -> None:
    """
    Step to a newer entry in the history.

    :param count: Number of matching entries to move forward.
    """
    self._set_history_search()

    moved = False

    # Walk over the newer entries until `count` matches were passed.
    for candidate in range(self.working_index + 1, len(self._working_lines)):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        count -= 1
        if count == 0:
            break

    # After moving, put the cursor at the end of the first line.
    if moved:
        self.cursor_position = 0
        self.cursor_position += self.document.get_end_of_line_position()

1039 

def history_backward(self, count: int = 1) -> None:
    """
    Step to an older entry in the history.

    :param count: Number of matching entries to move backward.
    """
    self._set_history_search()

    moved = False

    # Walk over the older entries until `count` matches were passed.
    for candidate in range(self.working_index - 1, -1, -1):
        if not self._history_matches(candidate):
            continue

        self.working_index = candidate
        moved = True
        count -= 1
        if count == 0:
            break

    # After moving, put the cursor at the end of the text.
    if moved:
        self.cursor_position = len(self.text)

1060 

def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
    """
    Insert the nth word of a previous history entry at the cursor.

    Which entry is used depends on the current `yank_nth_arg_state`:
    calling this repeatedly walks further back through the history and
    replaces the previously inserted word. Without `n`, the first
    argument (the second word) is taken.

    :param n: (None or int), The index of the word from the previous line
        to take.
    """
    assert n is None or isinstance(n, int)
    history_strings = self.history.get_strings()

    if not history_strings:
        return

    # Reuse the state of a previous call, or start a new one.
    state = self.yank_nth_arg_state
    if state is None:
        state = YankNthArgState(n=-1 if _yank_last_arg else 1)

    if n is not None:
        state.n = n

    # Go one entry further back; start over at the newest entry when we
    # run past the oldest one.
    new_pos = state.history_position - 1
    if -new_pos > len(history_strings):
        new_pos = -1

    # Split that entry into (quoted) words and drop the empty pieces.
    line = history_strings[new_pos]
    pieces = (piece.strip() for piece in _QUOTED_WORDS_RE.split(line))
    words = [piece for piece in pieces if piece]

    try:
        word = words[state.n]
    except IndexError:
        word = ""

    # Replace what the previous call inserted by the new word.
    if state.previous_inserted_word:
        self.delete_before_cursor(len(state.previous_inserted_word))
    self.insert_text(word)

    # Store the state for the next call. (Note that the insert operation
    # from above clears `self.yank_nth_arg_state`.)
    state.previous_inserted_word = word
    state.history_position = new_pos
    self.yank_nth_arg_state = state

1111 

def yank_last_arg(self, n: int | None = None) -> None:
    """
    Same as `yank_nth_arg`, except that the last word is yanked when no
    argument has been given.
    """
    self.yank_nth_arg(_yank_last_arg=True, n=n)

1118 

def start_selection(
    self, selection_type: SelectionType = SelectionType.CHARACTERS
) -> None:
    """
    Begin a selection of the given type, anchored at the current cursor
    position.
    """
    anchor = self.cursor_position
    self.selection_state = SelectionState(anchor, selection_type)

1126 

def copy_selection(self, _cut: bool = False) -> ClipboardData:
    """
    Return the selected text as a :class:`.ClipboardData` instance and
    clear the selection. With `_cut` set, the selected text is also
    removed from the buffer.

    Notice that this doesn't store the copied data on the clipboard yet.
    You can store it like this:

    .. code:: python

        data = buffer.copy_selection()
        get_app().clipboard.set_data(data)
    """
    document_without_selection, copied = self.document.cut_selection()

    # Only replace the document when we are actually cutting.
    if _cut:
        self.document = document_without_selection

    self.selection_state = None
    return copied

1145 

def cut_selection(self) -> ClipboardData:
    """
    Remove the selected text from the buffer and return it as a
    :class:`.ClipboardData` instance.
    """
    clipboard_data = self.copy_selection(_cut=True)
    return clipboard_data

1151 

def paste_clipboard_data(
    self,
    data: ClipboardData,
    paste_mode: PasteMode = PasteMode.EMACS,
    count: int = 1,
) -> None:
    """
    Paste the given clipboard data into the buffer.

    The document from before the paste is kept in
    `document_before_paste`.
    """
    assert isinstance(data, ClipboardData)
    assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)

    document_before = self.document
    self.document = document_before.paste_clipboard_data(
        data, paste_mode=paste_mode, count=count
    )

    # This has to be stored last, because assigning to 'document' erases it.
    self.document_before_paste = document_before

1172 

def newline(self, copy_margin: bool = True) -> None:
    """
    Insert a line ending at the cursor. With `copy_margin`, the new line
    starts with the leading whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    self.insert_text("\n" + margin)

1181 

def insert_line_above(self, copy_margin: bool = True) -> None:
    """
    Open a new line above the current one and put the cursor on it. With
    `copy_margin`, the new line gets the leading whitespace of the
    current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    new_line = margin + "\n"

    # Insert at the start of the current line...
    self.cursor_position += self.document.get_start_of_line_position()
    self.insert_text(new_line)
    # ...and step back over the inserted line ending.
    self.cursor_position -= 1

1194 

def insert_line_below(self, copy_margin: bool = True) -> None:
    """
    Open a new line below the current one. With `copy_margin`, the new
    line gets the leading whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    new_line = "\n" + margin

    # Insert at the end of the current line.
    self.cursor_position += self.document.get_end_of_line_position()
    self.insert_text(new_line)

1206 

def insert_text(
    self,
    data: str,
    overwrite: bool = False,
    move_cursor: bool = True,
    fire_event: bool = True,
) -> None:
    """
    Insert `data` at the cursor position.

    :param overwrite: Replace the characters after the cursor instead of
        pushing them forward. A line ending is never overwritten.
    :param move_cursor: Put the cursor after the inserted text.
    :param fire_event: Fire `on_text_insert` event. This is mainly used to
        trigger autocompletion while typing.
    """
    original_text = self.text
    cursor = self.cursor_position

    # Number of existing characters that get replaced by `data`.
    replaced_length = 0
    if overwrite:
        # Stop at the line ending: just before it, overwrite mode acts
        # like insert mode.
        candidate = original_text[cursor : cursor + len(data)]
        replaced_length = len(candidate.partition("\n")[0])

    new_text = (
        original_text[:cursor] + data + original_text[cursor + replaced_length :]
    )
    new_cursor = self.cursor_position + (len(data) if move_cursor else 0)

    # Text and cursor position are set in one go. Setting the text first
    # would fire a change event before the cursor position has been set.
    self.document = Document(new_text, new_cursor)

    if not fire_event:  # XXX: rename to `start_complete`.
        return

    self.on_text_insert.fire()

    # Only complete when "complete_while_typing" is enabled.
    if self.completer and self.complete_while_typing():
        get_app().create_background_task(self._async_completer())

    # Call auto_suggest.
    if self.auto_suggest:
        get_app().create_background_task(self._async_suggester())

1258 

def undo(self) -> None:
    """
    Restore the most recent state on the undo stack whose text differs
    from the current text, and push the current state on the redo stack.
    """
    # Entries equal to the current text are discarded. (The current logic
    # of `save_to_undo_stack` usually leaves a copy of the current text
    # on top of the stack, so we often have to pop twice.)
    while self._undo_stack:
        previous_text, previous_position = self._undo_stack.pop()
        if previous_text == self.text:
            continue

        # Remember the current state for redo.
        self._redo_stack.append((self.text, self.cursor_position))

        # Go back to the earlier text/cursor position.
        self.document = Document(previous_text, cursor_position=previous_position)
        return

1274 

def redo(self) -> None:
    """
    Re-apply the most recently undone state, if there is one. The current
    state is saved on the undo stack first.
    """
    if not self._redo_stack:
        return

    # Keep the redo stack intact while saving the current state.
    self.save_to_undo_stack(clear_redo_stack=False)

    redo_text, redo_position = self._redo_stack.pop()
    self.document = Document(redo_text, cursor_position=redo_position)

1283 

def validate(self, set_cursor: bool = False) -> bool:
    """
    Validate the current input and return `True` when it is valid.

    The outcome is stored in `validation_state` and `validation_error`.

    :param set_cursor: Set the cursor position, if an error was found.
    """
    # Reuse the previous outcome when the validator was already called
    # for the current input.
    if self.validation_state != ValidationState.UNKNOWN:
        return self.validation_state == ValidationState.VALID

    if self.validator:
        try:
            self.validator.validate(self.document)
        except ValidationError as error:
            if set_cursor:
                # Keep the reported position within the text.
                reported = max(0, error.cursor_position)
                self.cursor_position = min(reported, len(self.text))

            self.validation_state = ValidationState.INVALID
            self.validation_error = error
            return False

    # No validator, or the validator accepted the input.
    self.validation_state = ValidationState.VALID
    self.validation_error = None
    return True

1314 

async def _validate_async(self) -> None:
    """
    Asynchronous counterpart of `validate()`.
    This one doesn't set the cursor position.

    Both variants exist, because a synchronous version is required:
    handling the ENTER key needs to be completely synchronous, otherwise
    stuff like type-ahead is going to give very weird results. (People
    could type input while the ENTER key is still processed.)

    The asynchronous version is required if we have
    `validate_while_typing` enabled.
    """
    while True:
        # Nothing to do when the validator was already called for the
        # current input.
        if self.validation_state != ValidationState.UNKNOWN:
            return

        failure = None
        validated_document = self.document

        if self.validator:
            try:
                await self.validator.validate_async(self.document)
            except ValidationError as e:
                failure = e

            # The document changed while we were validating: start over.
            if self.document != validated_document:
                continue

        # Store the outcome.
        self.validation_state = (
            ValidationState.INVALID if failure else ValidationState.VALID
        )
        self.validation_error = failure
        get_app().invalidate()  # Trigger redraw (display error).

1356 

def append_to_history(self) -> None:
    """
    Store the current input at the tail of the history. Empty input is
    not stored, and neither is input equal to the last history entry.
    """
    if not self.text:
        return

    history_strings = self.history.get_strings()
    if history_strings and history_strings[-1] == self.text:
        return

    self.history.append_string(self.text)

1367 

def _search(
    self,
    search_state: SearchState,
    include_current_position: bool = False,
    count: int = 1,
) -> tuple[int, int] | None:
    """
    Execute search. Return (working_index, cursor_position) tuple when this
    search is applied. Returns `None` when this text cannot be found.

    This only computes a position; nothing on the buffer is modified here.

    :param search_state: Provides the text to look for, the direction and
        the `ignore_case` filter.
    :param include_current_position: Passed on to `Document.find` for a
        forward search in the starting document.
    :param count: Number of successive search steps to chain. Must be
        positive.
    """
    assert count > 0

    # Read the search parameters once; the closure below shares them.
    text = search_state.text
    direction = search_state.direction
    ignore_case = search_state.ignore_case()

    def search_once(
        working_index: int, document: Document
    ) -> tuple[int, Document] | None:
        """
        Do search one time.
        Return (working_index, document) or `None`

        The given document is searched first. Without a match there, the
        other working lines are visited in the search direction.
        """
        if direction == SearchDirection.FORWARD:
            # Try find at the current input.
            new_index = document.find(
                text,
                include_current_position=include_current_position,
                ignore_case=ignore_case,
            )

            if new_index is not None:
                # The result is added to the cursor position, so `find`
                # presumably returns an offset relative to the cursor.
                return (
                    working_index,
                    Document(document.text, document.cursor_position + new_index),
                )
            else:
                # No match, go forward in the history. (Include len+1 to wrap around.)
                # (Here we should always include all cursor positions, because
                # it's a different line.)
                for i in range(working_index + 1, len(self._working_lines) + 1):
                    # The extra step past the end maps onto index 0.
                    i %= len(self._working_lines)

                    # Search every other entry from its very start.
                    document = Document(self._working_lines[i], 0)
                    new_index = document.find(
                        text, include_current_position=True, ignore_case=ignore_case
                    )
                    if new_index is not None:
                        return (i, Document(document.text, new_index))
        else:
            # Try find at the current input.
            new_index = document.find_backwards(text, ignore_case=ignore_case)

            if new_index is not None:
                return (
                    working_index,
                    Document(document.text, document.cursor_position + new_index),
                )
            else:
                # No match, go back in the history. (Include -1 to wrap around.)
                for i in range(working_index - 1, -2, -1):
                    # The extra step at -1 maps onto the last entry.
                    i %= len(self._working_lines)

                    # Search every other entry backwards from its end.
                    document = Document(
                        self._working_lines[i], len(self._working_lines[i])
                    )
                    new_index = document.find_backwards(
                        text, ignore_case=ignore_case
                    )
                    if new_index is not None:
                        # `new_index` is added to the text length, so it
                        # is presumably a (negative) offset from the end.
                        return (
                            i,
                            Document(document.text, len(document.text) + new_index),
                        )
        return None

    # Do 'count' search iterations.
    # Every iteration starts from the result of the previous one.
    working_index = self.working_index
    document = self.document
    for _ in range(count):
        result = search_once(working_index, document)
        if result is None:
            return None  # Nothing found.
        else:
            working_index, document = result

    return (working_index, document.cursor_position)

1455 

def document_for_search(self, search_state: SearchState) -> Document:
    """
    Return the :class:`~prompt_toolkit.document.Document` that we would
    get if this search were applied: same text/cursor position, without
    changing the buffer. This will be used in the
    :class:`~prompt_toolkit.layout.BufferControl` to display feedback while
    searching.

    When nothing is found, the current document is returned.
    """
    found = self._search(search_state, include_current_position=True)
    if found is None:
        return self.document

    working_index, cursor_position = found

    # The selection is only kept when we stay on the same working line.
    selection = (
        self.selection_state if working_index == self.working_index else None
    )

    return Document(
        self._working_lines[working_index], cursor_position, selection=selection
    )

1480 

def get_search_position(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> int:
    """
    Return the cursor position that this search leads to, or the current
    cursor position when nothing is found.
    (This operation won't change the `working_index`. It won't go through
    the history. Vi text objects can't span multiple items.)
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return self.cursor_position

    # Only the position is of interest; the working index is dropped.
    _, cursor_position = found
    return cursor_position

1501 

def apply_search(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> None:
    """
    Run the search and, if something is found, move to it by setting
    `working_index` and `cursor_position`.
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )
    if found is None:
        return

    self.working_index, self.cursor_position = found

1520 

def exit_selection(self) -> None:
    """
    Leave selection mode by discarding the selection state.
    """
    self.selection_state = None

1523 

def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Simple (file) tempfile implementation.

    Writes the buffer text, UTF-8 encoded, to a new temporary file whose
    name ends with `tempfile_suffix`.

    Return (tempfile, cleanup_func). Calling `cleanup_func` removes the
    file again.
    """
    suffix = to_str(self.tempfile_suffix)
    descriptor, filename = tempfile.mkstemp(suffix)

    def cleanup() -> None:
        os.unlink(filename)

    try:
        # Write through a file object instead of a single `os.write`
        # call: `os.write` may write only part of the data, and the
        # `with` block closes the descriptor even when writing fails.
        with os.fdopen(descriptor, "wb") as fh:
            fh.write(self.text.encode("utf-8"))
    except BaseException:
        # Nobody receives `cleanup` when we fail here, so remove the
        # file ourselves before propagating the error.
        cleanup()
        raise

    return filename, cleanup

1539 

def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Complex (directory) tempfile implementation.

    Creates a fresh temporary directory, recreates the directory part of
    `self.tempfile` inside it and writes the buffer text (UTF-8) to a
    file carrying the requested name. Falls back to the simple
    implementation when `self.tempfile` is empty.

    Return (tempfile, cleanup_func). Calling `cleanup_func` removes the
    whole temporary directory.
    """
    headtail = to_str(self.tempfile)
    if not headtail:
        # Revert to simple case.
        return self._editor_simple_tempfile()
    headtail = str(headtail)

    # Try to make according to tempfile logic.
    head, tail = os.path.split(headtail)

    # Make `head` relative by dropping a drive prefix and *all* leading
    # separators. With only one character removed, a head like "//x"
    # stays absolute, and `os.path.join` would then discard the
    # temporary directory altogether.
    separators = os.sep + (os.altsep or "")
    head = os.path.splitdrive(head)[1].lstrip(separators)

    root = tempfile.mkdtemp()

    def cleanup() -> None:
        # Remove the directory created by `mkdtemp` itself, not just the
        # nested `head` directory; otherwise the root is left behind.
        shutil.rmtree(root)

    try:
        dirpath = os.path.join(root, head) if head else root
        # `root` already exists, so an existing directory must be accepted.
        os.makedirs(dirpath, exist_ok=True)

        # Open the filename and write current text.
        filename = os.path.join(dirpath, tail)
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(self.text)
    except BaseException:
        # Nobody receives `cleanup` when we fail here, so tidy up
        # ourselves before propagating the error.
        cleanup()
        raise

    return filename, cleanup

1568 

def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
    """
    Edit the buffer text in an external editor.

    The text is written to a temporary file and the editor is started as
    a background task. When the editor reports success, the file content
    replaces the buffer text.

    :param validate_and_handle: Accept the input after a successful edit.
    :raises EditReadOnlyBuffer: When the buffer is read-only.
    """
    if self.read_only():
        raise EditReadOnlyBuffer()

    # Write current text to temporary file
    make_tempfile = (
        self._editor_complex_tempfile if self.tempfile else self._editor_simple_tempfile
    )
    filename, cleanup_func = make_tempfile()

    def launch_editor() -> bool:
        return self._open_file_in_editor(filename)

    async def run() -> None:
        try:
            # `run_in_terminal` is needed, because not all editors go to
            # the alternate screen buffer, and some could influence the
            # cursor position.
            success = await run_in_terminal(launch_editor, in_executor=True)

            if success:
                # Read content again.
                with open(filename, "rb") as f:
                    edited = f.read().decode("utf-8")

                # Editors are supposed to end the file with a newline,
                # but we don't need it.
                if edited.endswith("\n"):
                    edited = edited[:-1]

                self.document = Document(text=edited, cursor_position=len(edited))

                # Accept the input.
                if validate_and_handle:
                    self.validate_and_handle()

        finally:
            # Clean up temp dir/file.
            cleanup_func()

    return get_app().create_background_task(run())

1615 

def _open_file_in_editor(self, filename: str) -> bool:
    """
    Call editor executable.

    The commands from the 'VISUAL' and 'EDITOR' environment variables are
    tried first, followed by a list of well-known editors. The first
    command that can be started decides the outcome.

    :param filename: Path of the file to edit; appended to the command.

    Return True when we received a zero return code. Return False when
    the editor exits with another code or when no editor could be
    started.
    """
    candidates = [
        os.environ.get("VISUAL"),
        os.environ.get("EDITOR"),
        # Order of preference.
        "/usr/bin/editor",
        "/usr/bin/nano",
        "/usr/bin/pico",
        "/usr/bin/vi",
        "/usr/bin/emacs",
    ]

    for candidate in candidates:
        if not candidate:
            continue

        try:
            # Use 'shlex.split()', because $VISUAL can contain spaces
            # and quotes.
            argv = shlex.split(candidate)
        except ValueError:
            # Malformed quoting (e.g. an unterminated quote) in the
            # variable: treat it like a missing editor and try the next.
            continue

        if not argv:
            # A whitespace-only value holds no command. Without this
            # check, the file itself would be executed.
            continue

        try:
            returncode = subprocess.call(argv + [filename])
        except OSError:
            # Executable does not exist, try the next one.
            continue

        return returncode == 0

    return False

1651 

def start_completion(
    self,
    select_first: bool = False,
    select_last: bool = False,
    insert_common_part: bool = False,
    complete_event: CompleteEvent | None = None,
) -> None:
    """
    Kick off the asynchronous autocompletion of this buffer as a
    background task.
    (This will do nothing if a previous completion was still in progress.)

    Without a `complete_event`, an event with `completion_requested=True`
    is used.
    """
    # At most one of these options can be selected.
    assert select_first + select_last + insert_common_part <= 1

    schedule = get_app().create_background_task

    event = complete_event or CompleteEvent(completion_requested=True)
    schedule(
        self._async_completer(
            select_first=select_first,
            select_last=select_last,
            insert_common_part=insert_common_part,
            complete_event=event,
        )
    )

1675 

def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
    """
    Create function for asynchronous autocompletion.

    (This consumes the asynchronous completer generator, which possibly
    runs the completion algorithm in another thread.)

    The returned coroutine function accepts `select_first`, `select_last`,
    `insert_common_part` and `complete_event`.
    """

    def completion_does_nothing(document: Document, completion: Completion) -> bool:
        """
        Return `True` if applying this completion doesn't have any effect.
        (When it doesn't insert any new text.)
        """
        text_before_cursor = document.text_before_cursor
        # The part of the text that this completion would replace.
        replaced_text = text_before_cursor[
            len(text_before_cursor) + completion.start_position :
        ]
        return replaced_text == completion.text

    @_only_one_at_a_time
    async def async_completer(
        select_first: bool = False,
        select_last: bool = False,
        insert_common_part: bool = False,
        complete_event: CompleteEvent | None = None,
    ) -> None:
        # Snapshot of the document that the completions are computed for.
        document = self.document
        complete_event = complete_event or CompleteEvent(text_inserted=True)

        # Don't complete when we already have completions.
        if self.complete_state or not self.completer:
            return

        # Create an empty CompletionState.
        complete_state = CompletionState(original_document=self.document)
        self.complete_state = complete_state

        def proceed() -> bool:
            """Keep retrieving completions. Input text has not yet changed
            while generating completions."""
            # True as long as the state created above is still the one
            # stored on the buffer.
            return self.complete_state == complete_state

        # Set whenever a completion was added, to request a UI refresh.
        refresh_needed = asyncio.Event()

        async def refresh_while_loading() -> None:
            """Background loop to refresh the UI at most 3 times a second
            while the completion are loading. Calling
            `on_completions_changed.fire()` for every completion that we
            receive is too expensive when there are many completions. (We
            could tune `Application.max_render_postpone_time` and
            `Application.min_redraw_interval`, but having this here is a
            better approach.)
            """
            while True:
                self.on_completions_changed.fire()
                refresh_needed.clear()
                # Wait at least 0.3s, then until new completions arrived.
                await asyncio.sleep(0.3)
                await refresh_needed.wait()

        refresh_task = asyncio.ensure_future(refresh_while_loading())
        try:
            # Load.
            async with aclosing(
                self.completer.get_completions_async(document, complete_event)
            ) as async_generator:
                async for completion in async_generator:
                    complete_state.completions.append(completion)
                    refresh_needed.set()

                    # If the input text changes, abort.
                    if not proceed():
                        break

                    # Stop once `max_number_of_completions` is reached.
                    if (
                        len(complete_state.completions)
                        >= self.max_number_of_completions
                    ):
                        break
        finally:
            # Stop the refresh loop, also when loading failed.
            refresh_task.cancel()

            # Refresh one final time after we got everything.
            self.on_completions_changed.fire()

        completions = complete_state.completions

        # When there is only one completion, which has nothing to add, ignore it.
        # (The list is emptied in place, so `complete_state.completions`
        # is emptied as well.)
        if len(completions) == 1 and completion_does_nothing(
            document, completions[0]
        ):
            del completions[:]

        # Set completions if the text was not yet changed.
        if proceed():
            # When no completions were found, or when the user selected
            # already a completion by using the arrow keys, don't do anything.
            if (
                not self.complete_state
                or self.complete_state.complete_index is not None
            ):
                return

            # When there are no completions, reset completion state anyway.
            if not completions:
                self.complete_state = None
                # Render the ui if the completion menu was shown
                # it is needed especially if there is one completion and it was deleted.
                self.on_completions_changed.fire()
                return

            # Select first/last or insert common part, depending on the key
            # binding. (For this we have to wait until all completions are
            # loaded.)

            if select_first:
                self.go_to_completion(0)

            elif select_last:
                self.go_to_completion(len(completions) - 1)

            elif insert_common_part:
                common_part = get_common_complete_suffix(document, completions)
                if common_part:
                    # Insert the common part, update completions.
                    self.insert_text(common_part)
                    if len(completions) > 1:
                        # (Don't call `async_completer` again, but
                        # recalculate completions. See:
                        # https://github.com/ipython/ipython/issues/9658)
                        completions[:] = [
                            c.new_completion_from_position(len(common_part))
                            for c in completions
                        ]

                        self._set_completions(completions=completions)
                    else:
                        # The single completion was inserted completely.
                        self.complete_state = None
                else:
                    # When we were asked to insert the "common"
                    # prefix, but there was no common suffix but
                    # still exactly one match, then select the
                    # first. (It could be that we have a completion
                    # which does * expansion, like '*.py', with
                    # exactly one match.)
                    if len(completions) == 1:
                        self.go_to_completion(0)

        else:
            # If the last operation was an insert, (not a delete), restart
            # the completion coroutine.

            if self.document.text_before_cursor == document.text_before_cursor:
                return  # Nothing changed.

            # The text before the cursor only grew: ask for a rerun by
            # raising `_Retry` (presumably handled by the
            # `_only_one_at_a_time` decorator).
            if self.document.text_before_cursor.startswith(
                document.text_before_cursor
            ):
                raise _Retry

    return async_completer

1837 

1838 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]: 

1839 """ 

1840 Create function for asynchronous auto suggestion. 

1841 (This can be in another thread.) 

1842 """ 

1843 

1844 @_only_one_at_a_time 

1845 async def async_suggestor() -> None: 

1846 document = self.document 

1847 

1848 # Don't suggest when we already have a suggestion. 

1849 if self.suggestion or not self.auto_suggest: 

1850 return 

1851 

1852 suggestion = await self.auto_suggest.get_suggestion_async(self, document) 

1853 

1854 # Set suggestion only if the text was not yet changed. 

1855 if self.document == document: 

1856 # Set suggestion and redraw interface. 

1857 self.suggestion = suggestion 

1858 self.on_suggestion_set.fire() 

1859 else: 

1860 # Otherwise, restart thread. 

1861 raise _Retry 

1862 

1863 return async_suggestor 

1864 

1865 def _create_auto_validate_coroutine( 

1866 self, 

1867 ) -> Callable[[], Coroutine[Any, Any, None]]: 

1868 """ 

1869 Create a function for asynchronous validation while typing. 

1870 (This can be in another thread.) 

1871 """ 

1872 

1873 @_only_one_at_a_time 

1874 async def async_validator() -> None: 

1875 await self._validate_async() 

1876 

1877 return async_validator 

1878 

1879 def validate_and_handle(self) -> None: 

1880 """ 

1881 Validate buffer and handle the accept action. 

1882 """ 

1883 valid = self.validate(set_cursor=True) 

1884 

1885 # When the validation succeeded, accept the input. 

1886 if valid: 

1887 if self.accept_handler: 

1888 keep_text = self.accept_handler(self) 

1889 else: 

1890 keep_text = False 

1891 

1892 self.append_to_history() 

1893 

1894 if not keep_text: 

1895 self.reset() 

1896 

1897 

# Type variable bound to callables that return a coroutine yielding ``None``.
# `_only_one_at_a_time` uses it to declare that the decorated callable keeps
# its own type.
_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]])

1899 

1900 

def _only_one_at_a_time(coroutine: _T) -> _T:
    """
    Decorator that starts the coroutine only if the previous call has
    finished. (Used to make sure that we have only one autocompleter, auto
    suggestor and validator running at a time.)

    A call that arrives while an earlier one is still in progress returns
    right away without doing anything. When the coroutine raises `_Retry`, it
    is started again with the same arguments.
    """
    busy = False

    @wraps(coroutine)
    async def guarded(*args: Any, **kwargs: Any) -> Any:
        nonlocal busy

        # A previous call is still in progress: don't start another one.
        if busy:
            return None

        busy = True
        try:
            # Keep restarting for as long as the coroutine asks for a retry.
            while True:
                try:
                    await coroutine(*args, **kwargs)
                except _Retry:
                    continue
                break
        finally:
            # Always release the flag, also when the coroutine failed.
            busy = False

        return None

    return cast(_T, guarded)

1933 

1934 

class _Retry(Exception):
    """
    Retry in `_only_one_at_a_time`.

    Raised inside a decorated coroutine to request that it is started again.
    """

1937 

1938 

def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Indent text of a :class:`.Buffer` object.

    :param buffer: The buffer to modify.
    :param from_row: First row to indent.
    :param to_row: Row at which to stop; this row itself is not indented.
    :param count: Number of indentation units to put in front of each row.
    """
    document = buffer.document
    cursor_row = document.cursor_position_row
    cursor_col = document.cursor_position_col

    # One indentation unit, repeated `count` times.
    # NOTE(review): the unit literal is reproduced as seen in the source under
    # review; confirm its intended width.
    prefix = " " * count

    def add_prefix(line: str) -> str:
        return prefix + line

    # Apply transformation.
    indented_text = buffer.transform_lines(range(from_row, to_row), add_prefix)

    # Put the cursor at the start of the row it was on ...
    row_start = Document(indented_text).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(indented_text, row_start)

    # ... and then move it to its old column, shifted by the inserted prefix.
    buffer.cursor_position += cursor_col + len(prefix)

1956 

1957 

def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Unindent text of a :class:`.Buffer` object.

    :param buffer: The buffer to modify.
    :param from_row: First row to unindent.
    :param to_row: Row at which to stop; this row itself is not unindented.
    :param count: Number of indentation units to remove from each row.

    The cursor stays on the row it was on. Its column is shifted to the left
    by the length of the removed indentation and is kept within that row, so
    that a cursor standing inside the removed indentation ends up at the start
    of the row instead of on the previous one.
    """
    current_row = buffer.document.cursor_position_row
    current_col = buffer.document.cursor_position_col
    line_range = range(from_row, to_row)

    # One indentation unit, repeated `count` times.
    # NOTE(review): the unit literal is reproduced as seen in the source under
    # review; confirm its intended width.
    indent_content = " " * count

    def transform(text: str) -> str:
        # Remove the indentation when the line starts with it; otherwise
        # strip whatever leading whitespace the line has.
        if text.startswith(indent_content):
            return text[len(indent_content) :]
        return text.lstrip()

    # Apply transformation.
    new_text = buffer.transform_lines(line_range, transform)
    new_document = Document(new_text)

    # First place the cursor at the start of the row it was on.
    buffer.document = Document(
        new_text, new_document.translate_row_col_to_index(current_row, 0)
    )

    # Then move it to its old column, shifted by the removed indentation.
    # The shifted column can be negative (cursor inside the indentation) or
    # lie past the end of the shortened row. Adding it to the cursor position
    # directly would move the cursor onto a neighbouring row, so this relies
    # on `translate_row_col_to_index` keeping the column within the row.
    buffer.cursor_position = new_document.translate_row_col_to_index(
        current_row, current_col - len(indent_content)
    )

1983 

1984 

def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
    """
    Reformat text, taking the width into account.
    `to_row` is included.
    (Vi 'gq' operator.)

    The words of the selected rows are joined with single spaces and wrapped
    at ``buffer.text_width`` (80 when that is not set). Every produced line
    gets the leading whitespace of the first selected row. Afterwards the
    cursor is placed right after the reshaped text.
    """
    all_lines = buffer.text.splitlines(True)
    lines_before = all_lines[:from_row]
    lines_to_reformat = all_lines[from_row : to_row + 1]
    lines_after = all_lines[to_row + 1 :]

    # No rows in the given range: leave the buffer alone.
    if not lines_to_reformat:
        return

    # Take indentation from the first line. (Without a line ending, in case
    # that line consists of whitespace only.)
    match = re.search(r"^\s*", lines_to_reformat[0])
    leading = match.group() if match else ""  # `match` can't be None, actually.
    prefix = leading.replace("\n", "")

    # Now, take all the 'words' from the lines to be reshaped.
    words = "".join(lines_to_reformat).split()

    # And reshape.
    width = (buffer.text_width or 80) - len(prefix)
    pieces = [prefix]
    line_width = 0  # Width of the current output line, prefix not counted.

    for word in words:
        if line_width and line_width + len(word) + 1 > width:
            # The word doesn't fit anymore: continue on a new line.
            pieces.extend(("\n", prefix))
            line_width = 0
        elif line_width:
            # Separate the word from the previous one on this line.
            pieces.append(" ")
            line_width += 1

        pieces.append(word)
        line_width += len(word)

    if pieces[-1] != "\n":
        pieces.append("\n")

    # Apply result.
    text_up_to_cursor = "".join(lines_before + pieces)
    buffer.document = Document(
        text=text_up_to_cursor + "".join(lines_after),
        cursor_position=len(text_up_to_cursor),
    )

2030 )