Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/layout/processors.py: 23%

379 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1""" 

2Processors are little transformation blocks that transform the fragments list 

3from a buffer before the BufferControl will render it to the screen. 

4 

5They can insert fragments before or after, or highlight fragments by replacing the 

6fragment types. 

7""" 

8from __future__ import annotations 

9 

10import re 

11from abc import ABCMeta, abstractmethod 

12from typing import ( 

13 TYPE_CHECKING, 

14 Callable, 

15 Hashable, 

16 List, 

17 Optional, 

18 Tuple, 

19 Type, 

20 Union, 

21 cast, 

22) 

23 

24from prompt_toolkit.application.current import get_app 

25from prompt_toolkit.cache import SimpleCache 

26from prompt_toolkit.document import Document 

27from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode 

28from prompt_toolkit.formatted_text import ( 

29 AnyFormattedText, 

30 StyleAndTextTuples, 

31 to_formatted_text, 

32) 

33from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text 

34from prompt_toolkit.search import SearchDirection 

35from prompt_toolkit.utils import to_int, to_str 

36 

37from .utils import explode_text_fragments 

38 

39if TYPE_CHECKING: 

40 from .controls import BufferControl, UIContent 

41 

42__all__ = [ 

43 "Processor", 

44 "TransformationInput", 

45 "Transformation", 

46 "DummyProcessor", 

47 "HighlightSearchProcessor", 

48 "HighlightIncrementalSearchProcessor", 

49 "HighlightSelectionProcessor", 

50 "PasswordProcessor", 

51 "HighlightMatchingBracketProcessor", 

52 "DisplayMultipleCursors", 

53 "BeforeInput", 

54 "ShowArg", 

55 "AfterInput", 

56 "AppendAutoSuggestion", 

57 "ConditionalProcessor", 

58 "ShowLeadingWhiteSpaceProcessor", 

59 "ShowTrailingWhiteSpaceProcessor", 

60 "TabsProcessor", 

61 "ReverseSearchProcessor", 

62 "DynamicProcessor", 

63 "merge_processors", 

64] 

65 

66 

class Processor(metaclass=ABCMeta):
    """
    Base class for objects that rewrite the fragments of one line of a
    :class:`~prompt_toolkit.layout.controls.BufferControl`.
    """

    @abstractmethod
    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Transform a single line and return a :class:`.Transformation`.

        Subclasses have to override this. The base implementation hands the
        incoming fragments back unchanged.

        :param transformation_input: :class:`.TransformationInput` object.
        """
        unchanged = transformation_input.fragments
        return Transformation(unchanged)

83 

84 

# Callable signatures used for translating column positions between the
# source line and the list of fragments, in either direction.
SourceToDisplay = Callable[[int], int]
DisplayToSource = Callable[[int], int]


class TransformationInput:
    """
    Everything a processor receives when it is asked to transform one line.

    :param buffer_control: :class:`.BufferControl` instance.
    :param document: The `Document` the line belongs to.
    :param lineno: The number of the line to which we apply the processor.
    :param source_to_display: A function that returns the position in the
        `fragments` for any position in the source string. (This takes
        previous processors into account.)
    :param fragments: List of fragments that we can transform. (Received from
        the previous processor.)
    :param width: Width value made available to the processor.
    :param height: Height value made available to the processor.
    """

    def __init__(
        self,
        buffer_control: BufferControl,
        document: Document,
        lineno: int,
        source_to_display: SourceToDisplay,
        fragments: StyleAndTextTuples,
        width: int,
        height: int,
    ) -> None:
        # Where the line comes from.
        self.buffer_control = buffer_control
        self.document = document
        self.lineno = lineno

        # The line content and the column mapping that applies to it.
        self.source_to_display = source_to_display
        self.fragments = fragments

        # Size information.
        self.width = width
        self.height = height

    def unpack(
        self,
    ) -> tuple[
        BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
    ]:
        """
        Return all attributes as one tuple, in constructor-argument order.
        """
        as_tuple = (
            self.buffer_control,
            self.document,
            self.lineno,
            self.source_to_display,
            self.fragments,
            self.width,
            self.height,
        )
        return as_tuple

132 

133 

class Transformation:
    """
    The result of :meth:`.Processor.apply_transformation`.

    Important: Always make sure that the length of `document.text` is equal to
    the length of all the text in `fragments`!

    :param fragments: The transformed fragments. To be displayed, or to pass to
        the next processor.
    :param source_to_display: Cursor position transformation from the original
        string to the transformed string. Identity when not given.
    :param display_to_source: Cursor position transformation from the
        transformed string back to the original string. Identity when not
        given.
    """

    def __init__(
        self,
        fragments: StyleAndTextTuples,
        source_to_display: SourceToDisplay | None = None,
        display_to_source: DisplayToSource | None = None,
    ) -> None:
        self.fragments = fragments

        # Fall back to the identity mapping when no translation is supplied.
        if source_to_display:
            self.source_to_display = source_to_display
        else:
            self.source_to_display = lambda i: i

        if display_to_source:
            self.display_to_source = display_to_source
        else:
            self.display_to_source = lambda i: i

158 

159 

class DummyProcessor(Processor):
    """
    No-op `Processor`: the fragments it receives are returned untouched.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        fragments = transformation_input.fragments
        return Transformation(fragments)

