Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/buffer.py: 17%

807 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1""" 

2Data structures for the Buffer. 

3It holds the text, cursor position, history, etc... 

4""" 

5from __future__ import annotations 

6 

7import asyncio 

8import logging 

9import os 

10import re 

11import shlex 

12import shutil 

13import subprocess 

14import tempfile 

15from collections import deque 

16from enum import Enum 

17from functools import wraps 

18from typing import ( 

19 Any, 

20 Awaitable, 

21 Callable, 

22 Coroutine, 

23 Deque, 

24 Iterable, 

25 List, 

26 Optional, 

27 Set, 

28 Tuple, 

29 TypeVar, 

30 Union, 

31 cast, 

32) 

33 

34from .application.current import get_app 

35from .application.run_in_terminal import run_in_terminal 

36from .auto_suggest import AutoSuggest, Suggestion 

37from .cache import FastDictCache 

38from .clipboard import ClipboardData 

39from .completion import ( 

40 CompleteEvent, 

41 Completer, 

42 Completion, 

43 DummyCompleter, 

44 get_common_complete_suffix, 

45) 

46from .document import Document 

47from .eventloop import aclosing 

48from .filters import FilterOrBool, to_filter 

49from .history import History, InMemoryHistory 

50from .search import SearchDirection, SearchState 

51from .selection import PasteMode, SelectionState, SelectionType 

52from .utils import Event, to_str 

53from .validation import ValidationError, Validator 

54 

55__all__ = [ 

56 "EditReadOnlyBuffer", 

57 "Buffer", 

58 "CompletionState", 

59 "indent", 

60 "unindent", 

61 "reshape_text", 

62] 

63 

64logger = logging.getLogger(__name__) 

65 

66 

class EditReadOnlyBuffer(Exception):
    """Raised on an attempt to edit a read-only :class:`.Buffer`."""

69 

70 

class ValidationState(Enum):
    """
    The validation state of a buffer. This is set after the validation.
    """

    VALID = "VALID"
    INVALID = "INVALID"
    UNKNOWN = "UNKNOWN"

76 

77 

class CompletionState:
    """
    State of an ongoing completion: the document as it was when the
    completion started, the candidate completions and the selected index.

    Instances are mutable: :meth:`go_to_index` updates ``complete_index`` in
    place.
    """

    def __init__(
        self,
        original_document: Document,
        completions: list[Completion] | None = None,
        complete_index: int | None = None,
    ) -> None:
        #: Document as it was when the completion started.
        self.original_document = original_document

        #: List of all the current Completion instances which are possible at
        #: this point. (A falsy argument, such as `None`, becomes a new empty
        #: list.)
        self.completions = completions or []

        #: Position in the `completions` list.
        #: This can be `None` to indicate "no completion", the original text.
        self.complete_index = complete_index

    def __repr__(self) -> str:
        return "{}({!r}, <{!r}> completions, index={!r})".format(
            self.__class__.__name__,
            self.original_document,
            len(self.completions),
            self.complete_index,
        )

    def go_to_index(self, index: int | None) -> None:
        """
        Select the completion at `index`, updating this object in place.

        When `index` is `None` deselect the completion. When there are no
        completions at all, this is a no-op.
        """
        if self.completions:
            assert index is None or 0 <= index < len(self.completions)
            self.complete_index = index

    def new_text_and_position(self) -> tuple[str, int]:
        """
        Return (new_text, new_cursor_position) for this completion.

        Without a selected completion, this is the text and cursor position
        of the original document.
        """
        document = self.original_document

        if self.complete_index is None:
            return document.text, document.cursor_position

        completion = self.completions[self.complete_index]
        before = document.text_before_cursor

        # A non-zero `start_position` is used as the slice end, so that the
        # part of the text before the cursor from that offset on is replaced
        # by the completion text. (Zero is special-cased because `[:0]` would
        # drop everything.)
        if completion.start_position != 0:
            before = before[: completion.start_position]

        new_text = before + completion.text + document.text_after_cursor
        return new_text, len(before) + len(completion.text)

    @property
    def current_completion(self) -> Completion | None:
        """
        Return the current completion, or return `None` when no completion is
        selected.
        """
        if self.complete_index is None:
            return None
        return self.completions[self.complete_index]

147 

148 

# Matches either a run of whitespace, or a (non-greedy) double-quoted or
# single-quoted section. The capturing group makes `split` keep the separators.
_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""")

150 

151 

class YankNthArgState:
    """
    For yank-last-arg/yank-nth-arg: keeps the history position, the argument
    index `n` and the word that was inserted previously.
    """

    def __init__(
        self, history_position: int = 0, n: int = -1, previous_inserted_word: str = ""
    ) -> None:
        self.n = n
        self.previous_inserted_word = previous_inserted_word
        self.history_position = history_position

    def __repr__(self) -> str:
        template = "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})"
        return template.format(
            self.__class__.__name__,
            self.history_position,
            self.n,
            self.previous_inserted_word,
        )

171 

172 

# Callback type for buffer events: takes the `Buffer`, returns nothing.
BufferEventHandler = Callable[["Buffer"], None]

# Callback type for the accept handler: takes the `Buffer`, returns a bool.
BufferAcceptHandler = Callable[["Buffer"], bool]

175 

176 

177class Buffer: 

178 """ 

179 The core data structure that holds the text and cursor position of the 

180 current input line and implements all text manipulations on top of it. It 

181 also implements the history, undo stack and the completion state. 

182 

183 :param completer: :class:`~prompt_toolkit.completion.Completer` instance. 

184 :param history: :class:`~prompt_toolkit.history.History` instance. 

185 :param tempfile_suffix: The tempfile suffix (extension) to be used for the 

186 "open in editor" function. For a Python REPL, this would be ".py", so 

187 that the editor knows the syntax highlighting to use. This can also be 

188 a callable that returns a string. 

189 :param tempfile: For more advanced tempfile situations where you need 

190 control over the subdirectories and filename. For a Git Commit Message, 

191 this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax 

192 highlighting to use. This can also be a callable that returns a string. 

193 :param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly 

194 useful for key bindings where we sometimes prefer to refer to a buffer 

195 by their name instead of by reference. 

196 :param accept_handler: Called when the buffer input is accepted. (Usually 

197 when the user presses `enter`.) The accept handler receives this 

198 `Buffer` as input and should return True when the buffer text should be 

199 kept instead of calling reset. 

200 

201 In case of a `PromptSession` for instance, we want to keep the text, 

202 because we will exit the application, and only reset it during the next 

203 run. 

204 

205 Events: 

206 

207 :param on_text_changed: When the buffer text changes. (Callable or None.) 

208 :param on_text_insert: When new text is inserted. (Callable or None.) 

209 :param on_cursor_position_changed: When the cursor moves. (Callable or None.) 

210 :param on_completions_changed: When the completions were changed. (Callable or None.) 

211 :param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) 

212 

213 Filters: 

214 

215 :param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` 

216 or `bool`. Decide whether or not to do asynchronous autocompleting while 

217 typing. 

218 :param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` 

219 or `bool`. Decide whether or not to do asynchronous validation while 

220 typing. 

221 :param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or 

222 `bool` to indicate when up-arrow partial string matching is enabled. It 

223 is advised to not enable this at the same time as 

224 `complete_while_typing`, because when there is an autocompletion found, 

225 the up arrows usually browse through the completions, rather than 

226 through the history. 

227 :param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, 

228 changes will not be allowed. 

229 :param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When 

230 not set, pressing `Enter` will call the `accept_handler`. Otherwise, 

231 pressing `Esc-Enter` is required. 

