Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/prompt_toolkit/buffer.py: 17%

807 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1""" 

2Data structures for the Buffer. 

3It holds the text, cursor position, history, etc... 

4""" 

5from __future__ import annotations 

6 

7import asyncio 

8import logging 

9import os 

10import re 

11import shlex 

12import shutil 

13import subprocess 

14import tempfile 

15from collections import deque 

16from enum import Enum 

17from functools import wraps 

18from typing import Any, Awaitable, Callable, Coroutine, Deque, Iterable, TypeVar, cast 

19 

20from .application.current import get_app 

21from .application.run_in_terminal import run_in_terminal 

22from .auto_suggest import AutoSuggest, Suggestion 

23from .cache import FastDictCache 

24from .clipboard import ClipboardData 

25from .completion import ( 

26 CompleteEvent, 

27 Completer, 

28 Completion, 

29 DummyCompleter, 

30 get_common_complete_suffix, 

31) 

32from .document import Document 

33from .eventloop import aclosing 

34from .filters import FilterOrBool, to_filter 

35from .history import History, InMemoryHistory 

36from .search import SearchDirection, SearchState 

37from .selection import PasteMode, SelectionState, SelectionType 

38from .utils import Event, to_str 

39from .validation import ValidationError, Validator 

40 

41__all__ = [ 

42 "EditReadOnlyBuffer", 

43 "Buffer", 

44 "CompletionState", 

45 "indent", 

46 "unindent", 

47 "reshape_text", 

48] 

49 

50logger = logging.getLogger(__name__) 

51 

52 

53class EditReadOnlyBuffer(Exception): 

54 "Attempt editing of read-only :class:`.Buffer`." 

55 

56 

57class ValidationState(Enum): 

58 "The validation state of a buffer. This is set after the validation." 

59 VALID = "VALID" 

60 INVALID = "INVALID" 

61 UNKNOWN = "UNKNOWN" 

62 

63 

64class CompletionState: 

65 """ 

66 Immutable class that contains a completion state. 

67 """ 

68 

69 def __init__( 

70 self, 

71 original_document: Document, 

72 completions: list[Completion] | None = None, 

73 complete_index: int | None = None, 

74 ) -> None: 

75 #: Document as it was when the completion started. 

76 self.original_document = original_document 

77 

78 #: List of all the current Completion instances which are possible at 

79 #: this point. 

80 self.completions = completions or [] 

81 

82 #: Position in the `completions` array. 

83 #: This can be `None` to indicate "no completion", the original text. 

84 self.complete_index = complete_index # Position in the `_completions` array. 

85 

86 def __repr__(self) -> str: 

87 return "{}({!r}, <{!r}> completions, index={!r})".format( 

88 self.__class__.__name__, 

89 self.original_document, 

90 len(self.completions), 

91 self.complete_index, 

92 ) 

93 

94 def go_to_index(self, index: int | None) -> None: 

95 """ 

96 Create a new :class:`.CompletionState` object with the new index. 

97 

98 When `index` is `None` deselect the completion. 

99 """ 

100 if self.completions: 

101 assert index is None or 0 <= index < len(self.completions) 

102 self.complete_index = index 

103 

104 def new_text_and_position(self) -> tuple[str, int]: 

105 """ 

106 Return (new_text, new_cursor_position) for this completion. 

107 """ 

108 if self.complete_index is None: 

109 return self.original_document.text, self.original_document.cursor_position 

110 else: 

111 original_text_before_cursor = self.original_document.text_before_cursor 

112 original_text_after_cursor = self.original_document.text_after_cursor 

113 

114 c = self.completions[self.complete_index] 

115 if c.start_position == 0: 

116 before = original_text_before_cursor 

117 else: 

118 before = original_text_before_cursor[: c.start_position] 

119 

120 new_text = before + c.text + original_text_after_cursor 

121 new_cursor_position = len(before) + len(c.text) 

122 return new_text, new_cursor_position 

123 

124 @property 

125 def current_completion(self) -> Completion | None: 

126 """ 

127 Return the current completion, or return `None` when no completion is 

128 selected. 

129 """ 

130 if self.complete_index is not None: 

131 return self.completions[self.complete_index] 

132 return None 

133 

134 

135_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""") 

136 

137 

138class YankNthArgState: 

139 """ 

140 For yank-last-arg/yank-nth-arg: Keep track of where we are in the history. 

141 """ 

142 

143 def __init__( 

144 self, history_position: int = 0, n: int = -1, previous_inserted_word: str = "" 

145 ) -> None: 

146 self.history_position = history_position 

147 self.previous_inserted_word = previous_inserted_word 

148 self.n = n 

149 

150 def __repr__(self) -> str: 

151 return "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})".format( 

152 self.__class__.__name__, 

153 self.history_position, 

154 self.n, 

155 self.previous_inserted_word, 

156 ) 

157 

158 

159BufferEventHandler = Callable[["Buffer"], None] 

160BufferAcceptHandler = Callable[["Buffer"], bool] 

161 

162 

163class Buffer: 

164 """ 

165 The core data structure that holds the text and cursor position of the 

166 current input line and implements all text manipulations on top of it. It 

167 also implements the history, undo stack and the completion state. 

168 

169 :param completer: :class:`~prompt_toolkit.completion.Completer` instance. 

170 :param history: :class:`~prompt_toolkit.history.History` instance. 

171 :param tempfile_suffix: The tempfile suffix (extension) to be used for the 

172 "open in editor" function. For a Python REPL, this would be ".py", so 

173 that the editor knows the syntax highlighting to use. This can also be 

174 a callable that returns a string. 

175 :param tempfile: For more advanced tempfile situations where you need 

176 control over the subdirectories and filename. For a Git Commit Message, 

177 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax 

178 highlighting to use. This can also be a callable that returns a string. 

179 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly 

180 useful for key bindings where we sometimes prefer to refer to a buffer 

181 by their name instead of by reference. 

182 :param accept_handler: Called when the buffer input is accepted. (Usually 

183 when the user presses `enter`.) The accept handler receives this 

184 `Buffer` as input and should return True when the buffer text should be 

185 kept instead of calling reset. 

186 

187 In case of a `PromptSession` for instance, we want to keep the text, 

188 because we will exit the application, and only reset it during the next 

189 run. 

190 

191 Events: 

192 

193 :param on_text_changed: When the buffer text changes. (Callable or None.) 

194 :param on_text_insert: When new text is inserted. (Callable or None.) 

195 :param on_cursor_position_changed: When the cursor moves. (Callable or None.) 

196 :param on_completions_changed: When the completions were changed. (Callable or None.) 

197 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) 

198 

199 Filters: 

200 

201 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` 

202 or `bool`. Decide whether or not to do asynchronous autocompleting while 

203 typing. 

204 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` 

205 or `bool`. Decide whether or not to do asynchronous validation while 

206 typing. 

207 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or 

208 `bool` to indicate when up-arrow partial string matching is enabled. It 

209 is advised to not enable this at the same time as 

210 `complete_while_typing`, because when there is an autocompletion found, 

211 the up arrows usually browse through the completions, rather than 

212 through the history. 

213 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, 

214 changes will not be allowed. 

215 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When 

216 not set, pressing `Enter` will call the `accept_handler`. Otherwise, 

217 pressing `Esc-Enter` is required. 