169 

170 

class HighlightSearchProcessor(Processor):
    """
    Processor that marks every occurrence of the search text on a line.
    Matches that span several lines are not supported yet.

    Matched characters get the style class 'search'; a match that contains the
    cursor gets 'search.current' instead.
    """

    _classname = "search"
    _classname_current = "search.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text to look for, taken from the control's search state.
        """
        search_state = buffer_control.search_state
        return search_state.text

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append the search style class to every character of every match of
        the (literal, escaped) search text in this line.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _width,
            _height,
        ) = transformation_input.unpack()

        needle = self._get_search_text(buffer_control)
        match_suffix = f" class:{self._classname} "
        current_suffix = f" class:{self._classname_current} "

        # Nothing to highlight without a search text, or once the app is done.
        if not needle or get_app().is_done:
            return Transformation(fragments)

        line_text = fragment_list_to_text(fragments)
        # Exploded so that match offsets can be used as list indices below.
        fragments = explode_text_fragments(fragments)

        flags = (
            re.IGNORECASE
            if buffer_control.search_state.ignore_case()
            else re.RegexFlag(0)
        )

        # Display column of the cursor, when the cursor is on this line.
        cursor_column: int | None = None
        if document.cursor_position_row == lineno:
            cursor_column = source_to_display(document.cursor_position_col)

        for found in re.finditer(re.escape(needle), line_text, flags=flags):
            begin, end = found.start(), found.end()
            on_cursor = cursor_column is not None and begin <= cursor_column < end
            suffix = current_suffix if on_cursor else match_suffix

            for index in range(begin, end):
                style, char, *_ = fragments[index]
                fragments[index] = (style + suffix, char)

        return Transformation(fragments)

243 

244 

class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
    """
    Variant of the search highlighter used during incremental search. The
    style classes 'incsearch' and 'incsearch.current' are applied.

    Important: this requires the `preview_search=True` flag to be set for the
    `BufferControl`. Otherwise, the cursor position won't be set to the search
    match while searching, and nothing happens.
    """

    _classname = "incsearch"
    _classname_current = "incsearch.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text of the control's search buffer, or an empty string
        when there is no search buffer or it is empty.
        """
        search_buffer = buffer_control.search_buffer
        if search_buffer is None or not search_buffer.text:
            return ""
        return search_buffer.text

267 

268 

