Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/layout/processors.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

382 statements  

1""" 

2Processors are little transformation blocks that transform the fragments list 

3from a buffer before the BufferControl will render it to the screen. 

4 

5They can insert fragments before or after, or highlight fragments by replacing the 

6fragment types. 

7""" 

8 

9from __future__ import annotations 

10 

11import re 

12from abc import ABCMeta, abstractmethod 

13from collections.abc import Callable, Hashable 

14from typing import TYPE_CHECKING, cast 

15 

16from prompt_toolkit.application.current import get_app 

17from prompt_toolkit.cache import SimpleCache 

18from prompt_toolkit.document import Document 

19from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode 

20from prompt_toolkit.formatted_text import ( 

21 AnyFormattedText, 

22 StyleAndTextTuples, 

23 to_formatted_text, 

24) 

25from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text 

26from prompt_toolkit.search import SearchDirection 

27from prompt_toolkit.utils import to_int, to_str 

28 

29from .utils import explode_text_fragments 

30 

31if TYPE_CHECKING: 

32 from .controls import BufferControl, UIContent 

33 

34__all__ = [ 

35 "Processor", 

36 "TransformationInput", 

37 "Transformation", 

38 "DummyProcessor", 

39 "HighlightSearchProcessor", 

40 "HighlightIncrementalSearchProcessor", 

41 "HighlightSelectionProcessor", 

42 "PasswordProcessor", 

43 "HighlightMatchingBracketProcessor", 

44 "DisplayMultipleCursors", 

45 "BeforeInput", 

46 "ShowArg", 

47 "AfterInput", 

48 "AppendAutoSuggestion", 

49 "ConditionalProcessor", 

50 "ShowLeadingWhiteSpaceProcessor", 

51 "ShowTrailingWhiteSpaceProcessor", 

52 "TabsProcessor", 

53 "ReverseSearchProcessor", 

54 "DynamicProcessor", 

55 "merge_processors", 

56] 

57 

58 

59class Processor(metaclass=ABCMeta): 

60 """ 

61 Manipulate the fragments for a given line in a 

62 :class:`~prompt_toolkit.layout.controls.BufferControl`. 

63 """ 

64 

65 @abstractmethod 

66 def apply_transformation( 

67 self, transformation_input: TransformationInput 

68 ) -> Transformation: 

69 """ 

70 Apply transformation. Returns a :class:`.Transformation` instance. 

71 

72 :param transformation_input: :class:`.TransformationInput` object. 

73 """ 

74 return Transformation(transformation_input.fragments) 

75 

76 

77SourceToDisplay = Callable[[int], int] 

78DisplayToSource = Callable[[int], int] 

79 

80 

81class TransformationInput: 

82 """ 

83 :param buffer_control: :class:`.BufferControl` instance. 

84 :param lineno: The number of the line to which we apply the processor. 

85 :param source_to_display: A function that returns the position in the 

86 `fragments` for any position in the source string. (This takes 

87 previous processors into account.) 

88 :param fragments: List of fragments that we can transform. (Received from the 

89 previous processor.) 

90 :param get_line: Optional ; a callable that returns the fragments of another 

91 line in the current buffer; This can be used to create processors capable 

92 of affecting transforms across multiple lines. 

93 """ 

94 

95 def __init__( 

96 self, 

97 buffer_control: BufferControl, 

98 document: Document, 

99 lineno: int, 

100 source_to_display: SourceToDisplay, 

101 fragments: StyleAndTextTuples, 

102 width: int, 

103 height: int, 

104 get_line: Callable[[int], StyleAndTextTuples] | None = None, 

105 ) -> None: 

106 self.buffer_control = buffer_control 

107 self.document = document 

108 self.lineno = lineno 

109 self.source_to_display = source_to_display 

110 self.fragments = fragments 

111 self.width = width 

112 self.height = height 

113 self.get_line = get_line 

114 

115 def unpack( 

116 self, 

117 ) -> tuple[ 

118 BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int 

119 ]: 

120 return ( 

121 self.buffer_control, 

122 self.document, 

123 self.lineno, 

124 self.source_to_display, 

125 self.fragments, 

126 self.width, 

127 self.height, 

128 ) 

