Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/layout/processors.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

381 statements  

1""" 

2Processors are little transformation blocks that transform the fragments list 

3from a buffer before the BufferControl will render it to the screen. 

4 

5They can insert fragments before or after, or highlight fragments by replacing the 

6fragment types. 

7""" 

8 

9from __future__ import annotations 

10 

11import re 

12from abc import ABCMeta, abstractmethod 

13from typing import TYPE_CHECKING, Callable, Hashable, cast 

14 

15from prompt_toolkit.application.current import get_app 

16from prompt_toolkit.cache import SimpleCache 

17from prompt_toolkit.document import Document 

18from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode 

19from prompt_toolkit.formatted_text import ( 

20 AnyFormattedText, 

21 StyleAndTextTuples, 

22 to_formatted_text, 

23) 

24from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text 

25from prompt_toolkit.search import SearchDirection 

26from prompt_toolkit.utils import to_int, to_str 

27 

28from .utils import explode_text_fragments 

29 

30if TYPE_CHECKING: 

31 from .controls import BufferControl, UIContent 

32 

# Names exported by this module when a caller uses `from ... import *`.
__all__ = [
    "Processor",
    "TransformationInput",
    "Transformation",
    "DummyProcessor",
    "HighlightSearchProcessor",
    "HighlightIncrementalSearchProcessor",
    "HighlightSelectionProcessor",
    "PasswordProcessor",
    "HighlightMatchingBracketProcessor",
    "DisplayMultipleCursors",
    "BeforeInput",
    "ShowArg",
    "AfterInput",
    "AppendAutoSuggestion",
    "ConditionalProcessor",
    "ShowLeadingWhiteSpaceProcessor",
    "ShowTrailingWhiteSpaceProcessor",
    "TabsProcessor",
    "ReverseSearchProcessor",
    "DynamicProcessor",
    "merge_processors",
]

56 

57 

class Processor(metaclass=ABCMeta):
    """
    Abstract base for objects that rewrite the fragments of one line of a
    :class:`~prompt_toolkit.layout.controls.BufferControl`.
    """

    @abstractmethod
    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Transform the fragments of a single line.

        Subclasses have to override this method. The body below passes the
        incoming fragments through untouched.

        :param transformation_input: :class:`.TransformationInput` object.
        :returns: A :class:`.Transformation` instance.
        """
        untouched_fragments = transformation_input.fragments
        return Transformation(untouched_fragments)

74 

75 

# Callable that maps a position in the source string to the corresponding
# position in the (transformed) fragments.
SourceToDisplay = Callable[[int], int]
# Callable that maps a position in the transformed fragments back to the
# corresponding position in the source string.
DisplayToSource = Callable[[int], int]

78 

79 

class TransformationInput:
    """
    Bundle of values handed to :meth:`.Processor.apply_transformation`.

    :param buffer_control: :class:`.BufferControl` instance.
    :param document: :class:`~prompt_toolkit.document.Document` instance.
    :param lineno: The number of the line to which we apply the processor.
    :param source_to_display: A function that returns the position in the
        `fragments` for any position in the source string. (This takes
        previous processors into account.)
    :param fragments: List of fragments that we can transform. (Received from
        the previous processor.)
    :param width: Integer width value (presumably the available rendering
        width -- confirm against the caller).
    :param height: Integer height value (presumably the available rendering
        height -- confirm against the caller).
    :param get_line: Optional; a callable that returns the fragments of
        another line in the current buffer. This can be used to create
        processors capable of affecting transforms across multiple lines.
    """

    def __init__(
        self,
        buffer_control: BufferControl,
        document: Document,
        lineno: int,
        source_to_display: SourceToDisplay,
        fragments: StyleAndTextTuples,
        width: int,
        height: int,
        get_line: Callable[[int], StyleAndTextTuples] | None = None,
    ) -> None:
        # Where the line comes from.
        self.buffer_control = buffer_control
        self.document = document
        self.lineno = lineno

        # The line itself, plus the position mapping accumulated so far.
        self.fragments = fragments
        self.source_to_display = source_to_display

        # Size information and optional access to other lines.
        self.width = width
        self.height = height
        self.get_line = get_line

    def unpack(
        self,
    ) -> tuple[
        BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
    ]:
        """
        Return the first seven constructor arguments as a tuple, in
        constructor order. `get_line` is not part of the tuple.
        """
        control, doc = self.buffer_control, self.document
        mapping, line_fragments = self.source_to_display, self.fragments
        return (
            control,
            doc,
            self.lineno,
            mapping,
            line_fragments,
            self.width,
            self.height,
        )

128 

129 

class Transformation:
    """
    Transformation result, as returned by :meth:`.Processor.apply_transformation`.

    Important: Always make sure that the length of `document.text` is equal to
               the length of all the text in `fragments`!

    :param fragments: The transformed fragments. To be displayed, or to pass to
        the next processor.
    :param source_to_display: Cursor position transformation from the original
        string to the transformed string. Defaults to the identity mapping.
    :param display_to_source: Cursor position transformation from the
        transformed string back to the original string. Defaults to the
        identity mapping.
    """

    def __init__(
        self,
        fragments: StyleAndTextTuples,
        source_to_display: SourceToDisplay | None = None,
        display_to_source: DisplayToSource | None = None,
    ) -> None:
        def unchanged(position: int) -> int:
            "Identity mapping, used when no mapping was given."
            return position

        self.fragments = fragments
        self.source_to_display = source_to_display or unchanged
        self.display_to_source = display_to_source or unchanged

154 

155 

class DummyProcessor(Processor):
    """
    A no-op `Processor`: the fragments it receives are returned as they are.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        passthrough = transformation_input.fragments
        return Transformation(passthrough)