class HighlightSelectionProcessor(Processor):
    """
    Processor that applies the 'selected' style class to the selected part of
    a line.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        (
            _buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _width,
            _height,
        ) = transformation_input.unpack()

        selected_fragment = " class:selected "

        selection_at_line = document.selection_range_at_line(lineno)
        if not selection_at_line:
            # No selection on this line.
            return Transformation(fragments)

        # Translate the selected range to display columns.
        start, stop = selection_at_line
        start = source_to_display(start)
        stop = source_to_display(stop)

        fragments = explode_text_fragments(fragments)

        if start == 0 and stop == 0 and len(fragments) == 0:
            # An empty line: show a single selected space, so that the
            # selection is visible.
            return Transformation([(selected_fragment, " ")])

        for index in range(start, stop):
            size = len(fragments)
            if index < size:
                style, text, *_ = fragments[index]
                fragments[index] = (style + selected_fragment, text)
            elif index == size:
                # Selection reaches just past the text: add a selected space.
                fragments.append((selected_fragment, " "))

        return Transformation(fragments)

312 

313 

class PasswordProcessor(Processor):
    """
    Processor that hides the input by showing a mask character for every
    character typed. (For passwords.)

    :param char: (string) Character to be used. "*" by default.
    """

    def __init__(self, char: str = "*") -> None:
        self.char = char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace the text of each fragment by an equally long run of
        `self.char`. Style and any extra tuple items are kept.
        """
        masked = [
            (style, self.char * len(text), *rest) for style, text, *rest in ti.fragments
        ]
        return Transformation(cast(StyleAndTextTuples, masked))

334 

335 

class HighlightMatchingBracketProcessor(Processor):
    """
    When the cursor is on or right after a bracket, it highlights the matching
    bracket.

    :param chars: The bracket characters that are taken into account.
    :param max_cursor_distance: Only highlight matching brackets when the
        cursor is within this distance. (From inside a `Processor`, we can't
        know which lines will be visible on the screen. But we also don't want
        to scan the whole document for matching brackets on each key press, so
        we limit to this value.)
    """

    _closing_braces = "])}>"

    def __init__(
        self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
    ) -> None:
        self.chars = chars
        self.max_cursor_distance = max_cursor_distance

        # Cache of computed positions, keyed in `apply_transformation`.
        self._positions_cache: SimpleCache[
            Hashable, list[tuple[int, int]]
        ] = SimpleCache(maxsize=8)

    def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
        """
        Return a list of (row, col) tuples that need to be highlighted.

        The list is empty when nothing has to be highlighted; otherwise it
        holds the position of the matching bracket followed by the position of
        the bracket the search started from.
        """
        pos: int | None

        # Try for the character under the cursor.
        if document.current_char and document.current_char in self.chars:
            pos = document.find_matching_bracket_position(
                start_pos=document.cursor_position - self.max_cursor_distance,
                end_pos=document.cursor_position + self.max_cursor_distance,
            )

        # Try for the character before the cursor.
        elif (
            document.char_before_cursor
            and document.char_before_cursor in self._closing_braces
            and document.char_before_cursor in self.chars
        ):
            # Continue with a document whose cursor sits on that closing
            # bracket, so the positions below refer to the bracket itself.
            document = Document(document.text, document.cursor_position - 1)

            pos = document.find_matching_bracket_position(
                start_pos=document.cursor_position - self.max_cursor_distance,
                end_pos=document.cursor_position + self.max_cursor_distance,
            )
        else:
            pos = None

        # A falsy result (None or 0) means there is nothing to highlight.
        if not pos:
            return []

        pos += document.cursor_position  # pos is relative.
        row, col = document.translate_index_to_position(pos)
        return [
            (row, col),
            (document.cursor_position_row, document.cursor_position_col),
        ]

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Add 'matching-bracket.cursor' / 'matching-bracket.other' style classes
        to the brackets on this line that need highlighting.

        Whether a position is the cursor position is decided on source
        coordinates (row and column), before the column is translated to a
        display column. Comparing the translated column with the source cursor
        column, without looking at the row, would mark a matching bracket on
        another line as the cursor when it has the same column, and would miss
        the cursor whenever an earlier processor shifts the columns.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        # When the application is in the 'done' state, don't highlight.
        if get_app().is_done:
            return Transformation(fragments)

        # Get the highlight positions.
        key = (get_app().render_counter, document.text, document.cursor_position)
        positions = self._positions_cache.get(
            key, lambda: self._get_positions_to_highlight(document)
        )

        # Apply the positions that are on this line.
        for row, col in positions:
            if row != lineno:
                continue

            is_cursor = (
                row == document.cursor_position_row
                and col == document.cursor_position_col
            )

            display_col = source_to_display(col)
            fragments = explode_text_fragments(fragments)
            style, text, *_ = fragments[display_col]

            if is_cursor:
                style += " class:matching-bracket.cursor "
            else:
                style += " class:matching-bracket.other "

            fragments[display_col] = (style, text)

        return Transformation(fragments)