218 """ 

219 

220 def __init__( 

221 self, 

222 completer: Completer | None = None, 

223 auto_suggest: AutoSuggest | None = None, 

224 history: History | None = None, 

225 validator: Validator | None = None, 

226 tempfile_suffix: str | Callable[[], str] = "", 

227 tempfile: str | Callable[[], str] = "", 

228 name: str = "", 

229 complete_while_typing: FilterOrBool = False, 

230 validate_while_typing: FilterOrBool = False, 

231 enable_history_search: FilterOrBool = False, 

232 document: Document | None = None, 

233 accept_handler: BufferAcceptHandler | None = None, 

234 read_only: FilterOrBool = False, 

235 multiline: FilterOrBool = True, 

236 on_text_changed: BufferEventHandler | None = None, 

237 on_text_insert: BufferEventHandler | None = None, 

238 on_cursor_position_changed: BufferEventHandler | None = None, 

239 on_completions_changed: BufferEventHandler | None = None, 

240 on_suggestion_set: BufferEventHandler | None = None, 

241 ): 

242 # Accept both filters and booleans as input. 

243 enable_history_search = to_filter(enable_history_search) 

244 complete_while_typing = to_filter(complete_while_typing) 

245 validate_while_typing = to_filter(validate_while_typing) 

246 read_only = to_filter(read_only) 

247 multiline = to_filter(multiline) 

248 

249 self.completer = completer or DummyCompleter() 

250 self.auto_suggest = auto_suggest 

251 self.validator = validator 

252 self.tempfile_suffix = tempfile_suffix 

253 self.tempfile = tempfile 

254 self.name = name 

255 self.accept_handler = accept_handler 

256 

257 # Filters. (Usually, used by the key bindings to drive the buffer.) 

258 self.complete_while_typing = complete_while_typing 

259 self.validate_while_typing = validate_while_typing 

260 self.enable_history_search = enable_history_search 

261 self.read_only = read_only 

262 self.multiline = multiline 

263 

264 # Text width. (For wrapping, used by the Vi 'gq' operator.) 

265 self.text_width = 0 

266 

267 #: The command buffer history. 

268 # Note that we shouldn't use a lazy 'or' here. bool(history) could be 

269 # False when empty. 

270 self.history = InMemoryHistory() if history is None else history 

271 

272 self.__cursor_position = 0 

273 

274 # Events 

275 self.on_text_changed: Event[Buffer] = Event(self, on_text_changed) 

276 self.on_text_insert: Event[Buffer] = Event(self, on_text_insert) 

277 self.on_cursor_position_changed: Event[Buffer] = Event( 

278 self, on_cursor_position_changed 

279 ) 

280 self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed) 

281 self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set) 

282 

283 # Document cache. (Avoid creating new Document instances.) 

284 self._document_cache: FastDictCache[ 

285 tuple[str, int, SelectionState | None], Document 

286 ] = FastDictCache(Document, size=10) 

287 

288 # Create completer / auto suggestion / validation coroutines. 

289 self._async_suggester = self._create_auto_suggest_coroutine() 

290 self._async_completer = self._create_completer_coroutine() 

291 self._async_validator = self._create_auto_validate_coroutine() 

292 

293 # Asyncio task for populating the history. 

294 self._load_history_task: asyncio.Future[None] | None = None 

295 

296 # Reset other attributes. 

297 self.reset(document=document) 

298 

299 def __repr__(self) -> str: 

300 if len(self.text) < 15: 

301 text = self.text 

302 else: 

303 text = self.text[:12] + "..." 

304 

305 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>" 

306 

307 def reset( 

308 self, document: Document | None = None, append_to_history: bool = False 

309 ) -> None: 

310 """ 

311 :param append_to_history: Append current input to history first. 

312 """ 

313 if append_to_history: 

314 self.append_to_history() 

315 

316 document = document or Document() 

317 

318 self.__cursor_position = document.cursor_position 

319 

320 # `ValidationError` instance. (Will be set when the input is wrong.) 

321 self.validation_error: ValidationError | None = None 

322 self.validation_state: ValidationState | None = ValidationState.UNKNOWN 

323 

324 # State of the selection. 

325 self.selection_state: SelectionState | None = None 

326 

327 # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode, 

328 # we can insert text on multiple lines at once. This is implemented by 

329 # using multiple cursors.) 

330 self.multiple_cursor_positions: list[int] = [] 

331 

332 # When doing consecutive up/down movements, prefer to stay at this column. 

333 self.preferred_column: int | None = None 

334 

335 # State of complete browser 

336 # For interactive completion through Ctrl-N/Ctrl-P. 

337 self.complete_state: CompletionState | None = None 

338 

339 # State of Emacs yank-nth-arg completion. 

340 self.yank_nth_arg_state: YankNthArgState | None = None # for yank-nth-arg. 

341 

342 # Remember the document that we had *right before* the last paste 

343 # operation. This is used for rotating through the kill ring. 

344 self.document_before_paste: Document | None = None 

345 

346 # Current suggestion. 

347 self.suggestion: Suggestion | None = None 

348 

349 # The history search text. (Used for filtering the history when we 

350 # browse through it.) 

351 self.history_search_text: str | None = None 

352 

353 # Undo/redo stacks (stack of `(text, cursor_position)`). 

354 self._undo_stack: list[tuple[str, int]] = [] 

355 self._redo_stack: list[tuple[str, int]] = [] 

356 

357 # Cancel history loader. If history loading was still ongoing. 

358 # Cancel the `_load_history_task`, so that next repaint of the 

359 # `BufferControl` we will repopulate it. 

360 if self._load_history_task is not None: 

361 self._load_history_task.cancel() 

362 self._load_history_task = None 

363 

364 #: The working lines. Similar to history, except that this can be 

365 #: modified. The user can press arrow_up and edit previous entries. 

366 #: Ctrl-C should reset this, and copy the whole history back in here. 

367 #: Enter should process the current command and append to the real 

368 #: history. 

369 self._working_lines: Deque[str] = deque([document.text]) 

370 self.__working_index = 0 

371 

372 def load_history_if_not_yet_loaded(self) -> None: 

373 """ 

374 Create task for populating the buffer history (if not yet done). 

375 

376 Note:: 

377 

378 This needs to be called from within the event loop of the 

379 application, because history loading is async, and we need to be 

380 sure the right event loop is active. Therefor, we call this method 

381 in the `BufferControl.create_content`. 

382 

383 There are situations where prompt_toolkit applications are created 

384 in one thread, but will later run in a different thread (Ptpython 

385 is one example. The REPL runs in a separate thread, in order to 

386 prevent interfering with a potential different event loop in the 

387 main thread. The REPL UI however is still created in the main 

388 thread.) We could decide to not support creating prompt_toolkit 

389 objects in one thread and running the application in a different 

390 thread, but history loading is the only place where it matters, and 

391 this solves it. 

392 """ 

393 if self._load_history_task is None: 

394 

395 async def load_history() -> None: 

396 async for item in self.history.load(): 

397 self._working_lines.appendleft(item) 

398 self.__working_index += 1 

399 

400 self._load_history_task = get_app().create_background_task(load_history()) 

401 

402 def load_history_done(f: asyncio.Future[None]) -> None: 

403 """ 

404 Handle `load_history` result when either done, cancelled, or 

405 when an exception was raised. 

406 """ 

407 try: 

408 f.result() 

409 except asyncio.CancelledError: 

410 # Ignore cancellation. But handle it, so that we don't get 

411 # this traceback. 

412 pass 

413 except GeneratorExit: 

414 # Probably not needed, but we had situations where 

415 # `GeneratorExit` was raised in `load_history` during 

416 # cancellation. 

417 pass 

418 except BaseException: 

419 # Log error if something goes wrong. (We don't have a 

420 # caller to which we can propagate this exception.) 

421 logger.exception("Loading history failed") 

422 

423 self._load_history_task.add_done_callback(load_history_done) 

424 

425 # <getters/setters> 

426 

427 def _set_text(self, value: str) -> bool: 

428 """set text at current working_index. Return whether it changed.""" 

429 working_index = self.working_index 

430 working_lines = self._working_lines 

431 

432 original_value = working_lines[working_index] 

433 working_lines[working_index] = value 

434 

435 # Return True when this text has been changed. 

436 if len(value) != len(original_value): 

437 # For Python 2, it seems that when two strings have a different 

438 # length and one is a prefix of the other, Python still scans 

439 # character by character to see whether the strings are different. 

440 # (Some benchmarking showed significant differences for big 

441 # documents. >100,000 of lines.) 

442 return True 

443 elif value != original_value: 

444 return True 

445 return False 

446 

447 def _set_cursor_position(self, value: int) -> bool: 

448 """Set cursor position. Return whether it changed.""" 

449 original_position = self.__cursor_position 

450 self.__cursor_position = max(0, value) 

451 

452 return self.__cursor_position != original_position 

453 

454 @property 

455 def text(self) -> str: 

456 return self._working_lines[self.working_index] 

457 

458 @text.setter 

459 def text(self, value: str) -> None: 

460 """ 

461 Setting text. (When doing this, make sure that the cursor_position is 

462 valid for this text. text/cursor_position should be consistent at any time, 

463 otherwise set a Document instead.) 

464 """ 

465 # Ensure cursor position remains within the size of the text. 

466 if self.cursor_position > len(value): 

467 self.cursor_position = len(value) 

468 

469 # Don't allow editing of read-only buffers. 

470 if self.read_only(): 

471 raise EditReadOnlyBuffer() 

472 

473 changed = self._set_text(value) 

474 

475 if changed: 

476 self._text_changed() 

477 

478 # Reset history search text. 

479 # (Note that this doesn't need to happen when working_index 

480 # changes, which is when we traverse the history. That's why we 

481 # don't do this in `self._text_changed`.) 

482 self.history_search_text = None 

483 

484 @property 

485 def cursor_position(self) -> int: 

486 return self.__cursor_position 

487 

488 @cursor_position.setter 

489 def cursor_position(self, value: int) -> None: 

490 """ 