232 """ 

233 

def __init__(
    self,
    completer: Completer | None = None,
    auto_suggest: AutoSuggest | None = None,
    history: History | None = None,
    validator: Validator | None = None,
    tempfile_suffix: str | Callable[[], str] = "",
    tempfile: str | Callable[[], str] = "",
    name: str = "",
    complete_while_typing: FilterOrBool = False,
    validate_while_typing: FilterOrBool = False,
    enable_history_search: FilterOrBool = False,
    document: Document | None = None,
    accept_handler: BufferAcceptHandler | None = None,
    read_only: FilterOrBool = False,
    multiline: FilterOrBool = True,
    on_text_changed: BufferEventHandler | None = None,
    on_text_insert: BufferEventHandler | None = None,
    on_cursor_position_changed: BufferEventHandler | None = None,
    on_completions_changed: BufferEventHandler | None = None,
    on_suggestion_set: BufferEventHandler | None = None,
):
    # Filters. (Usually, used by the key bindings to drive the buffer.)
    # Both filters and plain booleans are accepted; `to_filter` normalizes.
    self.enable_history_search = to_filter(enable_history_search)
    self.complete_while_typing = to_filter(complete_while_typing)
    self.validate_while_typing = to_filter(validate_while_typing)
    self.read_only = to_filter(read_only)
    self.multiline = to_filter(multiline)

    # Plain settings and collaborators.
    self.name = name
    self.accept_handler = accept_handler
    self.tempfile = tempfile
    self.tempfile_suffix = tempfile_suffix
    self.validator = validator
    self.auto_suggest = auto_suggest
    self.completer = completer if completer else DummyCompleter()

    # Text width. (For wrapping, used by the Vi 'gq' operator.)
    self.text_width = 0

    #: The command buffer history.
    # An explicit `is None` test is required: bool(history) could be False
    # when the history is empty.
    if history is None:
        history = InMemoryHistory()
    self.history = history

    self.__cursor_position = 0

    # Events
    self.on_text_changed: Event[Buffer] = Event(self, on_text_changed)
    self.on_text_insert: Event[Buffer] = Event(self, on_text_insert)
    self.on_cursor_position_changed: Event[Buffer] = Event(
        self, on_cursor_position_changed
    )
    self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed)
    self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set)

    # Document cache. (Avoid creating new Document instances.)
    self._document_cache: FastDictCache[
        tuple[str, int, SelectionState | None], Document
    ] = FastDictCache(Document, size=10)

    # Create completer / auto suggestion / validation coroutines.
    self._async_suggester = self._create_auto_suggest_coroutine()
    self._async_completer = self._create_completer_coroutine()
    self._async_validator = self._create_auto_validate_coroutine()

    # Asyncio task for populating the history. (Must exist before `reset`
    # runs, because `reset` inspects it.)
    self._load_history_task: asyncio.Future[None] | None = None

    # Initialize all remaining attributes.
    self.reset(document=document)

312 

313 def __repr__(self) -> str: 

314 if len(self.text) < 15: 

315 text = self.text 

316 else: 

317 text = self.text[:12] + "..." 

318 

319 return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>" 

320 

def reset(
    self, document: Document | None = None, append_to_history: bool = False
) -> None:
    """
    Reset the editing state of this buffer, starting from `document` (or
    from an empty document when none is given).

    :param append_to_history: Append current input to history first.
    """
    # This has to happen first, while the current input is still there.
    if append_to_history:
        self.append_to_history()

    if not document:
        document = Document()

    self.__cursor_position = document.cursor_position

    # `ValidationError` instance. (Will be set when the input is wrong.)
    self.validation_error: ValidationError | None = None
    self.validation_state: ValidationState | None = ValidationState.UNKNOWN

    # State of the selection.
    self.selection_state: SelectionState | None = None

    # Multiple cursor mode. (When we press 'I' or 'A' in visual-block mode,
    # we can insert text on multiple lines at once. This is implemented by
    # using multiple cursors.)
    self.multiple_cursor_positions: list[int] = []

    # When doing consecutive up/down movements, prefer to stay at this column.
    self.preferred_column: int | None = None

    # State of complete browser
    # For interactive completion through Ctrl-N/Ctrl-P.
    self.complete_state: CompletionState | None = None

    # State of Emacs yank-nth-arg completion.
    self.yank_nth_arg_state: YankNthArgState | None = None

    # Remember the document that we had *right before* the last paste
    # operation. This is used for rotating through the kill ring.
    self.document_before_paste: Document | None = None

    # Current suggestion.
    self.suggestion: Suggestion | None = None

    # The history search text. (Used for filtering the history when we
    # browse through it.)
    self.history_search_text: str | None = None

    # Undo/redo stacks (stack of `(text, cursor_position)`).
    self._undo_stack: list[tuple[str, int]] = []
    self._redo_stack: list[tuple[str, int]] = []

    # Cancel history loader. If history loading was still ongoing.
    # Cancel the `_load_history_task`, so that next repaint of the
    # `BufferControl` we will repopulate it.
    pending_load = self._load_history_task
    if pending_load is not None:
        pending_load.cancel()
        self._load_history_task = None

    #: The working lines. Similar to history, except that this can be
    #: modified. The user can press arrow_up and edit previous entries.
    #: Ctrl-C should reset this, and copy the whole history back in here.
    #: Enter should process the current command and append to the real
    #: history.
    self._working_lines: Deque[str] = deque([document.text])
    self.__working_index = 0

385 

def load_history_if_not_yet_loaded(self) -> None:
    """
    Create task for populating the buffer history (if not yet done).

    Note::

        This needs to be called from within the event loop of the
        application, because history loading is async, and we need to be
        sure the right event loop is active. Therefore, we call this method
        in the `BufferControl.create_content`.

        There are situations where prompt_toolkit applications are created
        in one thread, but will later run in a different thread (Ptpython
        is one example. The REPL runs in a separate thread, in order to
        prevent interfering with a potential different event loop in the
        main thread. The REPL UI however is still created in the main
        thread.) We could decide to not support creating prompt_toolkit
        objects in one thread and running the application in a different
        thread, but history loading is the only place where it matters, and
        this solves it.
    """
    # A task already exists: nothing to do.
    if self._load_history_task is not None:
        return

    async def load_history() -> None:
        # Every loaded item is prepended, so the index of the line that is
        # being edited shifts by one each time.
        async for item in self.history.load():
            self._working_lines.appendleft(item)
            self.__working_index += 1

    def load_history_done(f: asyncio.Future[None]) -> None:
        """
        Handle `load_history` result when either done, cancelled, or
        when an exception was raised.
        """
        try:
            f.result()
        except (asyncio.CancelledError, GeneratorExit):
            # Ignore cancellation. But handle it, so that we don't get
            # this traceback. (`GeneratorExit` is probably not needed,
            # but it has been seen in `load_history` during cancellation.)
            pass
        except BaseException:
            # Log error if something goes wrong. (We don't have a
            # caller to which we can propagate this exception.)
            logger.exception("Loading history failed")

    task = get_app().create_background_task(load_history())
    self._load_history_task = task
    task.add_done_callback(load_history_done)

438 

439 # <getters/setters> 

440 

441 def _set_text(self, value: str) -> bool: 

442 """set text at current working_index. Return whether it changed.""" 

443 working_index = self.working_index 

444 working_lines = self._working_lines 

445 

446 original_value = working_lines[working_index] 

447 working_lines[working_index] = value 

448 

449 # Return True when this text has been changed. 

450 if len(value) != len(original_value): 

451 # For Python 2, it seems that when two strings have a different 

452 # length and one is a prefix of the other, Python still scans 

453 # character by character to see whether the strings are different. 

454 # (Some benchmarking showed significant differences for big 

455 # documents. >100,000 of lines.) 

456 return True 

457 elif value != original_value: 

458 return True 

459 return False 

460 

461 def _set_cursor_position(self, value: int) -> bool: 

462 """Set cursor position. Return whether it changed.""" 

463 original_position = self.__cursor_position 

464 self.__cursor_position = max(0, value) 

465 

466 return self.__cursor_position != original_position 

467 

468 @property 

469 def text(self) -> str: 

470 return self._working_lines[self.working_index] 

471 

472 @text.setter 

473 def text(self, value: str) -> None: 

474 """ 

475 Setting text. (When doing this, make sure that the cursor_position is 

476 valid for this text. text/cursor_position should be consistent at any time, 

477 otherwise set a Document instead.) 