438 

439 

class DisplayMultipleCursors(Processor):
    """
    Show every cursor while Vi block insert mode is active.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _width,
            _height,
        ) = transformation_input.unpack()

        buff = buffer_control.buffer

        if not vi_insert_multiple_mode():
            return Transformation(fragments)

        cursor_positions = buff.multiple_cursor_positions
        fragments = explode_text_fragments(fragments)

        # Document index range covered by this line.
        line_start = document.translate_row_col_to_index(lineno, 0)
        line_end = line_start + len(document.lines[lineno])

        fragment_suffix = " class:multiple-cursors"

        for position in cursor_positions:
            if not (line_start <= position <= line_end):
                continue

            column = source_to_display(position - line_start)

            try:
                style, text, *_ = fragments[column]
            except IndexError:
                # Cursor needs to be displayed after the current text.
                fragments.append((fragment_suffix, " "))
            else:
                fragments[column] = (style + fragment_suffix, text)

        return Transformation(fragments)

487 

488 

class BeforeInput(Processor):
    """
    Processor that puts text in front of the first line of the input.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to this prompt/prefix.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Prepend the text on line 0 and shift the column mappings by its
        length; every other line is passed through unchanged.
        """
        if ti.lineno != 0:
            return Transformation(
                ti.fragments,
                source_to_display=None,
                display_to_source=None,
            )

        prefix = to_formatted_text(self.text, self.style)
        combined = prefix + ti.fragments
        offset = fragment_list_len(prefix)

        def source_to_display(i: int) -> int:
            return i + offset

        def display_to_source(i: int) -> int:
            return i - offset

        return Transformation(
            combined,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

    def __repr__(self) -> str:
        return f"BeforeInput({self.text!r}, {self.style!r})"

527 

528 

class ShowArg(BeforeInput):
    """
    Show the key processor's 'arg' in front of the input.

    This was used by the `PromptSession`, but now it uses the
    `Window.get_line_prefix` function instead.
    """

    def __init__(self) -> None:
        super().__init__(self._get_text_fragments)

    def _get_text_fragments(self) -> StyleAndTextTuples:
        """
        Return the "(arg: N) " fragments, or an empty list when the current
        application's key processor has no arg.
        """
        key_processor = get_app().key_processor
        if key_processor.arg is None:
            return []

        arg = key_processor.arg
        return [
            ("class:prompt.arg", "(arg: "),
            ("class:prompt.arg.text", str(arg)),
            ("class:prompt.arg", ") "),
        ]

    def __repr__(self) -> str:
        return "ShowArg()"

555 

556 

class AfterInput(Processor):
    """
    Processor that puts text behind the last line of the input.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to the inserted text.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Append the text to the last line of the document; other lines are
        passed through unchanged.
        """
        last_line = ti.document.line_count - 1
        if ti.lineno != last_line:
            return Transformation(fragments=ti.fragments)

        suffix = to_formatted_text(self.text, self.style)
        return Transformation(fragments=ti.fragments + suffix)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"

581 

582 

class AppendAutoSuggestion(Processor):
    """
    Append the auto suggestion to the input.
    (The user can then press the right arrow to insert the suggestion.)

    :param style: style to be applied to the suggestion text.
    """

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Add a suggestion fragment to the last line. The fragment text is empty
        when the buffer has no suggestion or the cursor is not at the end.
        """
        last_line = ti.document.line_count - 1
        if ti.lineno != last_line:
            return Transformation(fragments=ti.fragments)

        buffer = ti.buffer_control.buffer

        suggestion = ""
        if buffer.suggestion and ti.document.is_cursor_at_the_end:
            suggestion = buffer.suggestion.text

        return Transformation(fragments=ti.fragments + [(self.style, suggestion)])

605 

606 