491 Setting cursor position. 

492 """ 

493 assert isinstance(value, int) 

494 

495 # Ensure cursor position is within the size of the text. 

496 if value > len(self.text): 

497 value = len(self.text) 

498 if value < 0: 

499 value = 0 

500 

501 changed = self._set_cursor_position(value) 

502 

503 if changed: 

504 self._cursor_position_changed() 

505 

506 @property 

507 def working_index(self) -> int: 

508 return self.__working_index 

509 

510 @working_index.setter 

511 def working_index(self, value: int) -> None: 

512 if self.__working_index != value: 

513 self.__working_index = value 

514 # Make sure to reset the cursor position, otherwise we end up in 

515 # situations where the cursor position is out of the bounds of the 

516 # text. 

517 self.cursor_position = 0 

518 self._text_changed() 

519 

520 def _text_changed(self) -> None: 

521 # Remove any validation errors and complete state. 

522 self.validation_error = None 

523 self.validation_state = ValidationState.UNKNOWN 

524 self.complete_state = None 

525 self.yank_nth_arg_state = None 

526 self.document_before_paste = None 

527 self.selection_state = None 

528 self.suggestion = None 

529 self.preferred_column = None 

530 

531 # fire 'on_text_changed' event. 

532 self.on_text_changed.fire() 

533 

534 # Input validation. 

535 # (This happens on all change events, unlike auto completion, also when 

536 # deleting text.) 

537 if self.validator and self.validate_while_typing(): 

538 get_app().create_background_task(self._async_validator()) 

539 

540 def _cursor_position_changed(self) -> None: 

541 # Remove any complete state. 

542 # (Input validation should only be undone when the cursor position 

543 # changes.) 

544 self.complete_state = None 

545 self.yank_nth_arg_state = None 

546 self.document_before_paste = None 

547 

548 # Unset preferred_column. (Will be set after the cursor movement, if 

549 # required.) 

550 self.preferred_column = None 

551 

552 # Note that the cursor position can change if we have a selection the 

553 # new position of the cursor determines the end of the selection. 

554 

555 # fire 'on_cursor_position_changed' event. 

556 self.on_cursor_position_changed.fire() 

557 

558 @property 

559 def document(self) -> Document: 

560 """ 

561 Return :class:`~prompt_toolkit.document.Document` instance from the 

562 current text, cursor position and selection state. 

563 """ 

564 return self._document_cache[ 

565 self.text, self.cursor_position, self.selection_state 

566 ] 

567 

568 @document.setter 

569 def document(self, value: Document) -> None: 

570 """ 

571 Set :class:`~prompt_toolkit.document.Document` instance. 

572 

573 This will set both the text and cursor position at the same time, but 

574 atomically. (Change events will be triggered only after both have been set.) 

575 """ 

576 self.set_document(value) 

577 

578 def set_document(self, value: Document, bypass_readonly: bool = False) -> None: 

579 """ 

580 Set :class:`~prompt_toolkit.document.Document` instance. Like the 

581 ``document`` property, but accept an ``bypass_readonly`` argument. 

582 

583 :param bypass_readonly: When True, don't raise an 

584 :class:`.EditReadOnlyBuffer` exception, even 

585 when the buffer is read-only. 

586 

587 .. warning:: 

588 

589 When this buffer is read-only and `bypass_readonly` was not passed, 

590 the `EditReadOnlyBuffer` exception will be caught by the 

591 `KeyProcessor` and is silently suppressed. This is important to 

592 keep in mind when writing key bindings, because it won't do what 

593 you expect, and there won't be a stack trace. Use try/finally 

594 around this function if you need some cleanup code. 

595 """ 

596 # Don't allow editing of read-only buffers. 

597 if not bypass_readonly and self.read_only(): 

598 raise EditReadOnlyBuffer() 

599 

600 # Set text and cursor position first. 

601 text_changed = self._set_text(value.text) 

602 cursor_position_changed = self._set_cursor_position(value.cursor_position) 

603 

604 # Now handle change events. (We do this when text/cursor position is 

605 # both set and consistent.) 

606 if text_changed: 

607 self._text_changed() 

608 self.history_search_text = None 

609 

610 if cursor_position_changed: 

611 self._cursor_position_changed() 

612 

613 @property 

614 def is_returnable(self) -> bool: 

615 """ 

616 True when there is something handling accept. 

617 """ 

618 return bool(self.accept_handler) 

619 

620 # End of <getters/setters> 

621 

622 def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: 

623 """ 

624 Safe current state (input text and cursor position), so that we can 

625 restore it by calling undo. 

626 """ 

627 # Safe if the text is different from the text at the top of the stack 

628 # is different. If the text is the same, just update the cursor position. 

629 if self._undo_stack and self._undo_stack[-1][0] == self.text: 

630 self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) 

631 else: 

632 self._undo_stack.append((self.text, self.cursor_position)) 

633 

634 # Saving anything to the undo stack, clears the redo stack. 

635 if clear_redo_stack: 

636 self._redo_stack = [] 

637 

638 def transform_lines( 

639 self, 

640 line_index_iterator: Iterable[int], 

641 transform_callback: Callable[[str], str], 

642 ) -> str: 

643 """ 

644 Transforms the text on a range of lines. 

645 When the iterator yield an index not in the range of lines that the 

646 document contains, it skips them silently. 

647 

648 To uppercase some lines:: 

649 

650 new_text = transform_lines(range(5,10), lambda text: text.upper()) 

651 

652 :param line_index_iterator: Iterator of line numbers (int) 

653 :param transform_callback: callable that takes the original text of a 

654 line, and return the new text for this line. 

655 

656 :returns: The new text. 

657 """ 

658 # Split lines 

659 lines = self.text.split("\n") 

660 

661 # Apply transformation 

662 for index in line_index_iterator: 

663 try: 

664 lines[index] = transform_callback(lines[index]) 

665 except IndexError: 

666 pass 

667 

668 return "\n".join(lines) 

669 

670 def transform_current_line(self, transform_callback: Callable[[str], str]) -> None: 

671 """ 

672 Apply the given transformation function to the current line. 

673 

674 :param transform_callback: callable that takes a string and return a new string. 

675 """ 

676 document = self.document 

677 a = document.cursor_position + document.get_start_of_line_position() 

678 b = document.cursor_position + document.get_end_of_line_position() 

679 self.text = ( 

680 document.text[:a] 

681 + transform_callback(document.text[a:b]) 

682 + document.text[b:] 

683 ) 

684 

685 def transform_region( 

686 self, from_: int, to: int, transform_callback: Callable[[str], str] 

687 ) -> None: 

688 """ 

689 Transform a part of the input string. 

690 

691 :param from_: (int) start position. 

692 :param to: (int) end position. 

693 :param transform_callback: Callable which accepts a string and returns 

694 the transformed string. 

695 """ 

696 assert from_ < to 

697 

698 self.text = "".join( 

699 [ 

700 self.text[:from_] 

701 + transform_callback(self.text[from_:to]) 

702 + self.text[to:] 

703 ] 

704 ) 

705 

706 def cursor_left(self, count: int = 1) -> None: 

707 self.cursor_position += self.document.get_cursor_left_position(count=count) 

708 

709 def cursor_right(self, count: int = 1) -> None: 

710 self.cursor_position += self.document.get_cursor_right_position(count=count) 

711 

712 def cursor_up(self, count: int = 1) -> None: 

713 """(for multiline edit). Move cursor to the previous line.""" 

714 original_column = self.preferred_column or self.document.cursor_position_col 

715 self.cursor_position += self.document.get_cursor_up_position( 

716 count=count, preferred_column=original_column 

717 ) 

718 

719 # Remember the original column for the next up/down movement. 

720 self.preferred_column = original_column 

721 

722 def cursor_down(self, count: int = 1) -> None: 

723 """(for multiline edit). Move cursor to the next line.""" 

724 original_column = self.preferred_column or self.document.cursor_position_col 

725 self.cursor_position += self.document.get_cursor_down_position( 

726 count=count, preferred_column=original_column 

727 ) 

728 

729 # Remember the original column for the next up/down movement. 

730 self.preferred_column = original_column 

731 

732 def auto_up( 

733 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False 

734 ) -> None: 

735 """ 

736 If we're not on the first line (of a multiline input) go a line up, 

737 otherwise go back in history. (If nothing is selected.) 