478 """ 

479 # Ensure cursor position remains within the size of the text. 

480 if self.cursor_position > len(value): 

481 self.cursor_position = len(value) 

482 

483 # Don't allow editing of read-only buffers. 

484 if self.read_only(): 

485 raise EditReadOnlyBuffer() 

486 

487 changed = self._set_text(value) 

488 

489 if changed: 

490 self._text_changed() 

491 

492 # Reset history search text. 

493 # (Note that this doesn't need to happen when working_index 

494 # changes, which is when we traverse the history. That's why we 

495 # don't do this in `self._text_changed`.) 

496 self.history_search_text = None 

497 

498 @property 

499 def cursor_position(self) -> int: 

500 return self.__cursor_position 

501 

502 @cursor_position.setter 

503 def cursor_position(self, value: int) -> None: 

504 """ 

505 Setting cursor position. 

506 """ 

507 assert isinstance(value, int) 

508 

509 # Ensure cursor position is within the size of the text. 

510 if value > len(self.text): 

511 value = len(self.text) 

512 if value < 0: 

513 value = 0 

514 

515 changed = self._set_cursor_position(value) 

516 

517 if changed: 

518 self._cursor_position_changed() 

519 

520 @property 

521 def working_index(self) -> int: 

522 return self.__working_index 

523 

524 @working_index.setter 

525 def working_index(self, value: int) -> None: 

526 if self.__working_index != value: 

527 self.__working_index = value 

528 # Make sure to reset the cursor position, otherwise we end up in 

529 # situations where the cursor position is out of the bounds of the 

530 # text. 

531 self.cursor_position = 0 

532 self._text_changed() 

533 

def _text_changed(self) -> None:
    """
    Drop all state that depends on the text, fire `on_text_changed` and
    start validation when that is enabled.
    """
    # Remove any validation errors and complete state.
    self.validation_error = None
    self.validation_state = ValidationState.UNKNOWN
    self.complete_state = self.yank_nth_arg_state = None
    self.document_before_paste = self.selection_state = None
    self.suggestion = self.preferred_column = None

    # fire 'on_text_changed' event.
    self.on_text_changed.fire()

    # Input validation.
    # (This happens on all change events, unlike auto completion, also when
    # deleting text.)
    if self.validator and self.validate_while_typing():
        get_app().create_background_task(self._async_validator())

553 

554 def _cursor_position_changed(self) -> None: 

555 # Remove any complete state. 

556 # (Input validation should only be undone when the cursor position 

557 # changes.) 

558 self.complete_state = None 

559 self.yank_nth_arg_state = None 

560 self.document_before_paste = None 

561 

562 # Unset preferred_column. (Will be set after the cursor movement, if 

563 # required.) 

564 self.preferred_column = None 

565 

566 # Note that the cursor position can change if we have a selection the 

567 # new position of the cursor determines the end of the selection. 

568 

569 # fire 'on_cursor_position_changed' event. 

570 self.on_cursor_position_changed.fire() 

571 

572 @property 

573 def document(self) -> Document: 

574 """ 

575 Return :class:`~prompt_toolkit.document.Document` instance from the 

576 current text, cursor position and selection state. 

577 """ 

578 return self._document_cache[ 

579 self.text, self.cursor_position, self.selection_state 

580 ] 

581 

582 @document.setter 

583 def document(self, value: Document) -> None: 

584 """ 

585 Set :class:`~prompt_toolkit.document.Document` instance. 

586 

587 This will set both the text and cursor position at the same time, but 

588 atomically. (Change events will be triggered only after both have been set.) 

589 """ 

590 self.set_document(value) 

591 

def set_document(self, value: Document, bypass_readonly: bool = False) -> None:
    """
    Set :class:`~prompt_toolkit.document.Document` instance. Like the
    ``document`` property, but accept an ``bypass_readonly`` argument.

    :param bypass_readonly: When True, don't raise an
                            :class:`.EditReadOnlyBuffer` exception, even
                            when the buffer is read-only.

    .. warning::

        When this buffer is read-only and `bypass_readonly` was not passed,
        the `EditReadOnlyBuffer` exception will be caught by the
        `KeyProcessor` and is silently suppressed. This is important to
        keep in mind when writing key bindings, because it won't do what
        you expect, and there won't be a stack trace. Use try/finally
        around this function if you need some cleanup code.
    """
    # Don't allow editing of read-only buffers.
    if not bypass_readonly and self.read_only():
        raise EditReadOnlyBuffer()

    # Store both values before firing anything, so that handlers always
    # see a consistent text/cursor position pair.
    text_differs = self._set_text(value.text)
    cursor_differs = self._set_cursor_position(value.cursor_position)

    if text_differs:
        self._text_changed()
        self.history_search_text = None

    if cursor_differs:
        self._cursor_position_changed()

626 

627 @property 

628 def is_returnable(self) -> bool: 

629 """ 

630 True when there is something handling accept. 

631 """ 

632 return bool(self.accept_handler) 

633 

634 # End of <getters/setters> 

635 

def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None:
    """
    Save current state (input text and cursor position), so that we can
    restore it by calling undo.

    :param clear_redo_stack: When True, empty the redo stack as well.
    """
    stack = self._undo_stack
    text, cursor = self.text, self.cursor_position

    # Only push a new entry when the text differs from the text at the top
    # of the stack. If the text is the same, just update the cursor position.
    if stack and stack[-1][0] == text:
        stack[-1] = (stack[-1][0], cursor)
    else:
        stack.append((text, cursor))

    # Saving anything to the undo stack, clears the redo stack.
    if clear_redo_stack:
        self._redo_stack = []

651 

def transform_lines(
    self,
    line_index_iterator: Iterable[int],
    transform_callback: Callable[[str], str],
) -> str:
    """
    Transforms the text on a range of lines.
    When the iterator yield an index not in the range of lines that the
    document contains, it skips them silently. (Indexes are used for normal
    list indexing, so negative indexes count from the last line.)

    The buffer itself is not modified; the transformed text is returned.

    To uppercase some lines::

        new_text = transform_lines(range(5,10), lambda text: text.upper())

    :param line_index_iterator: Iterator of line numbers (int)
    :param transform_callback: callable that takes the original text of a
        line, and return the new text for this line. Exceptions raised by
        this callable (including `IndexError`) propagate to the caller.

    :returns: The new text.
    """
    # Split lines
    lines = self.text.split("\n")

    # Apply transformation
    for index in line_index_iterator:
        # Only the lookup is guarded: an `IndexError` raised inside
        # `transform_callback` is a real error and must not be mistaken
        # for an out-of-range line number.
        try:
            original_line = lines[index]
        except IndexError:
            continue

        lines[index] = transform_callback(original_line)

    return "\n".join(lines)

683 

def transform_current_line(self, transform_callback: Callable[[str], str]) -> None:
    """
    Replace the current line by the result of `transform_callback`.

    :param transform_callback: callable that takes a string and return a new string.
    """
    doc = self.document
    cursor = doc.cursor_position
    text = doc.text

    # Absolute offsets of the start and the end of the current line.
    line_start = cursor + doc.get_start_of_line_position()
    line_end = cursor + doc.get_end_of_line_position()

    transformed = transform_callback(text[line_start:line_end])
    self.text = text[:line_start] + transformed + text[line_end:]

698 

def transform_region(
    self, from_: int, to: int, transform_callback: Callable[[str], str]
) -> None:
    """
    Transform a part of the input string.

    :param from_: (int) start position.
    :param to: (int) end position. Has to be greater than `from_`.
    :param transform_callback: Callable which accepts a string and returns
        the transformed string.
    """
    assert from_ < to

    current = self.text
    head = current[:from_]
    replacement = transform_callback(current[from_:to])
    self.text = head + replacement + current[to:]

719 

def cursor_left(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps left."""
    offset = self.document.get_cursor_left_position(count=count)
    self.cursor_position += offset

722 

def cursor_right(self, count: int = 1) -> None:
    """Move the cursor by the offset the document reports for `count` steps right."""
    offset = self.document.get_cursor_right_position(count=count)
    self.cursor_position += offset

725 

def cursor_up(self, count: int = 1) -> None:
    """(for multiline edit). Move cursor to the previous line."""
    # Use the remembered column if there is one, otherwise the current one.
    column = self.preferred_column or self.document.cursor_position_col
    offset = self.document.get_cursor_up_position(
        count=count, preferred_column=column
    )
    self.cursor_position += offset

    # Remember the original column for the next up/down movement.
    self.preferred_column = column