129 

130 

131class Transformation: 

132 """ 

133 Transformation result, as returned by :meth:`.Processor.apply_transformation`. 

134 

135 Important: Always make sure that the length of `document.text` is equal to 

136 the length of all the text in `fragments`! 

137 

138 :param fragments: The transformed fragments. To be displayed, or to pass to 

139 the next processor. 

140 :param source_to_display: Cursor position transformation from original 

141 string to transformed string. 

142 :param display_to_source: Cursor position transformed from source string to 

143 original string. 

144 """ 

145 

146 def __init__( 

147 self, 

148 fragments: StyleAndTextTuples, 

149 source_to_display: SourceToDisplay | None = None, 

150 display_to_source: DisplayToSource | None = None, 

151 ) -> None: 

152 self.fragments = fragments 

153 self.source_to_display = source_to_display or (lambda i: i) 

154 self.display_to_source = display_to_source or (lambda i: i) 

155 

156 

157class DummyProcessor(Processor): 

158 """ 

159 A `Processor` that doesn't do anything. 

160 """ 

161 

162 def apply_transformation( 

163 self, transformation_input: TransformationInput 

164 ) -> Transformation: 

165 return Transformation(transformation_input.fragments) 

166 

167 

168class HighlightSearchProcessor(Processor): 

169 """ 

170 Processor that highlights search matches in the document. 

171 Note that this doesn't support multiline search matches yet. 

172 

173 The style classes 'search' and 'search.current' will be applied to the 

174 content. 

175 """ 

176 

177 _classname = "search" 

178 _classname_current = "search.current" 

179 

180 def _get_search_text(self, buffer_control: BufferControl) -> str: 

181 """ 

182 The text we are searching for. 

183 """ 

184 return buffer_control.search_state.text 

185 

186 def apply_transformation( 

187 self, transformation_input: TransformationInput 

188 ) -> Transformation: 

189 ( 

190 buffer_control, 

191 document, 

192 lineno, 

193 source_to_display, 

194 fragments, 

195 _, 

196 _, 

197 ) = transformation_input.unpack() 

198 

199 search_text = self._get_search_text(buffer_control) 

200 searchmatch_fragment = f" class:{self._classname} " 

201 searchmatch_current_fragment = f" class:{self._classname_current} " 

202 

203 if search_text and not get_app().is_done: 

204 # For each search match, replace the style string. 

205 line_text = fragment_list_to_text(fragments) 

206 fragments = explode_text_fragments(fragments) 

207 

208 if buffer_control.search_state.ignore_case(): 

209 flags = re.IGNORECASE 

210 else: 

211 flags = re.RegexFlag(0) 

212 

213 # Get cursor column. 

214 cursor_column: int | None 

215 if document.cursor_position_row == lineno: 

216 cursor_column = source_to_display(document.cursor_position_col) 

217 else: 

218 cursor_column = None 

219 

220 for match in re.finditer(re.escape(search_text), line_text, flags=flags): 

221 if cursor_column is not None: 

222 on_cursor = match.start() <= cursor_column < match.end() 

223 else: 

224 on_cursor = False 

225 

226 for i in range(match.start(), match.end()): 

227 old_fragment, text, *_ = fragments[i] 

228 if on_cursor: 

229 fragments[i] = ( 

230 old_fragment + searchmatch_current_fragment, 

231 fragments[i][1], 

232 ) 

233 else: 

234 fragments[i] = ( 

235 old_fragment + searchmatch_fragment, 

236 fragments[i][1], 

237 ) 

238 

239 return Transformation(fragments) 

240 

241 

242class HighlightIncrementalSearchProcessor(HighlightSearchProcessor): 

243 """ 

244 Highlight the search terms that are used for highlighting the incremental 

245 search. The style class 'incsearch' will be applied to the content. 

246 

247 Important: this requires the `preview_search=True` flag to be set for the 

248 `BufferControl`. Otherwise, the cursor position won't be set to the search 

249 match while searching, and nothing happens. 

250 """ 

251 