738 """ 

739 if self.complete_state: 

740 self.complete_previous(count=count) 

741 elif self.document.cursor_position_row > 0: 

742 self.cursor_up(count=count) 

743 elif not self.selection_state: 

744 self.history_backward(count=count) 

745 

746 # Go to the start of the line? 

747 if go_to_start_of_line_if_history_changes: 

748 self.cursor_position += self.document.get_start_of_line_position() 

749 

750 def auto_down( 

751 self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False 

752 ) -> None: 

753 """ 

754 If we're not on the last line (of a multiline input) go a line down, 

755 otherwise go forward in history. (If nothing is selected.) 

756 """ 

757 if self.complete_state: 

758 self.complete_next(count=count) 

759 elif self.document.cursor_position_row < self.document.line_count - 1: 

760 self.cursor_down(count=count) 

761 elif not self.selection_state: 

762 self.history_forward(count=count) 

763 

764 # Go to the start of the line? 

765 if go_to_start_of_line_if_history_changes: 

766 self.cursor_position += self.document.get_start_of_line_position() 

767 

768 def delete_before_cursor(self, count: int = 1) -> str: 

769 """ 

770 Delete specified number of characters before cursor and return the 

771 deleted text. 

772 """ 

773 assert count >= 0 

774 deleted = "" 

775 

776 if self.cursor_position > 0: 

777 deleted = self.text[self.cursor_position - count : self.cursor_position] 

778 

779 new_text = ( 

780 self.text[: self.cursor_position - count] 

781 + self.text[self.cursor_position :] 

782 ) 

783 new_cursor_position = self.cursor_position - len(deleted) 

784 

785 # Set new Document atomically. 

786 self.document = Document(new_text, new_cursor_position) 

787 

788 return deleted 

789 

790 def delete(self, count: int = 1) -> str: 

791 """ 

792 Delete specified number of characters and Return the deleted text. 

793 """ 

794 if self.cursor_position < len(self.text): 

795 deleted = self.document.text_after_cursor[:count] 

796 self.text = ( 

797 self.text[: self.cursor_position] 

798 + self.text[self.cursor_position + len(deleted) :] 

799 ) 

800 return deleted 

801 else: 

802 return "" 

803 

804 def join_next_line(self, separator: str = " ") -> None: 

805 """ 

806 Join the next line to the current one by deleting the line ending after 

807 the current line. 

808 """ 

809 if not self.document.on_last_line: 

810 self.cursor_position += self.document.get_end_of_line_position() 

811 self.delete() 

812 

813 # Remove spaces. 

814 self.text = ( 

815 self.document.text_before_cursor 

816 + separator 

817 + self.document.text_after_cursor.lstrip(" ") 

818 ) 

819 

820 def join_selected_lines(self, separator: str = " ") -> None: 

821 """ 

822 Join the selected lines. 

823 """ 

824 assert self.selection_state 

825 

826 # Get lines. 

827 from_, to = sorted( 

828 [self.cursor_position, self.selection_state.original_cursor_position] 

829 ) 

830 

831 before = self.text[:from_] 

832 lines = self.text[from_:to].splitlines() 

833 after = self.text[to:] 

834 

835 # Replace leading spaces with just one space. 

836 lines = [l.lstrip(" ") + separator for l in lines] 

837 

838 # Set new document. 

839 self.document = Document( 

840 text=before + "".join(lines) + after, 

841 cursor_position=len(before + "".join(lines[:-1])) - 1, 

842 ) 

843 

844 def swap_characters_before_cursor(self) -> None: 

845 """ 

846 Swap the last two characters before the cursor. 

847 """ 

848 pos = self.cursor_position 

849 

850 if pos >= 2: 

851 a = self.text[pos - 2] 

852 b = self.text[pos - 1] 

853 

854 self.text = self.text[: pos - 2] + b + a + self.text[pos:] 

855 

856 def go_to_history(self, index: int) -> None: 

857 """ 

858 Go to this item in the history. 

859 """ 

860 if index < len(self._working_lines): 

861 self.working_index = index 

862 self.cursor_position = len(self.text) 

863 

864 def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None: 

865 """ 

866 Browse to the next completions. 

867 (Does nothing if there are no completion.) 

868 """ 

869 index: int | None 

870 

871 if self.complete_state: 

872 completions_count = len(self.complete_state.completions) 

873 

874 if self.complete_state.complete_index is None: 

875 index = 0 

876 elif self.complete_state.complete_index == completions_count - 1: 

877 index = None 

878 

879 if disable_wrap_around: 

880 return 

881 else: 

882 index = min( 

883 completions_count - 1, self.complete_state.complete_index + count 

884 ) 

885 self.go_to_completion(index) 

886 

887 def complete_previous( 

888 self, count: int = 1, disable_wrap_around: bool = False 

889 ) -> None: 

890 """ 

891 Browse to the previous completions. 

892 (Does nothing if there are no completion.) 

893 """ 

894 index: int | None 

895 

896 if self.complete_state: 

897 if self.complete_state.complete_index == 0: 

898 index = None 

899 

900 if disable_wrap_around: 

901 return 

902 elif self.complete_state.complete_index is None: 

903 index = len(self.complete_state.completions) - 1 

904 else: 

905 index = max(0, self.complete_state.complete_index - count) 

906 

907 self.go_to_completion(index) 

908 

909 def cancel_completion(self) -> None: 

910 """ 

911 Cancel completion, go back to the original text. 

912 """ 

913 if self.complete_state: 

914 self.go_to_completion(None) 

915 self.complete_state = None 

916 

917 def _set_completions(self, completions: list[Completion]) -> CompletionState: 

918 """ 

919 Start completions. (Generate list of completions and initialize.) 

920 

921 By default, no completion will be selected. 

922 """ 

923 self.complete_state = CompletionState( 

924 original_document=self.document, completions=completions 

925 ) 

926 

927 # Trigger event. This should eventually invalidate the layout. 

928 self.on_completions_changed.fire() 

929 

930 return self.complete_state 

931 

932 def start_history_lines_completion(self) -> None: 

933 """ 

934 Start a completion based on all the other lines in the document and the 

935 history. 

936 """ 

937 found_completions: set[str] = set() 

938 completions = [] 

939 

940 # For every line of the whole history, find matches with the current line. 

941 current_line = self.document.current_line_before_cursor.lstrip() 

942 

943 for i, string in enumerate(self._working_lines): 

944 for j, l in enumerate(string.split("\n")): 

945 l = l.strip() 

946 if l and l.startswith(current_line): 

947 # When a new line has been found. 

948 if l not in found_completions: 

949 found_completions.add(l) 

950 

951 # Create completion. 

952 if i == self.working_index: 

953 display_meta = "Current, line %s" % (j + 1) 

954 else: 

955 display_meta = f"History {i + 1}, line {j + 1}" 

956 

957 completions.append( 

958 Completion( 

959 text=l, 

960 start_position=-len(current_line), 

961 display_meta=display_meta, 

962 ) 

963 ) 

964 

965 self._set_completions(completions=completions[::-1]) 

966 self.go_to_completion(0) 

967 

968 def go_to_completion(self, index: int | None) -> None: 

969 """ 

970 Select a completion from the list of current completions. 

971 """ 

972 assert self.complete_state 

973 

974 # Set new completion 

975 state = self.complete_state 

976 state.go_to_index(index) 

977 

978 # Set text/cursor position 

979 new_text, new_cursor_position = state.new_text_and_position() 

980 self.document = Document(new_text, new_cursor_position) 

981 

982 # (changing text/cursor position will unset complete_state.) 

983 self.complete_state = state 

984 

985 def apply_completion(self, completion: Completion) -> None: 

986 """ 

987 Insert a given completion. 

988 """ 

989 # If there was already a completion active, cancel that one. 

990 if self.complete_state: 

991 self.go_to_completion(None) 

992 self.complete_state = None 

993 

994 # Insert text from the given completion. 

995 self.delete_before_cursor(-completion.start_position) 

996 self.insert_text(completion.text) 

997 

998 def _set_history_search(self) -> None: 

999 """ 

1000 Set `history_search_text`. 

1001 (The text before the cursor will be used for filtering the history.) 

1002 """ 

1003 if self.enable_history_search(): 

1004 if self.history_search_text is None: 

1005 self.history_search_text = self.document.text_before_cursor 

1006 else: 

1007 self.history_search_text = None 

1008 

1009 def _history_matches(self, i: int) -> bool: 

1010 """ 

1011 True when the current entry matches the history search. 

1012 (when we don't have history search, it's also True.) 