735 

def cursor_down(self, count: int = 1) -> None:
    """(for multiline edit). Move cursor to the next line."""
    # Use the remembered column if there is one, otherwise the current one.
    column = self.preferred_column or self.document.cursor_position_col
    offset = self.document.get_cursor_down_position(
        count=count, preferred_column=column
    )
    self.cursor_position += offset

    # Remember the original column for the next up/down movement.
    self.preferred_column = column

745 

def auto_up(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If we're not on the first line (of a multiline input) go a line up,
    otherwise go back in history. (If nothing is selected.)

    While a completion state is active, browse to the previous completion
    instead.
    """
    if self.complete_state:
        self.complete_previous(count=count)
        return

    if self.document.cursor_position_row > 0:
        self.cursor_up(count=count)
        return

    # On the first line: only browse the history without a selection.
    if self.selection_state:
        return

    self.history_backward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

763 

def auto_down(
    self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False
) -> None:
    """
    If we're not on the last line (of a multiline input) go a line down,
    otherwise go forward in history. (If nothing is selected.)

    While a completion state is active, browse to the next completion
    instead.
    """
    if self.complete_state:
        self.complete_next(count=count)
        return

    if self.document.cursor_position_row < self.document.line_count - 1:
        self.cursor_down(count=count)
        return

    # On the last line: only browse the history without a selection.
    if self.selection_state:
        return

    self.history_forward(count=count)

    # Go to the start of the line?
    if go_to_start_of_line_if_history_changes:
        self.cursor_position += self.document.get_start_of_line_position()

781 

def delete_before_cursor(self, count: int = 1) -> str:
    """
    Delete specified number of characters before cursor and return the
    deleted text.

    When `count` is larger than the number of characters before the cursor,
    everything before the cursor is deleted.

    :param count: Number of characters to delete. Has to be >= 0.
    :returns: The deleted text. (An empty string when the cursor is at
        position 0.)
    """
    assert count >= 0
    deleted = ""
    cursor = self.cursor_position

    if cursor > 0:
        text = self.text

        # Clamp the start at 0. A negative slice bound would be interpreted
        # relative to the end of the text and select the wrong region.
        start = max(0, cursor - count)

        deleted = text[start:cursor]
        new_text = text[:start] + text[cursor:]
        new_cursor_position = cursor - len(deleted)

        # Set new Document atomically.
        self.document = Document(new_text, new_cursor_position)

    return deleted

803 

def delete(self, count: int = 1) -> str:
    """
    Delete specified number of characters after the cursor and return the
    deleted text. Returns an empty string when the cursor is at the end of
    the text.
    """
    cursor = self.cursor_position

    if cursor >= len(self.text):
        return ""

    removed = self.document.text_after_cursor[:count]
    self.text = self.text[:cursor] + self.text[cursor + len(removed) :]
    return removed

817 

def join_next_line(self, separator: str = " ") -> None:
    """
    Join the next line to the current one by deleting the line ending after
    the current line. Does nothing on the last line.

    :param separator: Text that is put between both lines.
    """
    if self.document.on_last_line:
        return

    # Go to the end of the line and delete the line ending.
    self.cursor_position += self.document.get_end_of_line_position()
    self.delete()

    # Remove spaces at the start of what used to be the next line.
    joined = self.document
    head = joined.text_before_cursor
    tail = joined.text_after_cursor.lstrip(" ")
    self.text = head + separator + tail

833 

def join_selected_lines(self, separator: str = " ") -> None:
    """
    Join the selected lines.

    :param separator: Text that is appended to every selected line.
    """
    assert self.selection_state

    # Boundaries of the selection, in text order.
    start, end = sorted(
        (self.cursor_position, self.selection_state.original_cursor_position)
    )

    text = self.text
    head, tail = text[:start], text[end:]

    # Strip the leading spaces of every line and append the separator.
    pieces = [line.lstrip(" ") + separator for line in text[start:end].splitlines()]

    # Set new document. The cursor ends up one position before the start of
    # the last joined piece.
    self.document = Document(
        text=head + "".join(pieces) + tail,
        cursor_position=len(head) + sum(len(piece) for piece in pieces[:-1]) - 1,
    )

857 

def swap_characters_before_cursor(self) -> None:
    """
    Swap the last two characters before the cursor. Does nothing when
    there are fewer than two characters before the cursor.
    """
    pos = self.cursor_position
    if pos < 2:
        return

    text = self.text
    second_last, last = text[pos - 2], text[pos - 1]
    self.text = text[: pos - 2] + last + second_last + text[pos:]

869 

def go_to_history(self, index: int) -> None:
    """
    Go to this item in the history and put the cursor at the end of its
    text. Does nothing when `index` is beyond the last working line.
    """
    if index >= len(self._working_lines):
        return

    self.working_index = index
    self.cursor_position = len(self.text)

877 

def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None:
    """
    Move `count` positions forward through the completions.
    (Does nothing when there is no completion state.)
    """
    state = self.complete_state
    if not state:
        return

    last = len(state.completions) - 1
    target: int | None

    if state.complete_index is None:
        # Nothing selected yet: start with the first completion.
        target = 0
    elif state.complete_index == last:
        # On the last completion: wrap around to "no selection", unless
        # wrapping was disabled.
        if disable_wrap_around:
            return
        target = None
    else:
        target = min(last, state.complete_index + count)

    self.go_to_completion(target)

900 

def complete_previous(
    self, count: int = 1, disable_wrap_around: bool = False
) -> None:
    """
    Move `count` positions backward through the completions.
    (Does nothing when there is no completion state.)
    """
    state = self.complete_state
    if not state:
        return

    target: int | None

    if state.complete_index == 0:
        # On the first completion: wrap around to "no selection", unless
        # wrapping was disabled.
        if disable_wrap_around:
            return
        target = None
    elif state.complete_index is None:
        # Nothing selected yet: start with the last completion.
        target = len(state.completions) - 1
    else:
        target = max(0, state.complete_index - count)

    self.go_to_completion(target)

922 

def cancel_completion(self) -> None:
    """
    Abort the running completion and restore the original text.
    """
    if not self.complete_state:
        return

    # Deselect the current completion first, then drop the state.
    self.go_to_completion(None)
    self.complete_state = None

930 

def _set_completions(self, completions: list[Completion]) -> CompletionState:
    """
    Install a fresh completion state for the given completions.

    The state is created from the current document and `completions`;
    this method does not select any of them.
    """
    new_state = CompletionState(
        original_document=self.document, completions=completions
    )
    self.complete_state = new_state

    # Notify listeners. This should eventually invalidate the layout.
    self.on_completions_changed.fire()

    return self.complete_state

945 

def start_history_lines_completion(self) -> None:
    """
    Offer every line of the working lines (current input and history) that
    starts with the text before the cursor as a completion.
    """
    seen: set[str] = set()
    matches = []

    # The text to match against: current line up to the cursor.
    prefix = self.document.current_line_before_cursor.lstrip()

    for entry_no, entry in enumerate(self._working_lines):
        for line_no, raw_line in enumerate(entry.split("\n")):
            candidate = raw_line.strip()

            if not candidate or not candidate.startswith(prefix):
                continue

            # Only report every distinct line once.
            if candidate in seen:
                continue
            seen.add(candidate)

            # Describe where this line was found.
            if entry_no == self.working_index:
                display_meta = "Current, line %s" % (line_no + 1)
            else:
                display_meta = f"History {entry_no + 1}, line {line_no + 1}"

            matches.append(
                Completion(
                    text=candidate,
                    start_position=-len(prefix),
                    display_meta=display_meta,
                )
            )

    # Hand the matches over in reverse order of discovery.
    self._set_completions(completions=matches[::-1])
    self.go_to_completion(0)

981 

def go_to_completion(self, index: int | None) -> None:
    """
    Activate the completion at `index` in the current completion state.
    """
    assert self.complete_state

    # Move the selection inside the completion state.
    active_state = self.complete_state
    active_state.go_to_index(index)

    # Apply the resulting text and cursor position.
    text, cursor = active_state.new_text_and_position()
    self.document = Document(text, cursor)

    # (Changing text/cursor position will unset `complete_state`, so put
    # it back.)
    self.complete_state = active_state

998 

def apply_completion(self, completion: Completion) -> None:
    """
    Insert the given completion into the buffer.
    """
    # Cancel a completion that is still active.
    if self.complete_state:
        self.go_to_completion(None)
    self.complete_state = None

    # Remove the text that the completion replaces, then insert its text.
    replaced_length = -completion.start_position
    self.delete_before_cursor(replaced_length)
    self.insert_text(completion.text)

1011 

def _set_history_search(self) -> None:
    """
    Update `history_search_text`.

    With history search enabled, the text before the cursor becomes the
    filter (unless a filter was already set). Otherwise the filter is
    cleared.
    """
    if not self.enable_history_search():
        self.history_search_text = None
    elif self.history_search_text is None:
        self.history_search_text = self.document.text_before_cursor

1022 

def _history_matches(self, i: int) -> bool:
    """
    Tell whether working line `i` passes the history search filter.
    (Without an active history search, every entry passes.)
    """
    search_text = self.history_search_text
    if search_text is None:
        return True
    return self._working_lines[i].startswith(search_text)

1031 

def history_forward(self, count: int = 1) -> None:
    """
    Step forward through the history.

    :param count: Number of matching entries to move forward.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk over the entries behind the current one.
    for position in range(self.working_index + 1, len(self._working_lines)):
        if not self._history_matches(position):
            continue

        self.working_index = position
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving to another entry, put the cursor at the end of its
    # first line.
    if moved:
        self.cursor_position = 0
        self.cursor_position += self.document.get_end_of_line_position()

1055 

def history_backward(self, count: int = 1) -> None:
    """
    Step backward through the history.

    :param count: Number of matching entries to move backward.
    """
    self._set_history_search()

    moved = False
    remaining = count

    # Walk over the entries in front of the current one, nearest first.
    for position in range(self.working_index - 1, -1, -1):
        if not self._history_matches(position):
            continue

        self.working_index = position
        moved = True
        remaining -= 1
        if remaining == 0:
            break

    # After moving to another entry, put the cursor at the end of the text.
    if moved:
        self.cursor_position = len(self.text)

1076 

def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None:
    """
    Insert the nth word of an earlier history entry at the cursor.

    Which entry is used depends on the current `yank_nth_arg_state`:
    every repeated call goes one entry further back and replaces the word
    that the previous call inserted. Without `n`, the first argument (the
    second word) is taken.

    :param n: (None or int), The index of the word from the history line
        to take.
    """
    assert n is None or isinstance(n, int)
    history_strings = self.history.get_strings()

    if not len(history_strings):
        return

    # Reuse the state of an ongoing yank, or start a new one.
    state = self.yank_nth_arg_state
    if state is None:
        state = YankNthArgState(n=-1 if _yank_last_arg else 1)

    if n is not None:
        state.n = n

    # One entry further back; start over at the most recent entry when
    # running past the oldest one.
    new_pos = state.history_position - 1
    if -new_pos > len(history_strings):
        new_pos = -1

    # Split the history line into words and pick the requested one.
    line = history_strings[new_pos]
    stripped = [part.strip() for part in _QUOTED_WORDS_RE.split(line)]
    words = [part for part in stripped if part]
    try:
        word = words[state.n]
    except IndexError:
        word = ""

    # Replace the previously inserted word with the new one.
    if state.previous_inserted_word:
        self.delete_before_cursor(len(state.previous_inserted_word))
    self.insert_text(word)

    # Store the state for the next call. (Note that the 'insert' operation
    # from above clears `self.yank_nth_arg_state`.)
    state.previous_inserted_word = word
    state.history_position = new_pos
    self.yank_nth_arg_state = state

1127 

def yank_last_arg(self, n: int | None = None) -> None:
    """
    Same as `yank_nth_arg`, except that the last word is yanked when no
    argument has been given.
    """
    self.yank_nth_arg(_yank_last_arg=True, n=n)

1134 

def start_selection(
    self, selection_type: SelectionType = SelectionType.CHARACTERS
) -> None:
    """
    Begin a selection of the given type at the current cursor position.
    """
    start = self.cursor_position
    self.selection_state = SelectionState(start, selection_type)

1142 

def copy_selection(self, _cut: bool = False) -> ClipboardData:
    """
    Return the selected text as a :class:`.ClipboardData` instance and
    clear the selection.

    Be aware that the data is not placed on the clipboard by this method.
    Doing that is up to the caller:

    .. code:: python

        data = buffer.copy_selection()
        get_app().clipboard.set_data(data)
    """
    remaining_document, clipboard_data = self.document.cut_selection()

    # Only a "cut" replaces the document by the one without the selection.
    if _cut:
        self.document = remaining_document

    self.selection_state = None
    return clipboard_data

1161 

def cut_selection(self) -> ClipboardData:
    """
    Remove the selected text from the buffer and return it as a
    :class:`.ClipboardData` instance.
    """
    clipboard_data = self.copy_selection(_cut=True)
    return clipboard_data

1167 

def paste_clipboard_data(
    self,
    data: ClipboardData,
    paste_mode: PasteMode = PasteMode.EMACS,
    count: int = 1,
) -> None:
    """
    Paste the given clipboard data into the buffer.
    """
    assert isinstance(data, ClipboardData)
    assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS)

    document_before = self.document
    pasted_document = self.document.paste_clipboard_data(
        data, paste_mode=paste_mode, count=count
    )
    self.document = pasted_document

    # Keep the pre-paste document around. This has to be the last
    # assignment, because assigning to 'document' will erase it.
    self.document_before_paste = document_before