class ShowLeadingWhiteSpaceProcessor(Processor):
    """
    Replace the spaces at the start of a line by a visible character.

    :param get_char: Callable that returns one character.
    :param style: style to be applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        style: str = "class:leading-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Use a middle dot, unless the output encoding can't represent it.
            encoded = "\xb7".encode(get_app().output.encoding(), "replace")
            return "." if encoded == b"?" else "\xb7"

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        fragments = ti.fragments

        # Only do work when the line actually starts with a space.
        if not fragments or not fragment_list_to_text(fragments).startswith(" "):
            return Transformation(fragments)

        replacement = (self.style, self.get_char())
        fragments = explode_text_fragments(fragments)

        # Replace from the left, up to the first fragment that isn't a space.
        for index in range(len(fragments)):
            if fragments[index][1] != " ":
                break
            fragments[index] = replacement

        return Transformation(fragments)

643 

644 

class ShowTrailingWhiteSpaceProcessor(Processor):
    """
    Make trailing whitespace visible.

    :param get_char: Callable that returns one character.
    :param style: style to be applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        # This default used to be misspelled "class:training-whitespace",
        # which does not match this class's name or the "leading-whitespace"
        # counterpart.
        # NOTE(review): presumably the default style sheet defines
        # "trailing-whitespace" - confirm; custom styles that targeted the
        # misspelled class name have to be updated.
        style: str = "class:trailing-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Use a middle dot, unless the output encoding can't represent it.
            if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
                return "."
            else:
                return "\xb7"

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace the run of spaces at the end of the line by `get_char()`
        fragments in `self.style`.
        """
        fragments = ti.fragments

        if fragments and fragments[-1][1].endswith(" "):
            t = (self.style, self.get_char())
            fragments = explode_text_fragments(fragments)

            # Walk backwards through all the fragments and replace whitespace,
            # stopping at the first fragment that isn't a space.
            for i in range(len(fragments) - 1, -1, -1):
                char = fragments[i][1]
                if char == " ":
                    fragments[i] = t
                else:
                    break

        return Transformation(fragments)

682 

683 

class TabsProcessor(Processor):
    """
    Render tabs as spaces (instead of ^I) or make them visible (for instance,
    by replacing them with dots.)

    :param tabstop: Horizontal space taken by a tab. (`int` or callable that
        returns an `int`).
    :param char1: Character or callable that returns a character (text of
        length one). This one is used for the first space taken by the tab.
    :param char2: Like `char1`, but for the rest of the space.
    :param style: style to be applied to the characters that replace a tab.
    """

    def __init__(
        self,
        tabstop: int | Callable[[], int] = 4,
        char1: str | Callable[[], str] = "|",
        char2: str | Callable[[], str] = "\u2508",
        style: str = "class:tab",
    ) -> None:
        self.char1 = char1
        self.char2 = char2
        self.tabstop = tabstop
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Expand every tab up to the next tab stop and return the fragments
        together with position mappings between source and display columns.
        """
        tabstop = to_int(self.tabstop)
        style = self.style

        # Create separator for tabs.
        separator1 = to_str(self.char1)
        separator2 = to_str(self.char2)

        # Transform fragments.
        fragments = explode_text_fragments(ti.fragments)

        # Source column -> display column.
        position_mappings: dict[int, int] = {}
        result_fragments: StyleAndTextTuples = []
        pos = 0

        for i, fragment_and_text in enumerate(fragments):
            position_mappings[i] = pos

            if fragment_and_text[1] == "\t":
                # Number of columns up to the next tab stop. This can't be
                # zero: `pos % tabstop` never equals `tabstop`.
                count = tabstop - (pos % tabstop)

                # Insert tab.
                result_fragments.append((style, separator1))
                result_fragments.append((style, separator2 * (count - 1)))
                pos += count
            else:
                result_fragments.append(fragment_and_text)
                pos += 1

        position_mappings[len(fragments)] = pos
        # Add `pos+1` to mapping, because the cursor can be right after the
        # line as well.
        position_mappings[len(fragments) + 1] = pos + 1

        # Display column -> source column. Built once here rather than on
        # every `display_to_source` call; `position_mappings` is complete and
        # no longer changes at this point.
        position_mappings_reversed = {v: k for k, v in position_mappings.items()}

        def source_to_display(from_position: int) -> int:
            "Maps original cursor position to the new one."
            return position_mappings[from_position]

        def display_to_source(display_pos: int) -> int:
            "Maps display cursor position to the original one."
            # Columns inside an expanded tab have no entry of their own: walk
            # left to the nearest column that has one.
            while display_pos >= 0:
                try:
                    return position_mappings_reversed[display_pos]
                except KeyError:
                    display_pos -= 1
            return 0

        return Transformation(
            result_fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

765 

766 

class ReverseSearchProcessor(Processor):
    """
    Processor that renders the "(reverse-i-search)`...': ..." decoration
    around the search buffer.

    Note: This processor is meant to be applied to the BufferControl that
    contains the search buffer, it's not meant for the original input.
    """

    # Processors of the main control that are left out when that control is
    # emulated for the preview line.
    _excluded_input_processors: list[type[Processor]] = [
        HighlightSearchProcessor,
        HighlightSelectionProcessor,
        BeforeInput,
        AfterInput,
    ]

    def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
        """
        Return the layout's search target control when it is a `BufferControl`
        whose search buffer control equals `buffer_control`; `None` otherwise.
        """
        from prompt_toolkit.layout.controls import BufferControl

        target = get_app().layout.search_target_buffer_control
        if (
            not isinstance(target, BufferControl)
            or target.search_buffer_control != buffer_control
        ):
            return None
        return target

    def _content(
        self, main_control: BufferControl, ti: TransformationInput
    ) -> UIContent:
        """
        Create the content of a temporary control that emulates
        `main_control`, with the excluded processors removed and incremental
        search highlighting added.
        """
        from prompt_toolkit.layout.controls import BufferControl

        # Emulate the BufferControl through which we are searching.
        # For this we filter out some of the input processors.
        excluded_processors = tuple(self._excluded_input_processors)

        def filter_processor(item: Processor) -> Processor | None:
            """Return `item` with all excluded processors stripped, or None
            when nothing of it remains."""
            # A merged processor: filter each member, recursively.
            if isinstance(item, _MergedProcessor):
                kept = []
                for member in item.processors:
                    accepted = filter_processor(member)
                    if accepted is not None:
                        kept.append(accepted)
                return merge_processors(kept)

            # A conditional processor: filter the wrapped processor.
            if isinstance(item, ConditionalProcessor):
                inner = filter_processor(item.processor)
                return ConditionalProcessor(inner, item.filter) if inner else None

            # Anything else: accept unless it is of an excluded type.
            return None if isinstance(item, excluded_processors) else item

        filtered_processor = filter_processor(
            merge_processors(main_control.input_processors or [])
        )
        highlight_processor = HighlightIncrementalSearchProcessor()

        new_processors = [highlight_processor]
        if filtered_processor:
            new_processors.insert(0, filtered_processor)

        from .controls import SearchBufferControl

        assert isinstance(ti.buffer_control, SearchBufferControl)

        emulated_control = BufferControl(
            buffer=main_control.buffer,
            input_processors=new_processors,
            include_default_input_processors=False,
            lexer=main_control.lexer,
            preview_search=True,
            search_buffer_control=ti.buffer_control,
        )

        return emulated_control.create_content(ti.width, ti.height, preview_search=True)

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        On line 0, wrap the search text in the i-search decoration and append
        the line of the main document that holds the cursor. Other lines, and
        any line when no main control is found, are passed through unchanged.
        """
        from .controls import SearchBufferControl

        assert isinstance(
            ti.buffer_control, SearchBufferControl
        ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."

        main_control = self._get_main_buffer(ti.buffer_control)

        if ti.lineno != 0 or not main_control:
            return Transformation(
                ti.fragments,
                source_to_display=None,
                display_to_source=None,
            )

        content = self._content(main_control, ti)

        # Get the line from the original document for this search.
        line_fragments = content.get_line(content.cursor_position.y)

        direction_text = (
            "i-search"
            if main_control.search_state.direction == SearchDirection.FORWARD
            else "reverse-i-search"
        )

        fragments_before: StyleAndTextTuples = [
            ("class:prompt.search", "("),
            ("class:prompt.search", direction_text),
            ("class:prompt.search", ")`"),
        ]
        search_text_fragments: StyleAndTextTuples = [
            ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
            ("", "': "),
        ]
        fragments = fragments_before + search_text_fragments + line_fragments

        # The search text starts right after the leading decoration.
        offset = fragment_list_len(fragments_before)

        def source_to_display(i: int) -> int:
            return i + offset

        def display_to_source(i: int) -> int:
            return i - offset

        return Transformation(
            fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

902 

903 

class ConditionalProcessor(Processor):
    """
    Processor that wraps another processor and only runs it while a filter
    evaluates to true.
    Example::

        # Create a function that returns whether or not the processor should
        # currently be applied.
        def highlight_enabled():
            return true_or_false

        # Wrap it in a `ConditionalProcessor` for usage in a `BufferControl`.
        BufferControl(input_processors=[
            ConditionalProcessor(HighlightSearchProcessor(),
                                 Condition(highlight_enabled))])

    :param processor: :class:`.Processor` instance.
    :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
    """

    def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
        self.processor = processor
        self.filter = to_filter(filter)

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        # Pass the fragments through untouched while the filter is off.
        if not self.filter():
            return Transformation(transformation_input.fragments)

        return self.processor.apply_transformation(transformation_input)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return "{}(processor={!r}, filter={!r})".format(
            class_name,
            self.processor,
            self.filter,
        )

942 

943 

class DynamicProcessor(Processor):
    """
    Processor that looks up the processor to use each time it is applied.

    :param get_processor: Callable that returns a :class:`.Processor` instance
        (or `None`, in which case nothing is transformed).
    """

    def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
        self.get_processor = get_processor

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        processor = self.get_processor()
        if not processor:
            # Nothing returned: fall back to the no-op processor.
            processor = DummyProcessor()
        return processor.apply_transformation(ti)

957 

958 

def merge_processors(processors: list[Processor]) -> Processor:
    """
    Combine several `Processor` objects into a single one.

    An empty list gives a `DummyProcessor`; a list with one processor gives
    that processor itself.
    """
    count = len(processors)

    if count == 1:
        # Nothing to merge.
        return processors[0]

    if count == 0:
        return DummyProcessor()

    return _MergedProcessor(processors)

970 

971 

class _MergedProcessor(Processor):
    """
    Processor that runs a list of other `Processor` objects one after the
    other, while exposing the API of a single `Processor`.
    """

    def __init__(self, processors: list[Processor]):
        self.processors = processors

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Feed the fragments through every processor in order and return the
        final fragments with the composed position mappings.
        """
        # Both lists grow while the processors run; the two closures below
        # always see their current content.
        to_display_chain = [ti.source_to_display]
        to_source_chain = []
        current_fragments = ti.fragments

        def source_to_display(i: int) -> int:
            """Translate x position from the buffer to the x position in the
            processor fragments list."""
            for step in to_display_chain:
                i = step(i)
            return i

        def display_to_source(i: int) -> int:
            """Translate an x position in the fragments list back, undoing the
            processors in reverse order."""
            for step in reversed(to_source_chain):
                i = step(i)
            return i

        for processor in self.processors:
            step_input = TransformationInput(
                ti.buffer_control,
                ti.document,
                ti.lineno,
                source_to_display,
                current_fragments,
                ti.width,
                ti.height,
            )
            result = processor.apply_transformation(step_input)

            current_fragments = result.fragments
            to_source_chain.append(result.display_to_source)
            to_display_chain.append(result.source_to_display)

        # In the case of a nested _MergedProcessor, each processor wants to
        # receive a 'source_to_display' function (as part of the
        # TransformationInput) that has everything in the chain before
        # included, because it can be called as part of the
        # `apply_transformation` function. However, this first
        # `source_to_display` should not be part of the output that we are
        # returning. (This is the most consistent with `display_to_source`.)
        # The list is changed in place, so the closure above picks it up.
        del to_display_chain[:1]

        return Transformation(current_fragments, source_to_display, display_to_source)