1013 """ 

1014 return self.history_search_text is None or self._working_lines[i].startswith( 

1015 self.history_search_text 

1016 ) 

1017 

1018 def history_forward(self, count: int = 1) -> None: 

1019 """ 

1020 Move forwards through the history. 

1021 

1022 :param count: Amount of items to move forward. 

1023 """ 

1024 self._set_history_search() 

1025 

1026 # Go forward in history. 

1027 found_something = False 

1028 

1029 for i in range(self.working_index + 1, len(self._working_lines)): 

1030 if self._history_matches(i): 

1031 self.working_index = i 

1032 count -= 1 

1033 found_something = True 

1034 if count == 0: 

1035 break 

1036 

1037 # If we found an entry, move cursor to the end of the first line. 

1038 if found_something: 

1039 self.cursor_position = 0 

1040 self.cursor_position += self.document.get_end_of_line_position() 

1041 

1042 def history_backward(self, count: int = 1) -> None: 

1043 """ 

1044 Move backwards through history. 

1045 """ 

1046 self._set_history_search() 

1047 

1048 # Go back in history. 

1049 found_something = False 

1050 

1051 for i in range(self.working_index - 1, -1, -1): 

1052 if self._history_matches(i): 

1053 self.working_index = i 

1054 count -= 1 

1055 found_something = True 

1056 if count == 0: 

1057 break 

1058 

1059 # If we move to another entry, move cursor to the end of the line. 

1060 if found_something: 

1061 self.cursor_position = len(self.text) 

1062 

1063 def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None: 

1064 """ 

1065 Pick nth word from previous history entry (depending on current 

1066 `yank_nth_arg_state`) and insert it at current position. Rotate through 

1067 history if called repeatedly. If no `n` has been given, take the first 

1068 argument. (The second word.) 

1069 

1070 :param n: (None or int), The index of the word from the previous line 

1071 to take. 

1072 """ 

1073 assert n is None or isinstance(n, int) 

1074 history_strings = self.history.get_strings() 

1075 

1076 if not len(history_strings): 

1077 return 

1078 

1079 # Make sure we have a `YankNthArgState`. 

1080 if self.yank_nth_arg_state is None: 

1081 state = YankNthArgState(n=-1 if _yank_last_arg else 1) 

1082 else: 

1083 state = self.yank_nth_arg_state 

1084 

1085 if n is not None: 

1086 state.n = n 

1087 

1088 # Get new history position. 

1089 new_pos = state.history_position - 1 

1090 if -new_pos > len(history_strings): 

1091 new_pos = -1 

1092 

1093 # Take argument from line. 

1094 line = history_strings[new_pos] 

1095 

1096 words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)] 

1097 words = [w for w in words if w] 

1098 try: 

1099 word = words[state.n] 

1100 except IndexError: 

1101 word = "" 

1102 

1103 # Insert new argument. 

1104 if state.previous_inserted_word: 

1105 self.delete_before_cursor(len(state.previous_inserted_word)) 

1106 self.insert_text(word) 

1107 

1108 # Save state again for next completion. (Note that the 'insert' 

1109 # operation from above clears `self.yank_nth_arg_state`.) 

1110 state.previous_inserted_word = word 

1111 state.history_position = new_pos 

1112 self.yank_nth_arg_state = state 

1113 

1114 def yank_last_arg(self, n: int | None = None) -> None: 

1115 """ 

1116 Like `yank_nth_arg`, but if no argument has been given, yank the last 

1117 word by default. 

1118 """ 

1119 self.yank_nth_arg(n=n, _yank_last_arg=True) 

1120 

1121 def start_selection( 

1122 self, selection_type: SelectionType = SelectionType.CHARACTERS 

1123 ) -> None: 

1124 """ 

1125 Take the current cursor position as the start of this selection. 

1126 """ 

1127 self.selection_state = SelectionState(self.cursor_position, selection_type) 

1128 

1129 def copy_selection(self, _cut: bool = False) -> ClipboardData: 

1130 """ 

1131 Copy selected text and return :class:`.ClipboardData` instance. 

1132 

1133 Notice that this doesn't store the copied data on the clipboard yet. 

1134 You can store it like this: 

1135 

1136 .. code:: python 

1137 

1138 data = buffer.copy_selection() 

1139 get_app().clipboard.set_data(data) 

1140 """ 

1141 new_document, clipboard_data = self.document.cut_selection() 

1142 if _cut: 

1143 self.document = new_document 

1144 

1145 self.selection_state = None 

1146 return clipboard_data 

1147 

1148 def cut_selection(self) -> ClipboardData: 

1149 """ 

1150 Delete selected text and return :class:`.ClipboardData` instance. 

1151 """ 

1152 return self.copy_selection(_cut=True) 

1153 

1154 def paste_clipboard_data( 

1155 self, 

1156 data: ClipboardData, 

1157 paste_mode: PasteMode = PasteMode.EMACS, 

1158 count: int = 1, 

1159 ) -> None: 

1160 """ 

1161 Insert the data from the clipboard. 

1162 """ 

1163 assert isinstance(data, ClipboardData) 

1164 assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS) 

1165 

1166 original_document = self.document 

1167 self.document = self.document.paste_clipboard_data( 

1168 data, paste_mode=paste_mode, count=count 

1169 ) 

1170 

1171 # Remember original document. This assignment should come at the end, 

1172 # because assigning to 'document' will erase it. 

1173 self.document_before_paste = original_document 

1174 

1175 def newline(self, copy_margin: bool = True) -> None: 

1176 """ 

1177 Insert a line ending at the current position. 

1178 """ 

1179 if copy_margin: 

1180 self.insert_text("\n" + self.document.leading_whitespace_in_current_line) 

1181 else: 

1182 self.insert_text("\n") 

1183 

1184 def insert_line_above(self, copy_margin: bool = True) -> None: 

1185 """ 

1186 Insert a new line above the current one. 

1187 """ 

1188 if copy_margin: 

1189 insert = self.document.leading_whitespace_in_current_line + "\n" 

1190 else: 

1191 insert = "\n" 

1192 

1193 self.cursor_position += self.document.get_start_of_line_position() 

1194 self.insert_text(insert) 

1195 self.cursor_position -= 1 

1196 

1197 def insert_line_below(self, copy_margin: bool = True) -> None: 

1198 """ 

1199 Insert a new line below the current one. 

1200 """ 

1201 if copy_margin: 

1202 insert = "\n" + self.document.leading_whitespace_in_current_line 

1203 else: 

1204 insert = "\n" 

1205 

1206 self.cursor_position += self.document.get_end_of_line_position() 

1207 self.insert_text(insert) 

1208 

1209 def insert_text( 

1210 self, 

1211 data: str, 

1212 overwrite: bool = False, 

1213 move_cursor: bool = True, 

1214 fire_event: bool = True, 

1215 ) -> None: 

1216 """ 

1217 Insert characters at cursor position. 

1218 

1219 :param fire_event: Fire `on_text_insert` event. This is mainly used to 

1220 trigger autocompletion while typing. 

1221 """ 

1222 # Original text & cursor position. 

1223 otext = self.text 

1224 ocpos = self.cursor_position 

1225 

1226 # In insert/text mode. 

1227 if overwrite: 

1228 # Don't overwrite the newline itself. Just before the line ending, 

1229 # it should act like insert mode. 

1230 overwritten_text = otext[ocpos : ocpos + len(data)] 

1231 if "\n" in overwritten_text: 

1232 overwritten_text = overwritten_text[: overwritten_text.find("\n")] 

1233 

1234 text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :] 

1235 else: 

1236 text = otext[:ocpos] + data + otext[ocpos:] 

1237 

1238 if move_cursor: 

1239 cpos = self.cursor_position + len(data) 

1240 else: 

1241 cpos = self.cursor_position 

1242 

1243 # Set new document. 

1244 # (Set text and cursor position at the same time. Otherwise, setting 

1245 # the text will fire a change event before the cursor position has been 

1246 # set. It works better to have this atomic.) 

1247 self.document = Document(text, cpos) 

1248 

1249 # Fire 'on_text_insert' event. 

1250 if fire_event: # XXX: rename to `start_complete`. 

1251 self.on_text_insert.fire() 

1252 

1253 # Only complete when "complete_while_typing" is enabled. 

1254 if self.completer and self.complete_while_typing(): 

1255 get_app().create_background_task(self._async_completer()) 

1256 

1257 # Call auto_suggest. 

1258 if self.auto_suggest: 

1259 get_app().create_background_task(self._async_suggester()) 

1260 

1261 def undo(self) -> None: 

1262 # Pop from the undo-stack until we find a text that if different from 

1263 # the current text. (The current logic of `save_to_undo_stack` will 

1264 # cause that the top of the undo stack is usually the same as the 

1265 # current text, so in that case we have to pop twice.) 

1266 while self._undo_stack: 

1267 text, pos = self._undo_stack.pop() 

1268 

1269 if text != self.text: 

1270 # Push current text to redo stack. 

1271 self._redo_stack.append((self.text, self.cursor_position)) 

1272 

1273 # Set new text/cursor_position. 

1274 self.document = Document(text, cursor_position=pos) 

1275 break 

1276 

1277 def redo(self) -> None: 

1278 if self._redo_stack: 

1279 # Copy current state on undo stack. 

1280 self.save_to_undo_stack(clear_redo_stack=False) 

1281 

1282 # Pop state from redo stack. 

1283 text, pos = self._redo_stack.pop() 

1284 self.document = Document(text, cursor_position=pos) 

1285 

1286 def validate(self, set_cursor: bool = False) -> bool: 

1287 """ 