1188 

def newline(self, copy_margin: bool = True) -> None:
    """
    Insert a line ending at the cursor.

    :param copy_margin: When set, the new line starts with the leading
        whitespace of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    self.insert_text("\n" + margin)

1197 

def insert_line_above(self, copy_margin: bool = True) -> None:
    """
    Open a new line above the current one.

    :param copy_margin: When set, the new line gets the leading whitespace
        of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    new_line = margin + "\n"

    # Insert at the start of the current line, then step back over the
    # line ending so that the cursor is on the new line.
    self.cursor_position += self.document.get_start_of_line_position()
    self.insert_text(new_line)
    self.cursor_position -= 1

1210 

def insert_line_below(self, copy_margin: bool = True) -> None:
    """
    Open a new line below the current one.

    :param copy_margin: When set, the new line gets the leading whitespace
        of the current line.
    """
    margin = self.document.leading_whitespace_in_current_line if copy_margin else ""
    new_line = "\n" + margin

    # Insert at the end of the current line.
    self.cursor_position += self.document.get_end_of_line_position()
    self.insert_text(new_line)

1222 

def insert_text(
    self,
    data: str,
    overwrite: bool = False,
    move_cursor: bool = True,
    fire_event: bool = True,
) -> None:
    """
    Insert `data` at the cursor position.

    :param overwrite: Replace the characters behind the cursor instead of
        shifting them (never beyond the end of the current line).
    :param move_cursor: Place the cursor behind the inserted text.
    :param fire_event: Fire `on_text_insert` event. This is mainly used to
        trigger autocompletion while typing.
    """
    original_text = self.text
    original_cursor = self.cursor_position

    if overwrite:
        # The newline itself is never overwritten: right in front of a
        # line ending, this behaves like a normal insert.
        replaced = original_text[original_cursor : original_cursor + len(data)]
        if "\n" in replaced:
            replaced = replaced[: replaced.find("\n")]
        tail = original_text[original_cursor + len(replaced) :]
    else:
        tail = original_text[original_cursor:]

    new_text = original_text[:original_cursor] + data + tail

    if move_cursor:
        new_cursor = self.cursor_position + len(data)
    else:
        new_cursor = self.cursor_position

    # Assign text and cursor position together through a new document.
    # (Assigning the text on its own would fire a change event before the
    # cursor position is in place. It works better to have this atomic.)
    self.document = Document(new_text, new_cursor)

    if not fire_event:  # XXX: rename to `start_complete`.
        return

    self.on_text_insert.fire()

    # Completion only starts when "complete_while_typing" is enabled.
    if self.completer and self.complete_while_typing():
        get_app().create_background_task(self._async_completer())

    # Start auto suggestion.
    if self.auto_suggest:
        get_app().create_background_task(self._async_suggester())