165 

166 

class HighlightSearchProcessor(Processor):
    """
    Processor that highlights search matches in the document.
    Note that this doesn't support multiline search matches yet.

    The style classes 'search' and 'search.current' will be applied to the
    content.
    """

    _classname = "search"
    _classname_current = "search.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text to look for (taken from the control's search state).
        """
        return buffer_control.search_state.text

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append the search style class to every character of every match of the
        search text on this line. The match under the cursor receives the
        'current' style class instead.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        needle = self._get_search_text(buffer_control)
        match_style = f" class:{self._classname} "
        current_match_style = f" class:{self._classname_current} "

        # Nothing to search for, or the application is finishing: leave the
        # fragments alone.
        if not needle or get_app().is_done:
            return Transformation(fragments)

        # Work on one-character fragments, so that a match position can be
        # used as an index into the fragment list.
        line_text = fragment_list_to_text(fragments)
        fragments = explode_text_fragments(fragments)

        flags = (
            re.IGNORECASE
            if buffer_control.search_state.ignore_case()
            else re.RegexFlag(0)
        )

        # Display column of the cursor, when the cursor is on this line.
        cursor_column: int | None = (
            source_to_display(document.cursor_position_col)
            if document.cursor_position_row == lineno
            else None
        )

        for match in re.finditer(re.escape(needle), line_text, flags=flags):
            start, end = match.span()
            on_cursor = cursor_column is not None and start <= cursor_column < end
            suffix = current_match_style if on_cursor else match_style

            for index in range(start, end):
                previous_style, char, *_ = fragments[index]
                fragments[index] = (previous_style + suffix, char)

        return Transformation(fragments)

239 

240 

class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
    """
    Highlight the search terms that are used for highlighting the incremental
    search. The style class 'incsearch' will be applied to the content.

    Important: this requires the `preview_search=True` flag to be set for the
    `BufferControl`. Otherwise, the cursor position won't be set to the search
    match while searching, and nothing happens.
    """

    _classname = "incsearch"
    _classname_current = "incsearch.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text of the control's search buffer, or an empty string
        when there is no search buffer or it contains no text.
        """
        search_buffer = buffer_control.search_buffer
        if search_buffer is None:
            return ""
        return search_buffer.text or ""

263 

264 