252 _classname = "incsearch" 

253 _classname_current = "incsearch.current" 

254 

255 def _get_search_text(self, buffer_control: BufferControl) -> str: 

256 """ 

257 The text we are searching for. 

258 """ 

259 # When the search buffer has focus, take that text. 

260 search_buffer = buffer_control.search_buffer 

261 if search_buffer is not None and search_buffer.text: 

262 return search_buffer.text 

263 return "" 

264 

265 

266class HighlightSelectionProcessor(Processor): 

267 """ 

268 Processor that highlights the selection in the document. 

269 """ 

270 

271 def apply_transformation( 

272 self, transformation_input: TransformationInput 

273 ) -> Transformation: 

274 ( 

275 buffer_control, 

276 document, 

277 lineno, 

278 source_to_display, 

279 fragments, 

280 _, 

281 _, 

282 ) = transformation_input.unpack() 

283 

284 selected_fragment = " class:selected " 

285 

286 # In case of selection, highlight all matches. 

287 selection_at_line = document.selection_range_at_line(lineno) 

288 

289 if selection_at_line: 

290 from_, to = selection_at_line 

291 from_ = source_to_display(from_) 

292 to = source_to_display(to) 

293 

294 fragments = explode_text_fragments(fragments) 

295 

296 if from_ == 0 and to == 0 and len(fragments) == 0: 

297 # When this is an empty line, insert a space in order to 

298 # visualize the selection. 

299 return Transformation([(selected_fragment, " ")]) 

300 else: 

301 for i in range(from_, to): 

302 if i < len(fragments): 

303 old_fragment, old_text, *_ = fragments[i] 

304 fragments[i] = (old_fragment + selected_fragment, old_text) 

305 elif i == len(fragments): 

306 fragments.append((selected_fragment, " ")) 

307 

308 return Transformation(fragments) 

309 

310 

311class PasswordProcessor(Processor): 

312 """ 

313 Processor that masks the input. (For passwords.) 

314 

315 :param char: (string) Character to be used. "*" by default. 

316 """ 

317 

318 def __init__(self, char: str = "*") -> None: 

319 self.char = char 

320 

321 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

322 fragments: StyleAndTextTuples = cast( 

323 StyleAndTextTuples, 

324 [ 

325 (style, self.char * len(text), *handler) 

326 for style, text, *handler in ti.fragments 

327 ], 

328 ) 

329 

330 return Transformation(fragments) 

331 

332 

333class HighlightMatchingBracketProcessor(Processor): 

334 """ 

335 When the cursor is on or right after a bracket, it highlights the matching 

336 bracket. 

337 

338 :param max_cursor_distance: Only highlight matching brackets when the 

339 cursor is within this distance. (From inside a `Processor`, we can't 

340 know which lines will be visible on the screen. But we also don't want 

341 to scan the whole document for matching brackets on each key press, so 

342 we limit to this value.) 

343 """ 

344 

345 _closing_braces = "])}>" 

346 

347 def __init__( 

348 self, chars: str = "[](){}<>", max_cursor_distance: int = 1000 

349 ) -> None: 

350 self.chars = chars 

351 self.max_cursor_distance = max_cursor_distance 

352 

353 self._positions_cache: SimpleCache[Hashable, list[tuple[int, int]]] = ( 

354 SimpleCache(maxsize=8) 

355 ) 

356 

357 def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]: 

358 """ 

359 Return a list of (row, col) tuples that need to be highlighted. 

360 """ 

361 pos: int | None 

362 

363 # Try for the character under the cursor. 

364 if document.current_char and document.current_char in self.chars: 

365 pos = document.find_matching_bracket_position( 

366 start_pos=document.cursor_position - self.max_cursor_distance, 

367 end_pos=document.cursor_position + self.max_cursor_distance, 

368 ) 

369 

370 # Try for the character before the cursor. 

371 elif ( 

372 document.char_before_cursor 

373 and document.char_before_cursor in self._closing_braces 

374 and document.char_before_cursor in self.chars 

375 ): 

376 document = Document(document.text, document.cursor_position - 1) 

377 