1274 

def undo(self) -> None:
    """
    Restore the most recent undo-stack entry whose text differs from the
    current text, and push the current state onto the redo stack.
    """
    # (The current logic of `save_to_undo_stack` will cause that the top
    # of the undo stack is usually the same as the current text, which is
    # why entries are popped until a different text shows up.)
    while self._undo_stack:
        previous_text, previous_cursor = self._undo_stack.pop()

        if previous_text == self.text:
            continue

        # Remember the current state for `redo`.
        self._redo_stack.append((self.text, self.cursor_position))

        # Go back to the older state.
        self.document = Document(previous_text, cursor_position=previous_cursor)
        break

1290 

def redo(self) -> None:
    """
    Re-apply the state that was most recently undone.
    (Does nothing when the redo stack is empty.)
    """
    if not self._redo_stack:
        return

    # Keep the current state on the undo stack.
    self.save_to_undo_stack(clear_redo_stack=False)

    # Take the newest state from the redo stack and apply it.
    redo_text, redo_cursor = self._redo_stack.pop()
    self.document = Document(redo_text, cursor_position=redo_cursor)

1299 

def validate(self, set_cursor: bool = False) -> bool:
    """
    Validate the current input. Returns `True` if valid.

    :param set_cursor: Set the cursor position, if an error was found.
    """
    # The validator is not called a second time for input that has
    # already been validated.
    if self.validation_state != ValidationState.UNKNOWN:
        return self.validation_state == ValidationState.VALID

    if self.validator:
        try:
            self.validator.validate(self.document)
        except ValidationError as error:
            if set_cursor:
                # Clamp the reported position to the bounds of the text.
                clamped = min(max(0, error.cursor_position), len(self.text))
                self.cursor_position = clamped

            self.validation_state = ValidationState.INVALID
            self.validation_error = error
            return False

    # No validator, or the validator accepted the input.
    self.validation_state = ValidationState.VALID
    self.validation_error = None
    return True

1330 

async def _validate_async(self) -> None:
    """
    Asynchronous counterpart of `validate()`.
    Unlike `validate()`, this never moves the cursor.

    Both variants exist, because handling the ENTER key has to be
    completely synchronous: otherwise stuff like type-ahead is going to
    give very weird results. (People could type input while the ENTER key
    is still processed.)

    The asynchronous variant is needed for `validate_while_typing`.
    """
    while True:
        # Stop as soon as the current input has a validation result.
        if self.validation_state != ValidationState.UNKNOWN:
            return

        validation_error = None
        validated_document = self.document

        if self.validator:
            try:
                await self.validator.validate_async(self.document)
            except ValidationError as e:
                validation_error = e

            # The input was edited while validating: start over.
            if self.document != validated_document:
                continue

        # Store the outcome.
        if validation_error:
            self.validation_state = ValidationState.INVALID
        else:
            self.validation_state = ValidationState.VALID

        self.validation_error = validation_error
        get_app().invalidate()  # Trigger redraw (display error).

1372 

def append_to_history(self) -> None:
    """
    Store the current input at the tail of the history.

    Empty input is not stored, and neither is input that equals the most
    recent history entry.
    """
    if not self.text:
        return

    history_strings = self.history.get_strings()
    if not len(history_strings) or history_strings[-1] != self.text:
        self.history.append_string(self.text)

1383 

def _search(
    self,
    search_state: SearchState,
    include_current_position: bool = False,
    count: int = 1,
) -> tuple[int, int] | None:
    """
    Run a search, repeated `count` times.

    :returns: The `(working_index, cursor_position)` tuple where the
        search ends up, or `None` when the text cannot be found.
    """
    assert count > 0

    needle = search_state.text
    direction = search_state.direction
    ignore_case = search_state.ignore_case()

    def search_once(
        working_index: int, document: Document
    ) -> tuple[int, Document] | None:
        """
        Perform one search step, starting from `document`.
        Return (working_index, document) or `None`
        """
        if direction == SearchDirection.FORWARD:
            # Look in the current input first.
            offset = document.find(
                needle,
                include_current_position=include_current_position,
                ignore_case=ignore_case,
            )
            if offset is not None:
                return (
                    working_index,
                    Document(document.text, document.cursor_position + offset),
                )

            # Not found: continue with the following working lines. (The
            # range includes len+1 in order to wrap around. All cursor
            # positions are included here, because it's a different line.)
            for i in range(working_index + 1, len(self._working_lines) + 1):
                i %= len(self._working_lines)

                candidate = Document(self._working_lines[i], 0)
                offset = candidate.find(
                    needle, include_current_position=True, ignore_case=ignore_case
                )
                if offset is not None:
                    return (i, Document(candidate.text, offset))

            return None

        # Backward search: look in the current input first.
        offset = document.find_backwards(needle, ignore_case=ignore_case)
        if offset is not None:
            return (
                working_index,
                Document(document.text, document.cursor_position + offset),
            )

        # Not found: continue with the preceding working lines. (The range
        # includes -1 in order to wrap around.)
        for i in range(working_index - 1, -2, -1):
            i %= len(self._working_lines)

            candidate = Document(
                self._working_lines[i], len(self._working_lines[i])
            )
            offset = candidate.find_backwards(needle, ignore_case=ignore_case)
            if offset is not None:
                return (
                    i,
                    Document(candidate.text, len(candidate.text) + offset),
                )

        return None

    # Chain 'count' search steps, each one starting where the previous
    # one ended.
    working_index = self.working_index
    document = self.document
    for _ in range(count):
        step = search_once(working_index, document)
        if step is None:
            return None  # Nothing found.
        working_index, document = step

    return (working_index, document.cursor_position)

1471 

def document_for_search(self, search_state: SearchState) -> Document:
    """
    Build the :class:`~prompt_toolkit.document.Document` that would be the
    result of applying this search. The
    :class:`~prompt_toolkit.layout.BufferControl` uses this to display
    feedback while searching.

    When nothing is found, the current document is returned.
    """
    search_result = self._search(search_state, include_current_position=True)

    if search_result is None:
        return self.document

    working_index, cursor_position = search_result

    # The selection is only kept when `working_index` did not change.
    selection = self.selection_state if working_index == self.working_index else None

    return Document(
        self._working_lines[working_index], cursor_position, selection=selection
    )

1496 