class HighlightSelectionProcessor(Processor):
    """
    Processor that applies the 'selected' style class to the selected part of
    the document.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        selected_style = " class:selected "

        # Part of this line that is covered by the selection, if any.
        selection_range = document.selection_range_at_line(lineno)
        if not selection_range:
            return Transformation(fragments)

        source_start, source_end = selection_range
        start = source_to_display(source_start)
        end = source_to_display(source_end)

        fragments = explode_text_fragments(fragments)

        if start == 0 and end == 0 and not fragments:
            # The line is empty: show a single selected space, so that the
            # selection is visible.
            return Transformation([(selected_style, " ")])

        for column in range(start, end):
            if column < len(fragments):
                previous_style, char, *_ = fragments[column]
                fragments[column] = (previous_style + selected_style, char)
            elif column == len(fragments):
                # The selection extends one position past the text.
                fragments.append((selected_style, " "))

        return Transformation(fragments)

308 

309 

class PasswordProcessor(Processor):
    """
    Processor that masks the input. (For passwords.)

    :param char: (string) Character that replaces every character of the
        input. "*" by default.
    """

    def __init__(self, char: str = "*") -> None:
        self.char = char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        masked = []
        for style, text, *handler in ti.fragments:
            # Same style and optional trailing items, but with masked text of
            # the same length.
            masked.append((style, self.char * len(text), *handler))

        return Transformation(cast(StyleAndTextTuples, masked))

330 

331 

class HighlightMatchingBracketProcessor(Processor):
    """
    When the cursor is on or right after a bracket, it highlights the matching
    bracket.

    :param chars: The characters that are treated as brackets.
    :param max_cursor_distance: Only highlight matching brackets when the
        cursor is within this distance. (From inside a `Processor`, we can't
        know which lines will be visible on the screen. But we also don't want
        to scan the whole document for matching brackets on each key press, so
        we limit to this value.)
    """

    _closing_braces = "])}>"

    def __init__(
        self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
    ) -> None:
        self.chars = chars
        self.max_cursor_distance = max_cursor_distance

        # Cache of highlight positions, keyed on render counter, document text
        # and cursor position (see `apply_transformation`).
        self._positions_cache: SimpleCache[Hashable, list[tuple[int, int]]] = (
            SimpleCache(maxsize=8)
        )

    def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
        """
        Return a list of (row, col) tuples that need to be highlighted.

        The list is empty when no matching bracket was found; otherwise it
        holds the position of the matching bracket followed by the position of
        the bracket the search started from.
        """
        pos: int | None

        # Try for the character under the cursor.
        if document.current_char and document.current_char in self.chars:
            pos = document.find_matching_bracket_position(
                start_pos=document.cursor_position - self.max_cursor_distance,
                end_pos=document.cursor_position + self.max_cursor_distance,
            )

        # Try for the character before the cursor.
        elif (
            document.char_before_cursor
            and document.char_before_cursor in self._closing_braces
            and document.char_before_cursor in self.chars
        ):
            # Search from a document whose cursor is on that closing bracket.
            document = Document(document.text, document.cursor_position - 1)

            pos = document.find_matching_bracket_position(
                start_pos=document.cursor_position - self.max_cursor_distance,
                end_pos=document.cursor_position + self.max_cursor_distance,
            )
        else:
            pos = None

        # A falsy `pos` (None or 0) is treated as "no matching bracket".
        if pos:
            pos += document.cursor_position  # pos is relative.
            row, col = document.translate_index_to_position(pos)
            return [
                (row, col),
                (document.cursor_position_row, document.cursor_position_col),
            ]
        else:
            return []

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append a 'matching-bracket' style class to the brackets on this line
        that have to be highlighted.

        The bracket located exactly at the cursor gets
        `class:matching-bracket.cursor`; every other highlighted bracket gets
        `class:matching-bracket.other`.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        # When the application is in the 'done' state, don't highlight.
        if get_app().is_done:
            return Transformation(fragments)

        # Get the highlight positions.
        key = (get_app().render_counter, document.text, document.cursor_position)
        positions = self._positions_cache.get(
            key, lambda: self._get_positions_to_highlight(document)
        )

        # Apply if positions were found at this line.
        if positions:
            cursor_position = (
                document.cursor_position_row,
                document.cursor_position_col,
            )

            for row, col in positions:
                if row == lineno:
                    display_col = source_to_display(col)
                    fragments = explode_text_fragments(fragments)
                    style, text, *_ = fragments[display_col]

                    # Compare in source coordinates and include the row. The
                    # display column can be shifted by earlier processors, and
                    # a bracket on another row may share the cursor's column.
                    if (row, col) == cursor_position:
                        style += " class:matching-bracket.cursor "
                    else:
                        style += " class:matching-bracket.other "

                    fragments[display_col] = (style, text)

        return Transformation(fragments)

434 

435 

class DisplayMultipleCursors(Processor):
    """
    Show every cursor of the buffer while the `vi_insert_multiple_mode`
    filter is active (Vi block insert mode).
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        buff = buffer_control.buffer

        # Outside of multiple-cursor mode there is nothing to draw.
        if not vi_insert_multiple_mode():
            return Transformation(fragments)

        all_cursors = buff.multiple_cursor_positions
        fragments = explode_text_fragments(fragments)

        # Index range of the current line within the whole document.
        line_start = document.translate_row_col_to_index(lineno, 0)
        line_end = line_start + len(document.lines[lineno])

        cursor_style = " class:multiple-cursors"

        for cursor in all_cursors:
            if not (line_start <= cursor <= line_end):
                continue

            column = source_to_display(cursor - line_start)

            try:
                style, text, *_ = fragments[column]
            except IndexError:
                # The cursor sits after the text: add a space to draw it on.
                fragments.append((cursor_style, " "))
            else:
                fragments[column] = (style + cursor_style, text)

        return Transformation(fragments)