378 pos = document.find_matching_bracket_position( 

379 start_pos=document.cursor_position - self.max_cursor_distance, 

380 end_pos=document.cursor_position + self.max_cursor_distance, 

381 ) 

382 else: 

383 pos = None 

384 

385 # Return a list of (row, col) tuples that need to be highlighted. 

386 if pos: 

387 pos += document.cursor_position # pos is relative. 

388 row, col = document.translate_index_to_position(pos) 

389 return [ 

390 (row, col), 

391 (document.cursor_position_row, document.cursor_position_col), 

392 ] 

393 else: 

394 return [] 

395 

396 def apply_transformation( 

397 self, transformation_input: TransformationInput 

398 ) -> Transformation: 

399 ( 

400 buffer_control, 

401 document, 

402 lineno, 

403 source_to_display, 

404 fragments, 

405 _, 

406 _, 

407 ) = transformation_input.unpack() 

408 

409 # When the application is in the 'done' state, don't highlight. 

410 if get_app().is_done: 

411 return Transformation(fragments) 

412 

413 # Get the highlight positions. 

414 key = (get_app().render_counter, document.text, document.cursor_position) 

415 positions = self._positions_cache.get( 

416 key, lambda: self._get_positions_to_highlight(document) 

417 ) 

418 

419 # Apply if positions were found at this line. 

420 if positions: 

421 for row, col in positions: 

422 if row == lineno: 

423 col = source_to_display(col) 

424 fragments = explode_text_fragments(fragments) 

425 style, text, *_ = fragments[col] 

426 

427 if col == document.cursor_position_col: 

428 style += " class:matching-bracket.cursor " 

429 else: 

430 style += " class:matching-bracket.other " 

431 

432 fragments[col] = (style, text) 

433 

434 return Transformation(fragments) 

435 

436 

437class DisplayMultipleCursors(Processor): 

438 """ 

439 When we're in Vi block insert mode, display all the cursors. 

440 """ 

441 

442 def apply_transformation( 

443 self, transformation_input: TransformationInput 

444 ) -> Transformation: 

445 ( 

446 buffer_control, 

447 document, 

448 lineno, 

449 source_to_display, 

450 fragments, 

451 _, 

452 _, 

453 ) = transformation_input.unpack() 

454 

455 buff = buffer_control.buffer 

456 

457 if vi_insert_multiple_mode(): 

458 cursor_positions = buff.multiple_cursor_positions 

459 fragments = explode_text_fragments(fragments) 

460 

461 # If any cursor appears on the current line, highlight that. 

462 start_pos = document.translate_row_col_to_index(lineno, 0) 

463 end_pos = start_pos + len(document.lines[lineno]) 

464 

465 fragment_suffix = " class:multiple-cursors" 

466 

467 for p in cursor_positions: 

468 if start_pos <= p <= end_pos: 

469 column = source_to_display(p - start_pos) 

470 

471 # Replace fragment. 

472 try: 

473 style, text, *_ = fragments[column] 

474 except IndexError: 

475 # Cursor needs to be displayed after the current text. 

476 fragments.append((fragment_suffix, " ")) 

477 else: 

478 style += fragment_suffix 

479 fragments[column] = (style, text) 

480 

481 return Transformation(fragments) 

482 else: 

483 return Transformation(fragments) 

484 

485 

486class BeforeInput(Processor): 

487 """ 

488 Insert text before the input. 

489 

490 :param text: This can be either plain text or formatted text 

491 (or a callable that returns any of those). 

492 :param style: style to be applied to this prompt/prefix. 

493 """ 

494 

495 def __init__(self, text: AnyFormattedText, style: str = "") -> None: 

496 self.text = text 

497 self.style = style 

498 

499 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

500 source_to_display: SourceToDisplay | None 

501 display_to_source: DisplayToSource | None 

502 

503 if ti.lineno == 0: 

504 # Get fragments. 

505 fragments_before = to_formatted_text(self.text, self.style) 

506 fragments = fragments_before + ti.fragments 

507 

508 shift_position = fragment_list_len(fragments_before) 

509 source_to_display = lambda i: i + shift_position 