def get_search_position(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> int:
    """
    Return the cursor position that this search would lead to.

    (The `working_index` is left alone by this operation. When nothing is
    found, the current cursor position is returned.)
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )

    if found is None:
        return self.cursor_position

    _working_index, cursor_position = found
    return cursor_position

1517 

def apply_search(
    self,
    search_state: SearchState,
    include_current_position: bool = True,
    count: int = 1,
) -> None:
    """
    Run the search and, when it finds something, move to the result by
    setting `working_index` and `cursor_position`.
    """
    found = self._search(
        search_state, include_current_position=include_current_position, count=count
    )

    if found is None:
        return

    self.working_index, self.cursor_position = found

1536 

def exit_selection(self) -> None:
    """
    Drop the current selection state.
    """
    self.selection_state = None

1539 

def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Simple (file) tempfile implementation.

    Creates a temporary file named with `tempfile_suffix` and writes the
    current text to it, encoded as UTF-8.

    :returns: A `(filename, cleanup_func)` tuple. Calling `cleanup_func`
        removes the temporary file again.
    :raises OSError: When the file cannot be created or written. In that
        case no temporary file is left behind.
    """
    suffix = to_str(self.tempfile_suffix)
    data = self.text.encode("utf-8")
    descriptor, filename = tempfile.mkstemp(suffix)

    try:
        # Wrap the descriptor in a file object: `write()` on it writes all
        # of the data (a bare `os.write` may write only a part), and the
        # `with` block closes the descriptor even when writing fails.
        with os.fdopen(descriptor, "wb") as fh:
            fh.write(data)
    except Exception:
        # Don't leave a partially written file behind.
        os.unlink(filename)
        raise

    def cleanup() -> None:
        os.unlink(filename)

    return filename, cleanup

1555 

def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]:
    """
    Complex (directory) tempfile implementation.

    Creates a fresh temporary directory, recreates the directory part of
    `self.tempfile` inside of it and writes the current text to the file
    that `self.tempfile` names. Falls back to the simple implementation
    when `self.tempfile` is empty.

    :returns: A `(filename, cleanup_func)` tuple. Calling `cleanup_func`
        removes the whole temporary directory tree.
    :raises OSError: When the directories or the file cannot be created.
        In that case the temporary directory is removed again.
    """
    headtail = to_str(self.tempfile)
    if not headtail:
        # Revert to simple case.
        return self._editor_simple_tempfile()
    headtail = str(headtail)

    # Split into directory part and file name.
    head, tail = os.path.split(headtail)
    if os.path.isabs(head):
        # Make the directory part relative, so that it can be recreated
        # inside of the temporary directory.
        head = head[1:]

    temp_root = tempfile.mkdtemp()

    def cleanup() -> None:
        # Remove the root that `mkdtemp` created, and not only the nested
        # directory in which the file lives.
        shutil.rmtree(temp_root)

    try:
        dirpath = os.path.join(temp_root, head) if head else temp_root
        # `temp_root` exists already; `exist_ok` covers an empty `head`.
        os.makedirs(dirpath, exist_ok=True)

        # Open the filename and write current text.
        filename = os.path.join(dirpath, tail)
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(self.text)
    except Exception:
        # Nobody receives `cleanup` on failure, so clean up right here.
        cleanup()
        raise

    return filename, cleanup

1584 

def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]:
    """
    Edit the content of this buffer in an external editor.

    The text is written to a temporary file and the editor runs in a
    background task. When the editor reports success, the buffer takes
    over the content of that file.

    :param validate_and_handle: Call `validate_and_handle()` after the
        edited text has been loaded.
    :returns: The object returned by `create_background_task`.
    :raises EditReadOnlyBuffer: When the buffer is read-only.
    """
    if self.read_only():
        raise EditReadOnlyBuffer()

    # Put the current text in a temporary file.
    if self.tempfile:
        filename, cleanup_func = self._editor_complex_tempfile()
    else:
        filename, cleanup_func = self._editor_simple_tempfile()

    async def run() -> None:
        try:
            # Start the editor.
            # (`run_in_terminal` is required, because not all editors go
            # to the alternate screen buffer, and some could influence the
            # cursor position.)
            success = await run_in_terminal(
                lambda: self._open_file_in_editor(filename), in_executor=True
            )

            if success:
                # Load the edited content.
                with open(filename, "rb") as f:
                    text = f.read().decode("utf-8")

                # Strip one trailing newline. (Editors are supposed to add
                # it at the end, but we don't need it.)
                if text.endswith("\n"):
                    text = text[:-1]

                self.document = Document(text=text, cursor_position=len(text))

                # Accept the input.
                if validate_and_handle:
                    self.validate_and_handle()

        finally:
            # Remove the temp dir/file, whatever happened.
            cleanup_func()

    return get_app().create_background_task(run())

1631 

def _open_file_in_editor(self, filename: str) -> bool:
    """
    Run an editor executable on `filename`.

    The 'VISUAL' and 'EDITOR' environment variables are tried first, and
    after those a fixed list of well-known editors. The first one that can
    be started is used.

    :returns: `True` when the editor exited with a zero return code,
        `False` otherwise or when no editor could be started.
    """
    candidates = [
        os.environ.get("VISUAL"),
        os.environ.get("EDITOR"),
        # Order of preference.
        "/usr/bin/editor",
        "/usr/bin/nano",
        "/usr/bin/pico",
        "/usr/bin/vi",
        "/usr/bin/emacs",
    ]

    for command in candidates:
        if not command:
            continue

        try:
            # 'shlex.split()' is used, because $VISUAL can contain spaces
            # and quotes.
            returncode = subprocess.call(shlex.split(command) + [filename])
        except OSError:
            # Executable does not exist, try the next one.
            continue

        return returncode == 0

    return False

1667 

def start_completion(
    self,
    select_first: bool = False,
    select_last: bool = False,
    insert_common_part: bool = False,
    complete_event: CompleteEvent | None = None,
) -> None:
    """
    Schedule asynchronous autocompletion of this buffer as a background
    task.
    (This will do nothing if a previous completion was still in progress.)
    """
    # At most one of these options may be given.
    assert select_first + select_last + insert_common_part <= 1

    event = complete_event or CompleteEvent(completion_requested=True)
    completer_coroutine = self._async_completer(
        select_first=select_first,
        select_last=select_last,
        insert_common_part=insert_common_part,
        complete_event=event,
    )
    get_app().create_background_task(completer_coroutine)

1691 

def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that performs asynchronous
    autocompletion.

    (The coroutine consumes the asynchronous completer generator, which
    possibly runs the completion algorithm in another thread.)
    """

    def completion_does_nothing(document: Document, completion: Completion) -> bool:
        """
        Return `True` when applying this completion would not insert any
        new text, because the text it replaces equals the completion text.
        """
        before_cursor = document.text_before_cursor
        replaced_text = before_cursor[len(before_cursor) + completion.start_position :]
        return replaced_text == completion.text

    @_only_one_at_a_time
    async def async_completer(
        select_first: bool = False,
        select_last: bool = False,
        insert_common_part: bool = False,
        complete_event: CompleteEvent | None = None,
    ) -> None:
        document = self.document
        complete_event = complete_event or CompleteEvent(text_inserted=True)

        # Nothing to do with completions already present or no completer.
        if self.complete_state or not self.completer:
            return

        # Start out with an empty CompletionState.
        complete_state = CompletionState(original_document=self.document)
        self.complete_state = complete_state

        def proceed() -> bool:
            """Keep retrieving completions. Input text has not yet changed
            while generating completions."""
            return self.complete_state == complete_state

        refresh_needed = asyncio.Event()

        async def refresh_while_loading() -> None:
            """Background loop that fires `on_completions_changed` while
            the completions are loading, sleeping 0.3 seconds between two
            rounds. Firing the event for every single completion is too
            expensive when there are many completions. (We could tune
            `Application.max_render_postpone_time` and
            `Application.min_redraw_interval`, but having this here is a
            better approach.)
            """
            while True:
                self.on_completions_changed.fire()
                refresh_needed.clear()
                await asyncio.sleep(0.3)
                await refresh_needed.wait()

        refresh_task = asyncio.ensure_future(refresh_while_loading())
        try:
            # Collect the completions.
            async with aclosing(
                self.completer.get_completions_async(document, complete_event)
            ) as async_generator:
                async for completion in async_generator:
                    complete_state.completions.append(completion)
                    refresh_needed.set()

                    # Abort as soon as the input text changes.
                    if not proceed():
                        break
        finally:
            refresh_task.cancel()

            # One last refresh, now that loading has ended.
            self.on_completions_changed.fire()

        completions = complete_state.completions

        # A single completion that has nothing to add is ignored.
        if len(completions) == 1 and completion_does_nothing(
            document, completions[0]
        ):
            del completions[:]

        if not proceed():
            # The text changed while loading. If the last operation was
            # an insert, (not a delete), restart the completion coroutine.
            if self.document.text_before_cursor == document.text_before_cursor:
                return  # Nothing changed.

            if self.document.text_before_cursor.startswith(
                document.text_before_cursor
            ):
                raise _Retry

            return

        # The text was not changed: the completions can be applied.

        # Don't do anything when the completion state is gone, or when the
        # user already selected a completion by using the arrow keys.
        if not self.complete_state or self.complete_state.complete_index is not None:
            return

        # Without completions, reset the completion state anyway.
        if not completions:
            self.complete_state = None
            # Render the ui if the completion menu was shown
            # it is needed especially if there is one completion and it was deleted.
            self.on_completions_changed.fire()
            return

        # Select first/last or insert the common part, depending on the key
        # binding. (This has to wait until all completions are loaded.)
        if select_first:
            self.go_to_completion(0)

        elif select_last:
            self.go_to_completion(len(completions) - 1)

        elif insert_common_part:
            common_part = get_common_complete_suffix(document, completions)
            if common_part:
                # Insert the common part, update completions.
                self.insert_text(common_part)
                if len(completions) > 1:
                    # (Don't call `async_completer` again, but
                    # recalculate completions. See:
                    # https://github.com/ipython/ipython/issues/9658)
                    completions[:] = [
                        c.new_completion_from_position(len(common_part))
                        for c in completions
                    ]

                    self._set_completions(completions=completions)
                else:
                    self.complete_state = None
            else:
                # Asked for the "common" prefix, but there is no common
                # suffix: with exactly one match, select that match. (It
                # could be that we have a completion which does *
                # expansion, like '*.py', with exactly one match.)
                if len(completions) == 1:
                    self.go_to_completion(0)

    return async_completer

1846 

def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function that computes an auto suggestion.

    The returned callable is wrapped with `_only_one_at_a_time`. When it
    runs, it snapshots the current document, asks ``self.auto_suggest`` for
    a suggestion for that snapshot and stores the result in
    ``self.suggestion`` (firing ``on_suggestion_set``), but only when the
    document is still equal to the snapshot. Otherwise it raises `_Retry`.
    (The original author notes that this can be in another thread.)
    """

    async def async_suggestor() -> None:
        snapshot = self.document

        # Nothing to do when a suggestion is already present, or when no
        # auto-suggest object is configured.
        if self.suggestion or not self.auto_suggest:
            return

        proposed = await self.auto_suggest.get_suggestion_async(self, snapshot)

        # The document changed while we were waiting: the result is stale,
        # so request another run instead of publishing it.
        if not (self.document == snapshot):
            raise _Retry

        # Publish the suggestion and notify listeners.
        self.suggestion = proposed
        self.on_suggestion_set.fire()

    return _only_one_at_a_time(async_suggestor)