483 

484 

class BeforeInput(Processor):
    """
    Insert text before the input. Only the first line (line 0) is changed.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to this prompt/prefix.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        # All lines except the first one are passed through unchanged.
        if ti.lineno != 0:
            return Transformation(
                ti.fragments,
                source_to_display=None,
                display_to_source=None,
            )

        prefix = to_formatted_text(self.text, self.style)
        combined = prefix + ti.fragments

        # Every position moves to the right by the length of the prefix.
        offset = fragment_list_len(prefix)

        def source_to_display(i: int) -> int:
            return i + offset

        def display_to_source(i: int) -> int:
            return i - offset

        return Transformation(
            combined,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

    def __repr__(self) -> str:
        return f"BeforeInput({self.text!r}, {self.style!r})"

523 

524 

class ShowArg(BeforeInput):
    """
    Display the 'arg' in front of the input.

    This was used by the `PromptSession`, but now it uses the
    `Window.get_line_prefix` function instead.
    """

    def __init__(self) -> None:
        # The prefix text is computed on demand by `_get_text_fragments`.
        super().__init__(self._get_text_fragments)

    def _get_text_fragments(self) -> StyleAndTextTuples:
        """
        Return the "(arg: N) " fragments, or an empty list when the key
        processor of the current application has no 'arg'.
        """
        arg = get_app().key_processor.arg
        if arg is None:
            return []

        return [
            ("class:prompt.arg", "(arg: "),
            ("class:prompt.arg.text", str(arg)),
            ("class:prompt.arg", ") "),
        ]

    def __repr__(self) -> str:
        return "ShowArg()"

551 

552 

class AfterInput(Processor):
    """
    Insert text after the input. Only the last line of the document is
    changed.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to this prompt/prefix.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        on_last_line = ti.lineno == ti.document.line_count - 1
        if not on_last_line:
            return Transformation(fragments=ti.fragments)

        # Append the formatted text to the fragments of the last line.
        suffix = to_formatted_text(self.text, self.style)
        return Transformation(fragments=ti.fragments + suffix)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return f"{class_name}({self.text!r}, style={self.style!r})"

577 

578 

class AppendAutoSuggestion(Processor):
    """
    Append the auto suggestion to the input.
    (The user can then press the right arrow to insert the suggestion.)

    :param style: style to be applied to the suggestion text.
    """

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        # Only the last line of the document gets the suggestion appended.
        if ti.lineno != ti.document.line_count - 1:
            return Transformation(fragments=ti.fragments)

        buffer = ti.buffer_control.buffer

        # Show the suggestion only while the cursor is at the end of the
        # document; otherwise an empty fragment is appended.
        suggestion = ""
        if buffer.suggestion and ti.document.is_cursor_at_the_end:
            suggestion = buffer.suggestion.text

        return Transformation(fragments=ti.fragments + [(self.style, suggestion)])