510 display_to_source = lambda i: i - shift_position 

511 else: 

512 fragments = ti.fragments 

513 source_to_display = None 

514 display_to_source = None 

515 

516 return Transformation( 

517 fragments, 

518 source_to_display=source_to_display, 

519 display_to_source=display_to_source, 

520 ) 

521 

522 def __repr__(self) -> str: 

523 return f"BeforeInput({self.text!r}, {self.style!r})" 

524 

525 

526class ShowArg(BeforeInput): 

527 """ 

528 Display the 'arg' in front of the input. 

529 

530 This was used by the `PromptSession`, but now it uses the 

531 `Window.get_line_prefix` function instead. 

532 """ 

533 

534 def __init__(self) -> None: 

535 super().__init__(self._get_text_fragments) 

536 

537 def _get_text_fragments(self) -> StyleAndTextTuples: 

538 app = get_app() 

539 if app.key_processor.arg is None: 

540 return [] 

541 else: 

542 arg = app.key_processor.arg 

543 

544 return [ 

545 ("class:prompt.arg", "(arg: "), 

546 ("class:prompt.arg.text", str(arg)), 

547 ("class:prompt.arg", ") "), 

548 ] 

549 

550 def __repr__(self) -> str: 

551 return "ShowArg()" 

552 

553 

554class AfterInput(Processor): 

555 """ 

556 Insert text after the input. 

557 

558 :param text: This can be either plain text or formatted text 

559 (or a callable that returns any of those). 

560 :param style: style to be applied to this prompt/prefix. 

561 """ 

562 

563 def __init__(self, text: AnyFormattedText, style: str = "") -> None: 

564 self.text = text 

565 self.style = style 

566 

567 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

568 # Insert fragments after the last line. 

569 if ti.lineno == ti.document.line_count - 1: 

570 # Get fragments. 

571 fragments_after = to_formatted_text(self.text, self.style) 

572 return Transformation(fragments=ti.fragments + fragments_after) 

573 else: 

574 return Transformation(fragments=ti.fragments) 

575 

576 def __repr__(self) -> str: 

577 return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})" 

578 

579 

580class AppendAutoSuggestion(Processor): 

581 """ 

582 Append the auto suggestion to the input. 

583 (The user can then press the right arrow the insert the suggestion.) 

584 """ 

585 

586 def __init__(self, style: str = "class:auto-suggestion") -> None: 

587 self.style = style 

588 

589 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

590 # Insert fragments after the last line. 

591 if ti.lineno == ti.document.line_count - 1: 

592 buffer = ti.buffer_control.buffer 

593 

594 if buffer.suggestion and ti.document.is_cursor_at_the_end: 

595 suggestion = buffer.suggestion.text 

596 else: 

597 suggestion = "" 

598 

599 return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) 

600 else: 

601 return Transformation(fragments=ti.fragments) 

602 

603 

604class ShowLeadingWhiteSpaceProcessor(Processor): 

605 """ 

606 Make leading whitespace visible. 

607 

608 :param get_char: Callable that returns one character. 

609 """ 

610 

611 def __init__( 

612 self, 

613 get_char: Callable[[], str] | None = None, 

614 style: str = "class:leading-whitespace", 

615 ) -> None: 

616 def default_get_char() -> str: 

617 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": 

618 return "." 

619 else: 

620 return "\xb7" 

621 

622 self.style = style 

623 self.get_char = get_char or default_get_char 

624 

625 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

626 fragments = ti.fragments 

627 

628 # Walk through all te fragments. 

629 if fragments and fragment_list_to_text(fragments).startswith(" "): 

630 t = (self.style, self.get_char()) 

631 fragments = explode_text_fragments(fragments) 

632 

633 for i in range(len(fragments)): 

634 if fragments[i][1] == " ": 

635 fragments[i] = t 

636 else: 

637 break 

638 

639 return Transformation(fragments) 

640 

641 

642class ShowTrailingWhiteSpaceProcessor(Processor): 

643 """ 

644 Make trailing whitespace visible. 

645 

646 :param get_char: Callable that returns one character. 

647 """ 

648 