1288 Returns `True` if valid. 

1289 

1290 :param set_cursor: Set the cursor position, if an error was found. 

1291 """ 

1292 # Don't call the validator again, if it was already called for the 

1293 # current input. 

1294 if self.validation_state != ValidationState.UNKNOWN: 

1295 return self.validation_state == ValidationState.VALID 

1296 

1297 # Call validator. 

1298 if self.validator: 

1299 try: 

1300 self.validator.validate(self.document) 

1301 except ValidationError as e: 

1302 # Set cursor position (don't allow invalid values.) 

1303 if set_cursor: 

1304 self.cursor_position = min( 

1305 max(0, e.cursor_position), len(self.text) 

1306 ) 

1307 

1308 self.validation_state = ValidationState.INVALID 

1309 self.validation_error = e 

1310 return False 

1311 

1312 # Handle validation result. 

1313 self.validation_state = ValidationState.VALID 

1314 self.validation_error = None 

1315 return True 

1316 

1317 async def _validate_async(self) -> None: 

1318 """ 

1319 Asynchronous version of `validate()`. 

1320 This one doesn't set the cursor position. 

1321 

1322 We have both variants, because a synchronous version is required. 

1323 Handling the ENTER key needs to be completely synchronous, otherwise 

1324 stuff like type-ahead is going to give very weird results. (People 

1325 could type input while the ENTER key is still processed.) 

1326 

1327 An asynchronous version is required if we have `validate_while_typing` 

1328 enabled. 

1329 """ 

1330 while True: 

1331 # Don't call the validator again, if it was already called for the 

1332 # current input. 

1333 if self.validation_state != ValidationState.UNKNOWN: 

1334 return 

1335 

1336 # Call validator. 

1337 error = None 

1338 document = self.document 

1339 

1340 if self.validator: 

1341 try: 

1342 await self.validator.validate_async(self.document) 

1343 except ValidationError as e: 

1344 error = e 

1345 

1346 # If the document changed during the validation, try again. 

1347 if self.document != document: 

1348 continue 

1349 

1350 # Handle validation result. 

1351 if error: 

1352 self.validation_state = ValidationState.INVALID 

1353 else: 

1354 self.validation_state = ValidationState.VALID 

1355 

1356 self.validation_error = error 

1357 get_app().invalidate() # Trigger redraw (display error). 

1358 

1359 def append_to_history(self) -> None: 

1360 """ 

1361 Append the current input to the history. 

1362 """ 

1363 # Save at the tail of the history. (But don't if the last entry the 

1364 # history is already the same.) 

1365 if self.text: 

1366 history_strings = self.history.get_strings() 

1367 if not len(history_strings) or history_strings[-1] != self.text: 

1368 self.history.append_string(self.text) 

1369 

1370 def _search( 

1371 self, 

1372 search_state: SearchState, 

1373 include_current_position: bool = False, 

1374 count: int = 1, 

1375 ) -> tuple[int, int] | None: 

1376 """ 

1377 Execute search. Return (working_index, cursor_position) tuple when this 

1378 search is applied. Returns `None` when this text cannot be found. 

1379 """ 

1380 assert count > 0 

1381 

1382 text = search_state.text 

1383 direction = search_state.direction 

1384 ignore_case = search_state.ignore_case() 

1385 

1386 def search_once( 

1387 working_index: int, document: Document 

1388 ) -> tuple[int, Document] | None: 

1389 """ 

1390 Do search one time. 

1391 Return (working_index, document) or `None` 

1392 """ 

1393 if direction == SearchDirection.FORWARD: 

1394 # Try find at the current input. 

1395 new_index = document.find( 

1396 text, 

1397 include_current_position=include_current_position, 

1398 ignore_case=ignore_case, 

1399 ) 

1400 

1401 if new_index is not None: 

1402 return ( 

1403 working_index, 

1404 Document(document.text, document.cursor_position + new_index), 

1405 ) 

1406 else: 

1407 # No match, go forward in the history. (Include len+1 to wrap around.) 

1408 # (Here we should always include all cursor positions, because 

1409 # it's a different line.) 

1410 for i in range(working_index + 1, len(self._working_lines) + 1): 

1411 i %= len(self._working_lines) 

1412 

1413 document = Document(self._working_lines[i], 0) 

1414 new_index = document.find( 

1415 text, include_current_position=True, ignore_case=ignore_case 

1416 ) 

1417 if new_index is not None: 

1418 return (i, Document(document.text, new_index)) 

1419 else: 

1420 # Try find at the current input. 

1421 new_index = document.find_backwards(text, ignore_case=ignore_case) 

1422 

1423 if new_index is not None: 

1424 return ( 

1425 working_index, 

1426 Document(document.text, document.cursor_position + new_index), 

1427 ) 

1428 else: 

1429 # No match, go back in the history. (Include -1 to wrap around.) 

1430 for i in range(working_index - 1, -2, -1): 

1431 i %= len(self._working_lines) 

1432 

1433 document = Document( 

1434 self._working_lines[i], len(self._working_lines[i]) 

1435 ) 

1436 new_index = document.find_backwards( 

1437 text, ignore_case=ignore_case 

1438 ) 

1439 if new_index is not None: 

1440 return ( 

1441 i, 

1442 Document(document.text, len(document.text) + new_index), 

1443 ) 

1444 return None 

1445 

1446 # Do 'count' search iterations. 

1447 working_index = self.working_index 

1448 document = self.document 

1449 for _ in range(count): 

1450 result = search_once(working_index, document) 

1451 if result is None: 

1452 return None # Nothing found. 

1453 else: 

1454 working_index, document = result 

1455 

1456 return (working_index, document.cursor_position) 

1457 

1458 def document_for_search(self, search_state: SearchState) -> Document: 

1459 """ 

1460 Return a :class:`~prompt_toolkit.document.Document` instance that has 

1461 the text/cursor position for this search, if we would apply it. This 

1462 will be used in the 

1463 :class:`~prompt_toolkit.layout.BufferControl` to display feedback while 

1464 searching. 

1465 """ 

1466 search_result = self._search(search_state, include_current_position=True) 

1467 

1468 if search_result is None: 

1469 return self.document 

1470 else: 

1471 working_index, cursor_position = search_result 

1472 

1473 # Keep selection, when `working_index` was not changed. 

1474 if working_index == self.working_index: 

1475 selection = self.selection_state 

1476 else: 

1477 selection = None 

1478 

1479 return Document( 

1480 self._working_lines[working_index], cursor_position, selection=selection 

1481 ) 

1482 

1483 def get_search_position( 

1484 self, 

1485 search_state: SearchState, 

1486 include_current_position: bool = True, 

1487 count: int = 1, 

1488 ) -> int: 

1489 """ 

1490 Get the cursor position for this search. 