601 

602 

class ShowLeadingWhiteSpaceProcessor(Processor):
    """
    Make leading whitespace visible.

    :param get_char: Callable that returns one character.
    :param style: style to be applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        style: str = "class:leading-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Fall back to a plain dot when the output encoding turns the
            # middle dot into "?".
            encoding = get_app().output.encoding()
            middle_dot_usable = "\xb7".encode(encoding, "replace") != b"?"
            return "\xb7" if middle_dot_usable else "."

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        fragments = ti.fragments

        # Lines that don't start with a space are returned as they are.
        if not fragments or not fragment_list_to_text(fragments).startswith(" "):
            return Transformation(fragments)

        replacement = (self.style, self.get_char())
        fragments = explode_text_fragments(fragments)

        # Replace spaces from the start of the line, until the first
        # fragment that is not a space.
        for index in range(len(fragments)):
            if fragments[index][1] != " ":
                break
            fragments[index] = replacement

        return Transformation(fragments)

639 

640 

class ShowTrailingWhiteSpaceProcessor(Processor):
    """
    Make trailing whitespace visible.

    :param get_char: Callable that returns one character.
    :param style: style to be applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        # The default used to be the misspelled "class:training-whitespace"
        # only. The correctly spelled class is now applied as well; the
        # misspelled one is kept so that style rules written against it keep
        # working.
        style: str = "class:trailing-whitespace class:training-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Fall back to a plain dot when the output encoding turns the
            # middle dot into "?".
            if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
                return "."
            else:
                return "\xb7"

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace every space at the end of the line by the character returned
        from `get_char`, using `style`.
        """
        fragments = ti.fragments

        if fragments and fragments[-1][1].endswith(" "):
            t = (self.style, self.get_char())
            fragments = explode_text_fragments(fragments)

            # Walk backwards through all the fragments and replace whitespace.
            for i in range(len(fragments) - 1, -1, -1):
                char = fragments[i][1]
                if char == " ":
                    fragments[i] = t
                else:
                    break

        return Transformation(fragments)

678 

679 

class TabsProcessor(Processor):
    """
    Render tabs as spaces (instead of ^I) or make them visible (for instance,
    by replacing them with dots.)

    :param tabstop: Horizontal space taken by a tab. (`int` or callable that
        returns an `int`).
    :param char1: Character or callable that returns a character (text of
        length one). This one is used for the first space taken by the tab.
    :param char2: Like `char1`, but for the rest of the space.
    :param style: style to be applied to the characters that replace a tab.
    """

    def __init__(
        self,
        tabstop: int | Callable[[], int] = 4,
        char1: str | Callable[[], str] = "|",
        char2: str | Callable[[], str] = "\u2508",
        style: str = "class:tab",
    ) -> None:
        self.char1 = char1
        self.char2 = char2
        self.tabstop = tabstop
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Expand every tab up to the next tab stop and return the position
        mappings between the source line and the expanded line.
        """
        tabstop = to_int(self.tabstop)
        style = self.style

        # Create separator for tabs.
        separator1 = to_str(self.char1)
        separator2 = to_str(self.char2)

        # Transform fragments.
        fragments = explode_text_fragments(ti.fragments)

        # Maps the index of each source character to its display position.
        position_mappings = {}
        result_fragments: StyleAndTextTuples = []
        pos = 0

        for i, fragment_and_text in enumerate(fragments):
            position_mappings[i] = pos

            if fragment_and_text[1] == "\t":
                # Number of cells up to the next tab stop. `pos % tabstop` is
                # never equal to `tabstop`, so this is never zero; for a
                # positive tabstop it lies in `1..tabstop`.
                count = tabstop - (pos % tabstop)

                # Insert tab.
                result_fragments.append((style, separator1))
                result_fragments.append((style, separator2 * (count - 1)))
                pos += count
            else:
                result_fragments.append(fragment_and_text)
                pos += 1

        position_mappings[len(fragments)] = pos
        # Add `pos+1` to mapping, because the cursor can be right after the
        # line as well.
        position_mappings[len(fragments) + 1] = pos + 1

        def source_to_display(from_position: int) -> int:
            "Maps original cursor position to the new one."
            return position_mappings[from_position]

        def display_to_source(display_pos: int) -> int:
            "Maps display cursor position to the original one."
            position_mappings_reversed = {v: k for k, v in position_mappings.items()}

            # A display position inside an expanded tab has no entry of its
            # own: walk left to the nearest mapped position.
            while display_pos >= 0:
                try:
                    return position_mappings_reversed[display_pos]
                except KeyError:
                    display_pos -= 1
            return 0

        return Transformation(
            result_fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

761 

762 

class ReverseSearchProcessor(Processor):
    """
    Process to display the "(reverse-i-search)`...`:..." stuff around
    the search buffer.

    Note: This processor is meant to be applied to the BufferControl that
    contains the search buffer, it's not meant for the original input.
    """

    # Processor types of the main control that are left out when the main
    # buffer is rendered as part of the search line.
    _excluded_input_processors: list[type[Processor]] = [
        HighlightSearchProcessor,
        HighlightSelectionProcessor,
        BeforeInput,
        AfterInput,
    ]

    def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
        """
        Return the control that is the current search target, provided it is
        a `BufferControl` whose search buffer control equals `buffer_control`.
        Return None otherwise.
        """
        from prompt_toolkit.layout.controls import BufferControl

        target = get_app().layout.search_target_buffer_control
        if not isinstance(target, BufferControl):
            return None
        if target.search_buffer_control == buffer_control:
            return target
        return None

    def _content(
        self, main_control: BufferControl, ti: TransformationInput
    ) -> UIContent:
        """
        Render the buffer of `main_control` through a temporary
        `BufferControl` and return the resulting content.
        """
        from prompt_toolkit.layout.controls import BufferControl

        # Emulate the BufferControl through which we are searching.
        # For this we filter out some of the input processors.
        excluded_types = tuple(self._excluded_input_processors)

        def filter_processor(item: Processor) -> Processor | None:
            """Return `item` without the excluded processors, or None when
            `item` itself has to be dropped."""
            # A `_MergedProcessor` is filtered member by member, recursively.
            if isinstance(item, _MergedProcessor):
                kept = [
                    member
                    for member in map(filter_processor, item.processors)
                    if member is not None
                ]
                return merge_processors(kept)

            # A `ConditionalProcessor` is kept when its body is kept.
            if isinstance(item, ConditionalProcessor):
                body = filter_processor(item.processor)
                return ConditionalProcessor(body, item.filter) if body else None

            # Anything else is checked against the excluded types directly.
            return None if isinstance(item, excluded_types) else item

        filtered_processor = filter_processor(
            merge_processors(main_control.input_processors or [])
        )
        highlight_processor = HighlightIncrementalSearchProcessor()

        new_processors = [highlight_processor]
        if filtered_processor:
            new_processors.insert(0, filtered_processor)

        from .controls import SearchBufferControl

        assert isinstance(ti.buffer_control, SearchBufferControl)

        emulated_control = BufferControl(
            buffer=main_control.buffer,
            input_processors=new_processors,
            include_default_input_processors=False,
            lexer=main_control.lexer,
            preview_search=True,
            search_buffer_control=ti.buffer_control,
        )

        return emulated_control.create_content(ti.width, ti.height, preview_search=True)

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        from .controls import SearchBufferControl

        assert isinstance(ti.buffer_control, SearchBufferControl), (
            "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."
        )

        main_control = self._get_main_buffer(ti.buffer_control)

        # Only the first line is decorated, and only when the main control
        # was found.
        if ti.lineno != 0 or not main_control:
            return Transformation(
                ti.fragments,
                source_to_display=None,
                display_to_source=None,
            )

        content = self._content(main_control, ti)

        # Get the line from the original document for this search.
        line_fragments = content.get_line(content.cursor_position.y)

        searching_forward = main_control.search_state.direction == SearchDirection.FORWARD
        direction_text = "i-search" if searching_forward else "reverse-i-search"

        fragments_before: StyleAndTextTuples = [
            ("class:prompt.search", "("),
            ("class:prompt.search", direction_text),
            ("class:prompt.search", ")`"),
        ]
        search_text_part: StyleAndTextTuples = [
            ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
            ("", "': "),
        ]
        fragments = fragments_before + search_text_part + line_fragments

        # Positions move to the right by the length of the leading part.
        shift = fragment_list_len(fragments_before)

        def source_to_display(i: int) -> int:
            return i + shift

        def display_to_source(i: int) -> int:
            return i - shift

        return Transformation(
            fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

898 

899 

class ConditionalProcessor(Processor):
    """
    Processor that applies another processor, according to a certain condition.
    Example::

        # Create a function that returns whether or not the processor should
        # currently be applied.
        def highlight_enabled():
            return true_or_false

        # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
        BufferControl(input_processors=[
            ConditionalProcessor(HighlightSearchProcessor(),
                                 Condition(highlight_enabled))])

    :param processor: :class:`.Processor` instance.
    :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
    """

    def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
        self.processor = processor
        self.filter = to_filter(filter)

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        # When the filter is off, the fragments pass through unchanged.
        if not self.filter():
            return Transformation(transformation_input.fragments)

        return self.processor.apply_transformation(transformation_input)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return f"{class_name}(processor={self.processor!r}, filter={self.filter!r})"

934 

935 

class DynamicProcessor(Processor):
    """
    Processor class that dynamically returns any Processor.

    :param get_processor: Callable that returns a :class:`.Processor`
        instance, or None.
    """

    def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
        self.get_processor = get_processor

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        # Ask for the processor on every call; use a no-op processor when
        # nothing (truthy) is returned.
        chosen = self.get_processor()
        if not chosen:
            chosen = DummyProcessor()
        return chosen.apply_transformation(ti)

949 

950 

def merge_processors(processors: list[Processor]) -> Processor:
    """
    Merge multiple `Processor` objects into one.

    An empty list gives a `DummyProcessor`, a list with one item gives that
    item itself, and a longer list is wrapped in a `_MergedProcessor`.
    """
    count = len(processors)

    if count == 1:
        # Nothing to merge.
        return processors[0]

    if count == 0:
        return DummyProcessor()

    return _MergedProcessor(processors)

962 

963 

class _MergedProcessor(Processor):
    """
    Processor that groups multiple other `Processor` objects, but exposes an
    API as if it is one `Processor`.
    """

    def __init__(self, processors: list[Processor]):
        self.processors = processors

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Apply all processors one after another, each on the fragments produced
        by the previous one, and return the combined result.
        """
        # The forward chain starts with the mapping we received, so that each
        # processor sees a `source_to_display` that includes everything
        # before it.
        forward_chain = [ti.source_to_display]
        backward_chain = []
        fragments = ti.fragments

        def source_to_display(i: int) -> int:
            """Translate x position from the buffer to the x position in the
            processor fragments list."""
            position = i
            for step in forward_chain:
                position = step(position)
            return position

        def display_to_source(i: int) -> int:
            """Translate an x position in the processor fragments list back,
            by applying the collected mappings in reverse order."""
            position = i
            for step in reversed(backward_chain):
                position = step(position)
            return position

        for processor in self.processors:
            step_input = TransformationInput(
                ti.buffer_control,
                ti.document,
                ti.lineno,
                source_to_display,
                fragments,
                ti.width,
                ti.height,
                ti.get_line,
            )
            transformation = processor.apply_transformation(step_input)

            fragments = transformation.fragments
            backward_chain.append(transformation.display_to_source)
            forward_chain.append(transformation.source_to_display)

        # In the case of a nested _MergedProcessor, each processor wants to
        # receive a 'source_to_display' function (as part of the
        # TransformationInput) that has everything in the chain before
        # included, because it can be called as part of the
        # `apply_transformation` function. However, this first
        # `source_to_display` should not be part of the output that we are
        # returning. (This is the most consistent with `display_to_source`.)
        forward_chain.pop(0)

        return Transformation(fragments, source_to_display, display_to_source)