649 def __init__( 

650 self, 

651 get_char: Callable[[], str] | None = None, 

652 style: str = "class:training-whitespace", 

653 ) -> None: 

654 def default_get_char() -> str: 

655 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?": 

656 return "." 

657 else: 

658 return "\xb7" 

659 

660 self.style = style 

661 self.get_char = get_char or default_get_char 

662 

663 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

664 fragments = ti.fragments 

665 

666 if fragments and fragments[-1][1].endswith(" "): 

667 t = (self.style, self.get_char()) 

668 fragments = explode_text_fragments(fragments) 

669 

670 # Walk backwards through all te fragments and replace whitespace. 

671 for i in range(len(fragments) - 1, -1, -1): 

672 char = fragments[i][1] 

673 if char == " ": 

674 fragments[i] = t 

675 else: 

676 break 

677 

678 return Transformation(fragments) 

679 

680 

681class TabsProcessor(Processor): 

682 """ 

683 Render tabs as spaces (instead of ^I) or make them visible (for instance, 

684 by replacing them with dots.) 

685 

686 :param tabstop: Horizontal space taken by a tab. (`int` or callable that 

687 returns an `int`). 

688 :param char1: Character or callable that returns a character (text of 

689 length one). This one is used for the first space taken by the tab. 

690 :param char2: Like `char1`, but for the rest of the space. 

691 """ 

692 

693 def __init__( 

694 self, 

695 tabstop: int | Callable[[], int] = 4, 

696 char1: str | Callable[[], str] = "|", 

697 char2: str | Callable[[], str] = "\u2508", 

698 style: str = "class:tab", 

699 ) -> None: 

700 self.char1 = char1 

701 self.char2 = char2 

702 self.tabstop = tabstop 

703 self.style = style 

704 

705 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

706 tabstop = to_int(self.tabstop) 

707 style = self.style 

708 

709 # Create separator for tabs. 

710 separator1 = to_str(self.char1) 

711 separator2 = to_str(self.char2) 

712 

713 # Transform fragments. 

714 fragments = explode_text_fragments(ti.fragments) 

715 

716 position_mappings = {} 

717 result_fragments: StyleAndTextTuples = [] 

718 pos = 0 

719 

720 for i, fragment_and_text in enumerate(fragments): 

721 position_mappings[i] = pos 

722 

723 if fragment_and_text[1] == "\t": 

724 # Calculate how many characters we have to insert. 

725 count = tabstop - (pos % tabstop) 

726 if count == 0: 

727 count = tabstop 

728 

729 # Insert tab. 

730 result_fragments.append((style, separator1)) 

731 result_fragments.append((style, separator2 * (count - 1))) 

732 pos += count 

733 else: 

734 result_fragments.append(fragment_and_text) 

735 pos += 1 

736 

737 position_mappings[len(fragments)] = pos 

738 # Add `pos+1` to mapping, because the cursor can be right after the 

739 # line as well. 

740 position_mappings[len(fragments) + 1] = pos + 1 

741 

742 def source_to_display(from_position: int) -> int: 

743 "Maps original cursor position to the new one." 

744 return position_mappings[from_position] 

745 

746 def display_to_source(display_pos: int) -> int: 

747 "Maps display cursor position to the original one." 

748 position_mappings_reversed = {v: k for k, v in position_mappings.items()} 

749 

750 while display_pos >= 0: 

751 try: 

752 return position_mappings_reversed[display_pos] 

753 except KeyError: 

754 display_pos -= 1 

755 return 0 

756 

757 return Transformation( 

758 result_fragments, 

759 source_to_display=source_to_display, 

760 display_to_source=display_to_source, 

761 ) 

762 

763 

764class ReverseSearchProcessor(Processor): 

765 """ 

766 Process to display the "(reverse-i-search)`...`:..." stuff around 

767 the search buffer. 

768 

769 Note: This processor is meant to be applied to the BufferControl that 

770 contains the search buffer, it's not meant for the original input. 

771 """ 

772 

773 _excluded_input_processors: list[type[Processor]] = [ 

774 HighlightSearchProcessor, 

775 HighlightSelectionProcessor, 

776 BeforeInput, 

777 AfterInput, 

778 ] 