1491 (This operation won't change the `working_index`. It's won't go through 

1492 the history. Vi text objects can't span multiple items.) 

1493 """ 

1494 search_result = self._search( 

1495 search_state, include_current_position=include_current_position, count=count 

1496 ) 

1497 

1498 if search_result is None: 

1499 return self.cursor_position 

1500 else: 

1501 working_index, cursor_position = search_result 

1502 return cursor_position 

1503 

1504 def apply_search( 

1505 self, 

1506 search_state: SearchState, 

1507 include_current_position: bool = True, 

1508 count: int = 1, 

1509 ) -> None: 

1510 """ 

1511 Apply search. If something is found, set `working_index` and 

1512 `cursor_position`. 

1513 """ 

1514 search_result = self._search( 

1515 search_state, include_current_position=include_current_position, count=count 

1516 ) 

1517 

1518 if search_result is not None: 

1519 working_index, cursor_position = search_result 

1520 self.working_index = working_index 

1521 self.cursor_position = cursor_position 

1522 

1523 def exit_selection(self) -> None: 

1524 self.selection_state = None 

1525 

1526 def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]: 

1527 """ 

1528 Simple (file) tempfile implementation. 

1529 Return (tempfile, cleanup_func). 

1530 """ 

1531 suffix = to_str(self.tempfile_suffix) 

1532 descriptor, filename = tempfile.mkstemp(suffix) 

1533 

1534 os.write(descriptor, self.text.encode("utf-8")) 

1535 os.close(descriptor) 

1536 

1537 def cleanup() -> None: 

1538 os.unlink(filename) 

1539 

1540 return filename, cleanup 

1541 

1542 def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]: 

1543 # Complex (directory) tempfile implementation. 

1544 headtail = to_str(self.tempfile) 

1545 if not headtail: 

1546 # Revert to simple case. 

1547 return self._editor_simple_tempfile() 

1548 headtail = str(headtail) 

1549 

1550 # Try to make according to tempfile logic. 

1551 head, tail = os.path.split(headtail) 

1552 if os.path.isabs(head): 

1553 head = head[1:] 

1554 

1555 dirpath = tempfile.mkdtemp() 

1556 if head: 

1557 dirpath = os.path.join(dirpath, head) 

1558 # Assume there is no issue creating dirs in this temp dir. 

1559 os.makedirs(dirpath) 

1560 

1561 # Open the filename and write current text. 

1562 filename = os.path.join(dirpath, tail) 

1563 with open(filename, "w", encoding="utf-8") as fh: 

1564 fh.write(self.text) 

1565 

1566 def cleanup() -> None: 

1567 shutil.rmtree(dirpath) 

1568 

1569 return filename, cleanup 

1570 

1571 def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]: 

1572 """ 

1573 Open code in editor. 

1574 

1575 This returns a future, and runs in a thread executor. 

1576 """ 

1577 if self.read_only(): 

1578 raise EditReadOnlyBuffer() 

1579 

1580 # Write current text to temporary file 

1581 if self.tempfile: 

1582 filename, cleanup_func = self._editor_complex_tempfile() 

1583 else: 

1584 filename, cleanup_func = self._editor_simple_tempfile() 

1585 

1586 async def run() -> None: 

1587 try: 

1588 # Open in editor 

1589 # (We need to use `run_in_terminal`, because not all editors go to 

1590 # the alternate screen buffer, and some could influence the cursor 

1591 # position.) 

1592 success = await run_in_terminal( 

1593 lambda: self._open_file_in_editor(filename), in_executor=True 

1594 ) 

1595 

1596 # Read content again. 

1597 if success: 

1598 with open(filename, "rb") as f: 

1599 text = f.read().decode("utf-8") 

1600 

1601 # Drop trailing newline. (Editors are supposed to add it at the 

1602 # end, but we don't need it.) 

1603 if text.endswith("\n"): 

1604 text = text[:-1] 

1605 

1606 self.document = Document(text=text, cursor_position=len(text)) 

1607 

1608 # Accept the input. 

1609 if validate_and_handle: 

1610 self.validate_and_handle() 

1611 

1612 finally: 

1613 # Clean up temp dir/file. 

1614 cleanup_func() 

1615 

1616 return get_app().create_background_task(run()) 

1617 

1618 def _open_file_in_editor(self, filename: str) -> bool: 

1619 """ 

1620 Call editor executable. 

1621 

1622 Return True when we received a zero return code. 

1623 """ 

1624 # If the 'VISUAL' or 'EDITOR' environment variable has been set, use that. 

1625 # Otherwise, fall back to the first available editor that we can find. 

1626 visual = os.environ.get("VISUAL") 

1627 editor = os.environ.get("EDITOR") 

1628 

1629 editors = [ 

1630 visual, 

1631 editor, 

1632 # Order of preference. 

1633 "/usr/bin/editor", 

1634 "/usr/bin/nano", 

1635 "/usr/bin/pico", 

1636 "/usr/bin/vi", 

1637 "/usr/bin/emacs", 

1638 ] 

1639 

1640 for e in editors: 

1641 if e: 

1642 try: 

1643 # Use 'shlex.split()', because $VISUAL can contain spaces 

1644 # and quotes. 

1645 returncode = subprocess.call(shlex.split(e) + [filename]) 

1646 return returncode == 0 

1647 

1648 except OSError: 

1649 # Executable does not exist, try the next one. 

1650 pass 

1651 

1652 return False 

1653 

1654 def start_completion( 

1655 self, 

1656 select_first: bool = False, 

1657 select_last: bool = False, 

1658 insert_common_part: bool = False, 

1659 complete_event: CompleteEvent | None = None, 

1660 ) -> None: 

1661 """ 

1662 Start asynchronous autocompletion of this buffer. 

1663 (This will do nothing if a previous completion was still in progress.) 

1664 """ 

1665 # Only one of these options can be selected. 

1666 assert select_first + select_last + insert_common_part <= 1 

1667 

1668 get_app().create_background_task( 

1669 self._async_completer( 

1670 select_first=select_first, 

1671 select_last=select_last, 

1672 insert_common_part=insert_common_part, 

1673 complete_event=complete_event 

1674 or CompleteEvent(completion_requested=True), 

1675 ) 

1676 ) 

1677 

1678 def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]: 

1679 """ 

1680 Create function for asynchronous autocompletion. 

1681 

1682 (This consumes the asynchronous completer generator, which possibly 

1683 runs the completion algorithm in another thread.) 

1684 """ 

1685 

1686 def completion_does_nothing(document: Document, completion: Completion) -> bool: 

1687 """ 

1688 Return `True` if applying this completion doesn't have any effect. 

1689 (When it doesn't insert any new text. 

1690 """ 

1691 text_before_cursor = document.text_before_cursor 

1692 replaced_text = text_before_cursor[ 

1693 len(text_before_cursor) + completion.start_position : 

1694 ] 

1695 return replaced_text == completion.text 

1696 

1697 @_only_one_at_a_time 

1698 async def async_completer( 

1699 select_first: bool = False, 

1700 select_last: bool = False, 

1701 insert_common_part: bool = False, 

1702 complete_event: CompleteEvent | None = None, 

1703 ) -> None: 

1704 document = self.document 

1705 complete_event = complete_event or CompleteEvent(text_inserted=True) 

1706 

1707 # Don't complete when we already have completions. 

1708 if self.complete_state or not self.completer: 

1709 return 

1710 

1711 # Create an empty CompletionState. 

1712 complete_state = CompletionState(original_document=self.document) 

1713 self.complete_state = complete_state 

1714 

1715 def proceed() -> bool: 

1716 """Keep retrieving completions. Input text has not yet changed 

1717 while generating completions.""" 

1718 return self.complete_state == complete_state 

1719 

1720 refresh_needed = asyncio.Event() 

1721 

1722 async def refresh_while_loading() -> None: 

1723 """Background loop to refresh the UI at most 3 times a second 

1724 while the completion are loading. Calling 

1725 `on_completions_changed.fire()` for every completion that we 

1726 receive is too expensive when there are many completions. (We 

1727 could tune `Application.max_render_postpone_time` and 

1728 `Application.min_redraw_interval`, but having this here is a 

1729 better approach.) 

1730 """ 

1731 while True: 

1732 self.on_completions_changed.fire() 

1733 refresh_needed.clear() 

1734 await asyncio.sleep(0.3) 

1735 await refresh_needed.wait() 

1736 

1737 refresh_task = asyncio.ensure_future(refresh_while_loading()) 

1738 try: 

1739 # Load. 

1740 async with aclosing( 

1741 self.completer.get_completions_async(document, complete_event) 

1742 ) as async_generator: 

1743 async for completion in async_generator: 

1744 complete_state.completions.append(completion) 

1745 refresh_needed.set() 

1746 

1747 # If the input text changes, abort. 

1748 if not proceed(): 

1749 break 

1750 finally: 

1751 refresh_task.cancel() 

1752 

1753 # Refresh one final time after we got everything. 

1754 self.on_completions_changed.fire() 

1755 

1756 completions = complete_state.completions 

1757 

1758 # When there is only one completion, which has nothing to add, ignore it. 

1759 if len(completions) == 1 and completion_does_nothing( 

1760 document, completions[0] 

1761 ): 

1762 del completions[:] 

1763 

1764 # Set completions if the text was not yet changed. 

1765 if proceed(): 

1766 # When no completions were found, or when the user selected 

1767 # already a completion by using the arrow keys, don't do anything. 

1768 if ( 

1769 not self.complete_state 

1770 or self.complete_state.complete_index is not None 

1771 ): 

1772 return 

1773 

1774 # When there are no completions, reset completion state anyway. 

1775 if not completions: 

1776 self.complete_state = None 

1777 # Render the ui if the completion menu was shown 

1778 # it is needed especially if there is one completion and it was deleted. 

1779 self.on_completions_changed.fire() 

1780 return 

1781 

1782 # Select first/last or insert common part, depending on the key 

1783 # binding. (For this we have to wait until all completions are 

1784 # loaded.) 

1785 

1786 if select_first: 

1787 self.go_to_completion(0) 

1788 

1789 elif select_last: 

1790 self.go_to_completion(len(completions) - 1) 

1791 

1792 elif insert_common_part: 

1793 common_part = get_common_complete_suffix(document, completions) 

1794 if common_part: 

1795 # Insert the common part, update completions. 

1796 self.insert_text(common_part) 

1797 if len(completions) > 1: 

1798 # (Don't call `async_completer` again, but 

1799 # recalculate completions. See: 

1800 # https://github.com/ipython/ipython/issues/9658) 

1801 completions[:] = [ 

1802 c.new_completion_from_position(len(common_part)) 

1803 for c in completions 

1804 ] 

1805 

1806 self._set_completions(completions=completions) 

1807 else: 

1808 self.complete_state = None 

1809 else: 

1810 # When we were asked to insert the "common" 

1811 # prefix, but there was no common suffix but 

1812 # still exactly one match, then select the 

1813 # first. (It could be that we have a completion 

1814 # which does * expansion, like '*.py', with 

1815 # exactly one match.) 

1816 if len(completions) == 1: 

1817 self.go_to_completion(0) 

1818 

1819 else: 

1820 # If the last operation was an insert, (not a delete), restart 

1821 # the completion coroutine. 

1822 

1823 if self.document.text_before_cursor == document.text_before_cursor: 

1824 return # Nothing changed. 

1825 

1826 if self.document.text_before_cursor.startswith( 

1827 document.text_before_cursor 

1828 ): 

1829 raise _Retry 

1830 

1831 return async_completer 

1832 

1833 def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]: 

1834 """ 

1835 Create function for asynchronous auto suggestion. 

1836 (This can be in another thread.) 

1837 """ 

1838 

1839 @_only_one_at_a_time 

1840 async def async_suggestor() -> None: 

1841 document = self.document 

1842 

1843 # Don't suggest when we already have a suggestion. 

1844 if self.suggestion or not self.auto_suggest: 

1845 return 

1846 

1847 suggestion = await self.auto_suggest.get_suggestion_async(self, document) 

1848 

1849 # Set suggestion only if the text was not yet changed. 

1850 if self.document == document: 

1851 # Set suggestion and redraw interface. 

1852 self.suggestion = suggestion 

1853 self.on_suggestion_set.fire() 

1854 else: 

1855 # Otherwise, restart thread. 

1856 raise _Retry 

1857 

1858 return async_suggestor 

1859 

1860 def _create_auto_validate_coroutine( 

1861 self, 

1862 ) -> Callable[[], Coroutine[Any, Any, None]]: 

1863 """ 

1864 Create a function for asynchronous validation while typing. 

1865 (This can be in another thread.) 

1866 """ 

1867 

1868 @_only_one_at_a_time 

1869 async def async_validator() -> None: 

1870 await self._validate_async() 

1871 

1872 return async_validator 

1873 

1874 def validate_and_handle(self) -> None: 

1875 """ 

1876 Validate buffer and handle the accept action. 

1877 """ 

1878 valid = self.validate(set_cursor=True) 

1879 

1880 # When the validation succeeded, accept the input. 

1881 if valid: 

1882 if self.accept_handler: 

1883 keep_text = self.accept_handler(self) 

1884 else: 

1885 keep_text = False 

1886 

1887 self.append_to_history() 

1888 

1889 if not keep_text: 

1890 self.reset() 

1891 

1892 

1893_T = TypeVar("_T", bound=Callable[..., Awaitable[None]]) 

1894 

1895 

1896def _only_one_at_a_time(coroutine: _T) -> _T: 

1897 """ 

1898 Decorator that only starts the coroutine only if the previous call has 

1899 finished. (Used to make sure that we have only one autocompleter, auto 

1900 suggestor and validator running at a time.) 

1901 

1902 When the coroutine raises `_Retry`, it is restarted. 

1903 """ 

1904 running = False 

1905 

1906 @wraps(coroutine) 

1907 async def new_coroutine(*a: Any, **kw: Any) -> Any: 

1908 nonlocal running 

1909 

1910 # Don't start a new function, if the previous is still in progress. 

1911 if running: 

1912 return 

1913 

1914 running = True 

1915 

1916 try: 

1917 while True: 

1918 try: 

1919 await coroutine(*a, **kw) 

1920 except _Retry: 

1921 continue 

1922 else: 

1923 return None 

1924 finally: 

1925 running = False 

1926 

1927 return cast(_T, new_coroutine) 

1928 

1929 

1930class _Retry(Exception): 

1931 "Retry in `_only_one_at_a_time`." 

1932 

1933 

1934def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: 

1935 """ 

1936 Indent text of a :class:`.Buffer` object. 

1937 """ 

1938 current_row = buffer.document.cursor_position_row 

1939 line_range = range(from_row, to_row) 

1940 

1941 # Apply transformation. 

1942 new_text = buffer.transform_lines(line_range, lambda l: " " * count + l) 

1943 buffer.document = Document( 

1944 new_text, Document(new_text).translate_row_col_to_index(current_row, 0) 

1945 ) 

1946 

1947 # Go to the start of the line. 

1948 buffer.cursor_position += buffer.document.get_start_of_line_position( 

1949 after_whitespace=True 

1950 ) 

1951 

1952 

1953def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: 

1954 """ 

1955 Unindent text of a :class:`.Buffer` object. 

1956 """ 

1957 current_row = buffer.document.cursor_position_row 

1958 line_range = range(from_row, to_row) 

1959 

1960 def transform(text: str) -> str: 

1961 remove = " " * count 

1962 if text.startswith(remove): 

1963 return text[len(remove) :] 

1964 else: 

1965 return text.lstrip() 

1966 

1967 # Apply transformation. 

1968 new_text = buffer.transform_lines(line_range, transform) 

1969 buffer.document = Document( 

1970 new_text, Document(new_text).translate_row_col_to_index(current_row, 0) 

1971 ) 

1972 

1973 # Go to the start of the line. 

1974 buffer.cursor_position += buffer.document.get_start_of_line_position( 

1975 after_whitespace=True 

1976 ) 

1977 

1978 

1979def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None: 

1980 """ 

1981 Reformat text, taking the width into account. 

1982 `to_row` is included. 

1983 (Vi 'gq' operator.) 

1984 """ 

1985 lines = buffer.text.splitlines(True) 

1986 lines_before = lines[:from_row] 

1987 lines_after = lines[to_row + 1 :] 

1988 lines_to_reformat = lines[from_row : to_row + 1] 

1989 

1990 if lines_to_reformat: 

1991 # Take indentation from the first line. 

1992 match = re.search(r"^\s*", lines_to_reformat[0]) 

1993 length = match.end() if match else 0 # `match` can't be None, actually. 

1994 

1995 indent = lines_to_reformat[0][:length].replace("\n", "") 

1996 

1997 # Now, take all the 'words' from the lines to be reshaped. 

1998 words = "".join(lines_to_reformat).split() 

1999 

2000 # And reshape. 

2001 width = (buffer.text_width or 80) - len(indent) 

2002 reshaped_text = [indent] 

2003 current_width = 0 

2004 for w in words: 

2005 if current_width: 

2006 if len(w) + current_width + 1 > width: 

2007 reshaped_text.append("\n") 

2008 reshaped_text.append(indent) 

2009 current_width = 0 

2010 else: 

2011 reshaped_text.append(" ") 

2012 current_width += 1 

2013 

2014 reshaped_text.append(w) 

2015 current_width += len(w) 

2016 

2017 if reshaped_text[-1] != "\n": 

2018 reshaped_text.append("\n") 

2019 

2020 # Apply result. 

2021 buffer.document = Document( 

2022 text="".join(lines_before + reshaped_text + lines_after), 

2023 cursor_position=len("".join(lines_before + reshaped_text)), 

2024 )