1873 

def _create_auto_validate_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]:
    """
    Build the coroutine function used for validation while typing.

    The returned callable awaits ``self._validate_async()`` and is wrapped
    with `_only_one_at_a_time`.
    (The original author notes that this can be in another thread.)
    """

    async def async_validator() -> None:
        await self._validate_async()

    return _only_one_at_a_time(async_validator)

1887 

def validate_and_handle(self) -> None:
    """
    Validate the buffer and, on success, run the accept action.

    When ``self.validate(set_cursor=True)`` reports failure, nothing else
    happens. On success the accept handler (if any) is called with this
    buffer, the input is appended to the history, and the buffer is reset
    unless the handler returned a truthy "keep text" value.
    """
    # Invalid input: do not accept.
    if not self.validate(set_cursor=True):
        return

    # Without a handler the text is never kept.
    keep_text = self.accept_handler(self) if self.accept_handler else False

    self.append_to_history()

    if not keep_text:
        self.reset()

1905 

1906 

# Type of the coroutine functions accepted (and returned) by
# `_only_one_at_a_time`.
_T = TypeVar("_T", bound=Callable[..., Awaitable[None]])


def _only_one_at_a_time(coroutine: _T) -> _T:
    """
    Decorator that starts the coroutine only if the previous call has
    finished. (Used to make sure that we have only one autocompleter, auto
    suggestor and validator running at a time.)

    A call made while an earlier call is still in progress returns
    immediately without running anything. When the coroutine raises
    `_Retry`, it is restarted with the same arguments.
    """
    busy = False

    @wraps(coroutine)
    async def guarded(*a: Any, **kw: Any) -> Any:
        nonlocal busy

        # An earlier invocation has not finished yet: skip this one.
        if busy:
            return

        busy = True

        try:
            finished = False
            while not finished:
                try:
                    await coroutine(*a, **kw)
                except _Retry:
                    # The coroutine asked to be run again.
                    pass
                else:
                    finished = True
        finally:
            # Always release the flag, also when the coroutine fails.
            busy = False

        return None

    return cast(_T, guarded)

1942 

1943 

class _Retry(Exception):
    """
    Retry in `_only_one_at_a_time`.

    Raised by a wrapped coroutine to request that it is run again.
    """

1946 

1947 

def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Indent text of a :class:`.Buffer` object.

    The rows in ``range(from_row, to_row)`` are handed to
    ``buffer.transform_lines`` and get ``count`` indentation units
    prepended. The buffer's document is then replaced, with the cursor on
    the row it was on before.
    """
    cursor_row = buffer.document.cursor_position_row

    # NOTE(review): the indentation unit is a single space as it appears in
    # this listing; the listing has lost its leading whitespace, so confirm
    # the intended unit against the upstream file.
    prefix = " " * count

    def add_prefix(line: str) -> str:
        return prefix + line

    # Apply transformation.
    indented = buffer.transform_lines(range(from_row, to_row), add_prefix)

    # Put the cursor at column 0 of the row it was on before the change.
    row_start = Document(indented).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(indented, row_start)

    # Then shift it by the start-of-line offset (after whitespace).
    buffer.cursor_position += buffer.document.get_start_of_line_position(
        after_whitespace=True
    )

1965 

1966 

def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None:
    """
    Unindent text of a :class:`.Buffer` object.

    The rows in ``range(from_row, to_row)`` are handed to
    ``buffer.transform_lines``. A row that starts with ``count``
    indentation units loses exactly that prefix; any other row has all of
    its leading whitespace stripped. The buffer's document is then
    replaced, with the cursor on the row it was on before.
    """
    cursor_row = buffer.document.cursor_position_row

    # NOTE(review): the indentation unit is a single space as it appears in
    # this listing; the listing has lost its leading whitespace, so confirm
    # the intended unit against the upstream file.
    prefix = " " * count

    def strip_prefix(text: str) -> str:
        # Remove the full prefix when present, otherwise whatever leading
        # whitespace there is.
        return text[len(prefix) :] if text.startswith(prefix) else text.lstrip()

    # Apply transformation.
    unindented = buffer.transform_lines(range(from_row, to_row), strip_prefix)

    # Put the cursor at column 0 of the row it was on before the change.
    row_start = Document(unindented).translate_row_col_to_index(cursor_row, 0)
    buffer.document = Document(unindented, row_start)

    # Then shift it by the start-of-line offset (after whitespace).
    buffer.cursor_position += buffer.document.get_start_of_line_position(
        after_whitespace=True
    )

1991 

1992 

def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None:
    """
    Reformat text, taking the width into account.
    `to_row` is included.
    (Vi 'gq' operator.)

    All words of the selected rows are re-flowed. Every produced line
    starts with the leading whitespace of the first selected row and is
    filled up to ``buffer.text_width`` (80 when that is falsy) minus the
    length of that leading whitespace. The buffer's document is replaced
    and the cursor is placed directly after the reshaped text. Nothing
    happens when the row range selects no lines.
    """
    all_lines = buffer.text.splitlines(True)
    selected = all_lines[from_row : to_row + 1]

    if not selected:
        return

    head = "".join(all_lines[:from_row])
    tail = "".join(all_lines[to_row + 1 :])

    # Take indentation from the first line (without its line ending).
    first_line = selected[0]
    found = re.search(r"^\s*", first_line)
    margin_len = found.end() if found else 0  # `found` can't be None, actually.
    margin = first_line[:margin_len].replace("\n", "")

    # Now, take all the 'words' from the lines to be reshaped.
    words = "".join(selected).split()

    # And reshape: collect the words row by row.
    width = (buffer.text_width or 80) - len(margin)
    rows: List[List[str]] = [[]]
    used = 0  # Width used on the current row, separators included.
    for word in words:
        if used:
            if used + len(word) + 1 > width:
                # Word does not fit anymore: start a new row.
                rows.append([])
                used = 0
            else:
                # Account for the separating space.
                used += 1

        rows[-1].append(word)
        used += len(word)

    reshaped = "\n".join(margin + " ".join(row) for row in rows) + "\n"

    # Apply result.
    buffer.document = Document(
        text=head + reshaped + tail,
        cursor_position=len(head + reshaped),
    )