779 

780 def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None: 

781 from prompt_toolkit.layout.controls import BufferControl 

782 

783 prev_control = get_app().layout.search_target_buffer_control 

784 if ( 

785 isinstance(prev_control, BufferControl) 

786 and prev_control.search_buffer_control == buffer_control 

787 ): 

788 return prev_control 

789 return None 

790 

791 def _content( 

792 self, main_control: BufferControl, ti: TransformationInput 

793 ) -> UIContent: 

794 from prompt_toolkit.layout.controls import BufferControl 

795 

796 # Emulate the BufferControl through which we are searching. 

797 # For this we filter out some of the input processors. 

798 excluded_processors = tuple(self._excluded_input_processors) 

799 

800 def filter_processor(item: Processor) -> Processor | None: 

801 """Filter processors from the main control that we want to disable 

802 here. This returns either an accepted processor or None.""" 

803 # For a `_MergedProcessor`, check each individual processor, recursively. 

804 if isinstance(item, _MergedProcessor): 

805 accepted_processors = [filter_processor(p) for p in item.processors] 

806 return merge_processors( 

807 [p for p in accepted_processors if p is not None] 

808 ) 

809 

810 # For a `ConditionalProcessor`, check the body. 

811 elif isinstance(item, ConditionalProcessor): 

812 p = filter_processor(item.processor) 

813 if p: 

814 return ConditionalProcessor(p, item.filter) 

815 

816 # Otherwise, check the processor itself. 

817 else: 

818 if not isinstance(item, excluded_processors): 

819 return item 

820 

821 return None 

822 

823 filtered_processor = filter_processor( 

824 merge_processors(main_control.input_processors or []) 

825 ) 

826 highlight_processor = HighlightIncrementalSearchProcessor() 

827 

828 if filtered_processor: 

829 new_processors = [filtered_processor, highlight_processor] 

830 else: 

831 new_processors = [highlight_processor] 

832 

833 from .controls import SearchBufferControl 

834 

835 assert isinstance(ti.buffer_control, SearchBufferControl) 

836 

837 buffer_control = BufferControl( 

838 buffer=main_control.buffer, 

839 input_processors=new_processors, 

840 include_default_input_processors=False, 

841 lexer=main_control.lexer, 

842 preview_search=True, 

843 search_buffer_control=ti.buffer_control, 

844 ) 

845 

846 return buffer_control.create_content(ti.width, ti.height, preview_search=True) 

847 

848 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

849 from .controls import SearchBufferControl 

850 

851 assert isinstance(ti.buffer_control, SearchBufferControl), ( 

852 "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only." 

853 ) 

854 

855 source_to_display: SourceToDisplay | None 

856 display_to_source: DisplayToSource | None 

857 

858 main_control = self._get_main_buffer(ti.buffer_control) 

859 

860 if ti.lineno == 0 and main_control: 

861 content = self._content(main_control, ti) 

862 

863 # Get the line from the original document for this search. 

864 line_fragments = content.get_line(content.cursor_position.y) 

865 

866 if main_control.search_state.direction == SearchDirection.FORWARD: 

867 direction_text = "i-search" 

868 else: 

869 direction_text = "reverse-i-search" 

870 

871 fragments_before: StyleAndTextTuples = [ 

872 ("class:prompt.search", "("), 

873 ("class:prompt.search", direction_text), 

874 ("class:prompt.search", ")`"), 

875 ] 

876 

877 fragments = ( 

878 fragments_before 

879 + [ 

880 ("class:prompt.search.text", fragment_list_to_text(ti.fragments)), 

881 ("", "': "), 

882 ] 

883 + line_fragments 

884 ) 

885 

886 shift_position = fragment_list_len(fragments_before) 

887 source_to_display = lambda i: i + shift_position 

888 display_to_source = lambda i: i - shift_position 

889 else: 

890 source_to_display = None 

891 display_to_source = None 

892 fragments = ti.fragments 

893 

894 return Transformation( 

895 fragments, 

896 source_to_display=source_to_display, 

897 display_to_source=display_to_source, 

898 ) 

899 

900 

901class ConditionalProcessor(Processor): 

902 """ 

903 Processor that applies another processor, according to a certain condition. 

904 Example:: 

905 

906 # Create a function that returns whether or not the processor should 

907 # currently be applied. 

908 def highlight_enabled(): 

909 return true_or_false 

910 

911 # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`. 

912 BufferControl(input_processors=[ 

913 ConditionalProcessor(HighlightSearchProcessor(), 

914 Condition(highlight_enabled))]) 

915 

916 :param processor: :class:`.Processor` instance. 

917 :param filter: :class:`~prompt_toolkit.filters.Filter` instance. 

918 """ 

919 

920 def __init__(self, processor: Processor, filter: FilterOrBool) -> None: 

921 self.processor = processor 

922 self.filter = to_filter(filter) 

923 

924 def apply_transformation( 

925 self, transformation_input: TransformationInput 

926 ) -> Transformation: 

927 # Run processor when enabled. 

928 if self.filter(): 

929 return self.processor.apply_transformation(transformation_input) 

930 else: 

931 return Transformation(transformation_input.fragments) 

932 

933 def __repr__(self) -> str: 

934 return f"{self.__class__.__name__}(processor={self.processor!r}, filter={self.filter!r})" 

935 

936 

937class DynamicProcessor(Processor): 

938 """ 

939 Processor class that dynamically returns any Processor. 

940 

941 :param get_processor: Callable that returns a :class:`.Processor` instance. 

942 """ 

943 

944 def __init__(self, get_processor: Callable[[], Processor | None]) -> None: 

945 self.get_processor = get_processor 

946 

947 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

948 processor = self.get_processor() or DummyProcessor() 

949 return processor.apply_transformation(ti) 

950 

951 

952def merge_processors(processors: list[Processor]) -> Processor: 

953 """ 

954 Merge multiple `Processor` objects into one. 

955 """ 

956 if len(processors) == 0: 

957 return DummyProcessor() 

958 

959 if len(processors) == 1: 

960 return processors[0] # Nothing to merge. 

961 

962 return _MergedProcessor(processors) 

963 

964 

965class _MergedProcessor(Processor): 

966 """ 

967 Processor that groups multiple other `Processor` objects, but exposes an 

968 API as if it is one `Processor`. 

969 """ 

970 

971 def __init__(self, processors: list[Processor]): 

972 self.processors = processors 

973 

974 def apply_transformation(self, ti: TransformationInput) -> Transformation: 

975 source_to_display_functions = [ti.source_to_display] 

976 display_to_source_functions = [] 

977 fragments = ti.fragments 

978 

979 def source_to_display(i: int) -> int: 

980 """Translate x position from the buffer to the x position in the 

981 processor fragments list.""" 

982 for f in source_to_display_functions: 

983 i = f(i) 

984 return i 

985 

986 for p in self.processors: 

987 transformation = p.apply_transformation( 

988 TransformationInput( 

989 ti.buffer_control, 

990 ti.document, 

991 ti.lineno, 

992 source_to_display, 

993 fragments, 

994 ti.width, 

995 ti.height, 

996 ti.get_line, 

997 ) 

998 ) 

999 fragments = transformation.fragments 

1000 display_to_source_functions.append(transformation.display_to_source) 

1001 source_to_display_functions.append(transformation.source_to_display) 

1002 

1003 def display_to_source(i: int) -> int: 

1004 for f in reversed(display_to_source_functions): 

1005 i = f(i) 

1006 return i 

1007 

1008 # In the case of a nested _MergedProcessor, each processor wants to 

1009 # receive a 'source_to_display' function (as part of the 

1010 # TransformationInput) that has everything in the chain before 

1011 # included, because it can be called as part of the 

1012 # `apply_transformation` function. However, this first 

1013 # `source_to_display` should not be part of the output that we are 

1014 # returning. (This is the most consistent with `display_to_source`.) 

1015 del source_to_display_functions[:1] 

1016 

1017 return Transformation(fragments, source_to_display, display_to_source)