Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/layout/processors.py: 23%

379 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Processors are little transformation blocks that transform the fragments list 

3from a buffer before the BufferControl will render it to the screen. 

4 

5They can insert fragments before or after, or highlight fragments by replacing the 

6fragment types. 

7""" 

8from __future__ import annotations 

9 

10import re 

11from abc import ABCMeta, abstractmethod 

12from typing import TYPE_CHECKING, Callable, Hashable, cast 

13 

14from prompt_toolkit.application.current import get_app 

15from prompt_toolkit.cache import SimpleCache 

16from prompt_toolkit.document import Document 

17from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode 

18from prompt_toolkit.formatted_text import ( 

19 AnyFormattedText, 

20 StyleAndTextTuples, 

21 to_formatted_text, 

22) 

23from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text 

24from prompt_toolkit.search import SearchDirection 

25from prompt_toolkit.utils import to_int, to_str 

26 

27from .utils import explode_text_fragments 

28 

29if TYPE_CHECKING: 

30 from .controls import BufferControl, UIContent 

31 

32__all__ = [ 

33 "Processor", 

34 "TransformationInput", 

35 "Transformation", 

36 "DummyProcessor", 

37 "HighlightSearchProcessor", 

38 "HighlightIncrementalSearchProcessor", 

39 "HighlightSelectionProcessor", 

40 "PasswordProcessor", 

41 "HighlightMatchingBracketProcessor", 

42 "DisplayMultipleCursors", 

43 "BeforeInput", 

44 "ShowArg", 

45 "AfterInput", 

46 "AppendAutoSuggestion", 

47 "ConditionalProcessor", 

48 "ShowLeadingWhiteSpaceProcessor", 

49 "ShowTrailingWhiteSpaceProcessor", 

50 "TabsProcessor", 

51 "ReverseSearchProcessor", 

52 "DynamicProcessor", 

53 "merge_processors", 

54] 

55 

56 

class Processor(metaclass=ABCMeta):
    """
    Abstract base for objects that rewrite the fragments of one line of a
    :class:`~prompt_toolkit.layout.controls.BufferControl`.
    """

    @abstractmethod
    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Transform the fragments of a single line.

        :param transformation_input: :class:`.TransformationInput` object.
        :returns: A :class:`.Transformation` instance. This base
            implementation wraps the incoming fragments unchanged.
        """
        unchanged = transformation_input.fragments
        return Transformation(unchanged)

73 

74 

# Column-translation callables exchanged between processors: both take one
# `int` position and return one `int` position.
# `SourceToDisplay` maps a position in the source string to a position in the
# transformed fragments; `DisplayToSource` is used for the opposite direction.
SourceToDisplay = Callable[[int], int]
DisplayToSource = Callable[[int], int]

77 

78 

class TransformationInput:
    """
    Bundle of values handed to :meth:`Processor.apply_transformation`.

    :param buffer_control: :class:`.BufferControl` instance.
    :param document: :class:`.Document` instance the line belongs to.
    :param lineno: The number of the line to which we apply the processor.
    :param source_to_display: A function that returns the position in the
        `fragments` for any position in the source string. (This takes
        previous processors into account.)
    :param fragments: List of fragments that we can transform. (Received from
        the previous processor.)
    :param width: Integer stored as given. (`ReverseSearchProcessor` forwards
        it to `BufferControl.create_content`.)
    :param height: Integer stored as given; forwarded together with `width`.
    """

    def __init__(
        self,
        buffer_control: BufferControl,
        document: Document,
        lineno: int,
        source_to_display: SourceToDisplay,
        fragments: StyleAndTextTuples,
        width: int,
        height: int,
    ) -> None:
        # What is being rendered.
        self.buffer_control = buffer_control
        self.document = document
        self.lineno = lineno

        # State produced by the processors that ran earlier in the chain.
        self.source_to_display = source_to_display
        self.fragments = fragments

        # Size values, kept verbatim.
        self.width = width
        self.height = height

    def unpack(
        self,
    ) -> tuple[
        BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
    ]:
        """
        Return every attribute as one 7-tuple, in constructor-argument order,
        so that callers can destructure it in a single assignment.
        """
        what = (self.buffer_control, self.document, self.lineno)
        chain_state = (self.source_to_display, self.fragments)
        size = (self.width, self.height)
        return what + chain_state + size

122 

123 

class Transformation:
    """
    Transformation result, as returned by :meth:`.Processor.apply_transformation`.

    Important: Always make sure that the length of `document.text` is equal to
               the length of all the text in `fragments`!

    :param fragments: The transformed fragments. To be displayed, or to pass to
        the next processor.
    :param source_to_display: Cursor position transformation from the original
        (source) string to the transformed (display) string. Defaults to the
        identity function.
    :param display_to_source: Cursor position transformation from the
        transformed (display) string back to the original (source) string.
        Defaults to the identity function.
    """

    def __init__(
        self,
        fragments: StyleAndTextTuples,
        source_to_display: SourceToDisplay | None = None,
        display_to_source: DisplayToSource | None = None,
    ) -> None:
        self.fragments = fragments

        # A missing mapping means "columns are not shifted": use identity.
        self.source_to_display = (
            source_to_display if source_to_display else (lambda i: i)
        )
        self.display_to_source = (
            display_to_source if display_to_source else (lambda i: i)
        )

148 

149 

class DummyProcessor(Processor):
    """
    No-op `Processor`: the incoming fragments are returned as they are.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """Wrap the untouched input fragments in a `Transformation`."""
        passthrough = transformation_input.fragments
        return Transformation(passthrough)

159 

160 

class HighlightSearchProcessor(Processor):
    """
    Processor that highlights search matches in the document.
    Note that this doesn't support multiline search matches yet.

    The style classes 'search' and 'search.current' will be applied to the
    content.
    """

    # Style class names; subclasses override these to use other classes.
    _classname = "search"
    _classname_current = "search.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text to highlight: the text of the control's search state.
        """
        return buffer_control.search_state.text

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append the search style class to every character of every literal
        occurrence of the search text on this line. A match that contains the
        cursor column gets the "current" class instead.

        Nothing is highlighted when there is no search text or when the
        application is done.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        search_text = self._get_search_text(buffer_control)
        if not search_text or get_app().is_done:
            return Transformation(fragments)

        style_match = f" class:{self._classname} "
        style_current = f" class:{self._classname_current} "

        # One fragment per character, so that match offsets in `line_text`
        # can be used as indexes into `fragments`.
        line_text = fragment_list_to_text(fragments)
        fragments = explode_text_fragments(fragments)

        flags = (
            re.IGNORECASE
            if buffer_control.search_state.ignore_case()
            else re.RegexFlag(0)
        )

        # Display column of the cursor, or None if it is on another line.
        cursor_column: int | None = None
        if document.cursor_position_row == lineno:
            cursor_column = source_to_display(document.cursor_position_col)

        # The search text is escaped: it is matched literally, not as a regex.
        for match in re.finditer(re.escape(search_text), line_text, flags=flags):
            start, end = match.span()
            on_cursor = cursor_column is not None and start <= cursor_column < end
            suffix = style_current if on_cursor else style_match

            for index in range(start, end):
                old_style, char, *_ = fragments[index]
                fragments[index] = (old_style + suffix, char)

        return Transformation(fragments)

233 

234 

class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
    """
    Highlight the search terms that are used for highlighting the incremental
    search. The style class 'incsearch' will be applied to the content.

    Important: this requires the `preview_search=True` flag to be set for the
    `BufferControl`. Otherwise, the cursor position won't be set to the search
    match while searching, and nothing happens.
    """

    _classname = "incsearch"
    _classname_current = "incsearch.current"

    def _get_search_text(self, buffer_control: BufferControl) -> str:
        """
        Return the text of the control's search buffer, or an empty string
        when there is no search buffer or it holds no text.
        """
        search_buffer = buffer_control.search_buffer
        if search_buffer is None:
            return ""
        return search_buffer.text or ""

257 

258 

class HighlightSelectionProcessor(Processor):
    """
    Processor that highlights the selection in the document.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append the 'selected' style class to every character of this line
        that lies inside the document's selection range for this line.
        """
        (
            _buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        selected_fragment = " class:selected "

        selection_at_line = document.selection_range_at_line(lineno)
        if not selection_at_line:
            # No selection on this line: nothing to do.
            return Transformation(fragments)

        # Translate the selection bounds to display columns.
        from_, to = selection_at_line
        from_ = source_to_display(from_)
        to = source_to_display(to)

        fragments = explode_text_fragments(fragments)

        if from_ == 0 and to == 0 and not fragments:
            # When this is an empty line, insert a space in order to
            # visualize the selection.
            return Transformation([(selected_fragment, " ")])

        for index in range(from_, to):
            size = len(fragments)
            if index < size:
                old_style, old_text, *_ = fragments[index]
                fragments[index] = (old_style + selected_fragment, old_text)
            elif index == size:
                # Selection extends one position past the text: show it as a
                # selected space.
                fragments.append((selected_fragment, " "))

        return Transformation(fragments)

302 

303 

class PasswordProcessor(Processor):
    """
    Processor that masks the input. (For passwords.)

    :param char: (string) Character to be used. "*" by default.
    """

    def __init__(self, char: str = "*") -> None:
        self.char = char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace the text of every fragment by `char` repeated once per
        original character. The style and any trailing tuple items of each
        fragment are kept.
        """
        masked = []
        for style, text, *handler in ti.fragments:
            masked.append((style, self.char * len(text), *handler))

        fragments: StyleAndTextTuples = cast(StyleAndTextTuples, masked)
        return Transformation(fragments)

324 

325 

class HighlightMatchingBracketProcessor(Processor):
    """
    When the cursor is on or right after a bracket, it highlights the matching
    bracket.

    :param chars: The bracket characters to consider.
    :param max_cursor_distance: Only highlight matching brackets when the
        cursor is within this distance. (From inside a `Processor`, we can't
        know which lines will be visible on the screen. But we also don't want
        to scan the whole document for matching brackets on each key press, so
        we limit to this value.)
    """

    _closing_braces = "])}>"

    def __init__(
        self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
    ) -> None:
        self.chars = chars
        self.max_cursor_distance = max_cursor_distance

        # Small cache of computed positions, keyed per render/text/cursor
        # (see `apply_transformation`), so the bracket search is not repeated
        # for every line of the same render pass.
        self._positions_cache: SimpleCache[
            Hashable, list[tuple[int, int]]
        ] = SimpleCache(maxsize=8)

    def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
        """
        Return a list of (row, col) tuples that need to be highlighted.

        The list is empty when no match was found. Otherwise it holds the
        position of the matching bracket followed by the position of the
        bracket the search started from. The start is the character under the
        cursor, or the closing bracket just before the cursor.
        """
        # Decide from which document/cursor the search starts.
        if document.current_char and document.current_char in self.chars:
            # The character under the cursor is a bracket.
            search_doc = document
        elif (
            document.char_before_cursor
            and document.char_before_cursor in self._closing_braces
            and document.char_before_cursor in self.chars
        ):
            # The character before the cursor is a closing bracket: search as
            # if the cursor were on that character.
            search_doc = Document(document.text, document.cursor_position - 1)
        else:
            return []

        pos: int | None = search_doc.find_matching_bracket_position(
            start_pos=search_doc.cursor_position - self.max_cursor_distance,
            end_pos=search_doc.cursor_position + self.max_cursor_distance,
        )
        if not pos:
            return []

        pos += search_doc.cursor_position  # pos is relative.
        row, col = search_doc.translate_index_to_position(pos)
        return [
            (row, col),
            (search_doc.cursor_position_row, search_doc.cursor_position_col),
        ]

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append a 'matching-bracket' style class to the brackets on this line.

        The bracket located exactly at the document's cursor position gets
        ``matching-bracket.cursor``; any other highlighted bracket gets
        ``matching-bracket.other``.
        """
        (
            _buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        app = get_app()

        # When the application is in the 'done' state, don't highlight.
        if app.is_done:
            return Transformation(fragments)

        # Get the highlight positions.
        key = (app.render_counter, document.text, document.cursor_position)
        positions = self._positions_cache.get(
            key, lambda: self._get_positions_to_highlight(document)
        )

        # Apply if positions were found at this line.
        for row, col in positions:
            if row != lineno:
                continue

            # Decide "is this the cursor?" in source coordinates, using both
            # row and column. Comparing the display column (after
            # `source_to_display`) with the source cursor column, or ignoring
            # the row, mislabels brackets when earlier processors shift
            # columns or when the matching bracket sits in the cursor's
            # column on another line.
            is_cursor = (row, col) == (
                document.cursor_position_row,
                document.cursor_position_col,
            )

            display_col = source_to_display(col)
            fragments = explode_text_fragments(fragments)
            style, text, *_ = fragments[display_col]

            if is_cursor:
                style += " class:matching-bracket.cursor "
            else:
                style += " class:matching-bracket.other "

            fragments[display_col] = (style, text)

        return Transformation(fragments)

428 

429 

class DisplayMultipleCursors(Processor):
    """
    When we're in Vi block insert mode, display all the cursors.
    """

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Append the 'multiple-cursors' style class at every cursor position of
        the buffer that falls on this line. Does nothing unless the
        `vi_insert_multiple_mode` filter is active.
        """
        (
            buffer_control,
            document,
            lineno,
            source_to_display,
            fragments,
            _,
            _,
        ) = transformation_input.unpack()

        buff = buffer_control.buffer

        if not vi_insert_multiple_mode():
            return Transformation(fragments)

        cursor_positions = buff.multiple_cursor_positions
        fragments = explode_text_fragments(fragments)

        # Absolute document indexes of the first position of this line and
        # of the position right after its last character.
        start_pos = document.translate_row_col_to_index(lineno, 0)
        end_pos = start_pos + len(document.lines[lineno])

        fragment_suffix = " class:multiple-cursors"

        for position in cursor_positions:
            if not (start_pos <= position <= end_pos):
                continue  # This cursor is on another line.

            column = source_to_display(position - start_pos)

            # Replace fragment.
            try:
                style, text, *_ = fragments[column]
            except IndexError:
                # Cursor needs to be displayed after the current text.
                fragments.append((fragment_suffix, " "))
            else:
                style += fragment_suffix
                fragments[column] = (style, text)

        return Transformation(fragments)

477 

478 

class BeforeInput(Processor):
    """
    Insert text before the input.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to this prompt/prefix.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Prepend the formatted text to line 0 and return position mappings
        shifted by the length of that text. Other lines pass through
        unchanged, with no mappings supplied.
        """
        source_to_display: SourceToDisplay | None = None
        display_to_source: DisplayToSource | None = None
        fragments = ti.fragments

        if ti.lineno == 0:
            # Get fragments.
            fragments_before = to_formatted_text(self.text, self.style)
            shift_position = fragment_list_len(fragments_before)

            fragments = fragments_before + ti.fragments
            source_to_display = lambda i: i + shift_position
            display_to_source = lambda i: i - shift_position

        return Transformation(
            fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

    def __repr__(self) -> str:
        return f"BeforeInput({self.text!r}, {self.style!r})"

517 

518 

class ShowArg(BeforeInput):
    """
    Display the 'arg' in front of the input.

    This was used by the `PromptSession`, but now it uses the
    `Window.get_line_prefix` function instead.
    """

    def __init__(self) -> None:
        # The prefix text is computed lazily, each time it is rendered.
        super().__init__(self._get_text_fragments)

    def _get_text_fragments(self) -> StyleAndTextTuples:
        """
        Return the "(arg: N) " fragments for the current application's key
        processor argument, or an empty list when there is no argument.
        """
        arg = get_app().key_processor.arg
        if arg is None:
            return []

        return [
            ("class:prompt.arg", "(arg: "),
            ("class:prompt.arg.text", str(arg)),
            ("class:prompt.arg", ") "),
        ]

    def __repr__(self) -> str:
        return "ShowArg()"

545 

546 

class AfterInput(Processor):
    """
    Insert text after the input.

    :param text: This can be either plain text or formatted text
        (or a callable that returns any of those).
    :param style: style to be applied to this prompt/prefix.
    """

    def __init__(self, text: AnyFormattedText, style: str = "") -> None:
        self.text = text
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Append the formatted text to the last line of the document; every
        other line passes through unchanged.
        """
        fragments = ti.fragments

        is_last_line = ti.lineno == ti.document.line_count - 1
        if is_last_line:
            fragments = fragments + to_formatted_text(self.text, self.style)

        return Transformation(fragments=fragments)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"

571 

572 

class AppendAutoSuggestion(Processor):
    """
    Append the auto suggestion to the input.
    (The user can then press the right arrow to insert the suggestion.)

    :param style: Style applied to the suggestion text.
    """

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        On the last line, append one fragment holding the buffer's suggestion
        text. The fragment is empty when there is no suggestion or the cursor
        is not at the end of the document. Other lines are left unchanged.
        """
        if ti.lineno != ti.document.line_count - 1:
            return Transformation(fragments=ti.fragments)

        buffer = ti.buffer_control.buffer

        suggestion = ""
        if buffer.suggestion and ti.document.is_cursor_at_the_end:
            suggestion = buffer.suggestion.text

        return Transformation(fragments=ti.fragments + [(self.style, suggestion)])

595 

596 

class ShowLeadingWhiteSpaceProcessor(Processor):
    """
    Make leading whitespace visible.

    :param get_char: Callable that returns one character.
    :param style: Style applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        style: str = "class:leading-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Use a middle dot, unless the output encoding cannot represent
            # it (it then encodes to "?"); fall back to a plain dot.
            middle_dot = "\xb7"
            encoded = middle_dot.encode(get_app().output.encoding(), "replace")
            return "." if encoded == b"?" else middle_dot

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace each space at the start of the line by a `(style, char)`
        fragment, stopping at the first character that is not a space.
        """
        fragments = ti.fragments

        if fragments and fragment_list_to_text(fragments).startswith(" "):
            replacement = (self.style, self.get_char())
            fragments = explode_text_fragments(fragments)

            # Walk through the fragments from the start of the line.
            for index, fragment in enumerate(fragments):
                if fragment[1] != " ":
                    break
                fragments[index] = replacement

        return Transformation(fragments)

633 

634 

class ShowTrailingWhiteSpaceProcessor(Processor):
    """
    Make trailing whitespace visible.

    :param get_char: Callable that returns one character.
    :param style: Style applied to the replacement characters.
    """

    def __init__(
        self,
        get_char: Callable[[], str] | None = None,
        # The original default was the misspelled "class:training-whitespace".
        # The correctly spelled class (parallel to "class:leading-whitespace")
        # is now applied; the legacy name is kept as a second class so style
        # sheets written against the old spelling keep working.
        # NOTE(review): confirm the default style sheet defines
        # `trailing-whitespace`.
        style: str = "class:trailing-whitespace class:training-whitespace",
    ) -> None:
        def default_get_char() -> str:
            # Use a middle dot, unless the output encoding cannot represent
            # it (it then encodes to "?"); fall back to a plain dot.
            if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
                return "."
            else:
                return "\xb7"

        self.style = style
        self.get_char = get_char or default_get_char

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Replace each space at the end of the line by a `(style, char)`
        fragment, stopping at the first character (from the end) that is not
        a space.
        """
        fragments = ti.fragments

        if fragments and fragments[-1][1].endswith(" "):
            replacement = (self.style, self.get_char())
            fragments = explode_text_fragments(fragments)

            # Walk backwards through all the fragments and replace whitespace.
            for index in range(len(fragments) - 1, -1, -1):
                if fragments[index][1] != " ":
                    break
                fragments[index] = replacement

        return Transformation(fragments)

672 

673 

class TabsProcessor(Processor):
    """
    Render tabs as spaces (instead of ^I) or make them visible (for instance,
    by replacing them with dots.)

    :param tabstop: Horizontal space taken by a tab. (`int` or callable that
        returns an `int`).
    :param char1: Character or callable that returns a character (text of
        length one). This one is used for the first space taken by the tab.
    :param char2: Like `char1`, but for the rest of the space.
    :param style: Style applied to the characters that replace a tab.
    """

    def __init__(
        self,
        tabstop: int | Callable[[], int] = 4,
        char1: str | Callable[[], str] = "|",
        char2: str | Callable[[], str] = "\u2508",
        style: str = "class:tab",
    ) -> None:
        self.char1 = char1
        self.char2 = char2
        self.tabstop = tabstop
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Expand every tab character up to the next tab stop.

        Returns the expanded fragments together with position mappings
        between source positions (one per input character) and display
        positions (one per output column).
        """
        tabstop = to_int(self.tabstop)
        style = self.style

        # Create separator for tabs.
        separator1 = to_str(self.char1)
        separator2 = to_str(self.char2)

        # Transform fragments.
        fragments = explode_text_fragments(ti.fragments)

        # Source index -> display column.
        position_mappings: dict[int, int] = {}
        result_fragments: StyleAndTextTuples = []
        pos = 0

        for i, fragment_and_text in enumerate(fragments):
            position_mappings[i] = pos

            if fragment_and_text[1] == "\t":
                # Number of columns up to the next tab stop. `pos % tabstop`
                # is always smaller than `tabstop`, so this is never 0 and a
                # tab at a tab stop takes a full `tabstop` columns.
                count = tabstop - (pos % tabstop)

                # Insert tab.
                result_fragments.append((style, separator1))
                result_fragments.append((style, separator2 * (count - 1)))
                pos += count
            else:
                result_fragments.append(fragment_and_text)
                pos += 1

        position_mappings[len(fragments)] = pos
        # Add `pos+1` to mapping, because the cursor can be right after the
        # line as well.
        position_mappings[len(fragments) + 1] = pos + 1

        # Display column -> source index. `position_mappings` is complete at
        # this point, so the reverse table is built once here instead of on
        # every `display_to_source` call.
        position_mappings_reversed = {v: k for k, v in position_mappings.items()}

        def source_to_display(from_position: int) -> int:
            "Maps original cursor position to the new one."
            return position_mappings[from_position]

        def display_to_source(display_pos: int) -> int:
            "Maps display cursor position to the original one."
            # A column inside an expanded tab has no entry of its own: walk
            # left until a column that starts a source character is found.
            while display_pos >= 0:
                try:
                    return position_mappings_reversed[display_pos]
                except KeyError:
                    display_pos -= 1
            return 0

        return Transformation(
            result_fragments,
            source_to_display=source_to_display,
            display_to_source=display_to_source,
        )

755 

756 

class ReverseSearchProcessor(Processor):
    """
    Process to display the "(reverse-i-search)`...`:..." stuff around
    the search buffer.

    Note: This processor is meant to be applied to the BufferControl that
    contains the search buffer, it's not meant for the original input.
    """

    # Processor types of the main control that are dropped when its content
    # is re-rendered inside the search line (see `_content`).
    _excluded_input_processors: list[type[Processor]] = [
        HighlightSearchProcessor,
        HighlightSelectionProcessor,
        BeforeInput,
        AfterInput,
    ]

    def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
        """
        Return the layout's search target control when it is a
        `BufferControl` whose search buffer control equals `buffer_control`;
        otherwise return None.
        """
        from prompt_toolkit.layout.controls import BufferControl

        candidate = get_app().layout.search_target_buffer_control
        searches_through_us = (
            isinstance(candidate, BufferControl)
            and candidate.search_buffer_control == buffer_control
        )
        return candidate if searches_through_us else None

    def _content(
        self, main_control: BufferControl, ti: TransformationInput
    ) -> UIContent:
        """
        Render the main control's buffer through a temporary `BufferControl`
        that uses a filtered copy of the main control's input processors plus
        incremental-search highlighting.
        """
        from prompt_toolkit.layout.controls import BufferControl

        # Emulate the BufferControl through which we are searching.
        # For this we filter out some of the input processors.
        excluded_processors = tuple(self._excluded_input_processors)

        def filter_processor(item: Processor) -> Processor | None:
            """Filter processors from the main control that we want to disable
            here. This returns either an accepted processor or None."""
            # For a `_MergedProcessor`, check each individual processor, recursively.
            if isinstance(item, _MergedProcessor):
                kept = [
                    accepted
                    for accepted in map(filter_processor, item.processors)
                    if accepted is not None
                ]
                return merge_processors(kept)

            # For a `ConditionalProcessor`, check the body.
            if isinstance(item, ConditionalProcessor):
                inner = filter_processor(item.processor)
                return ConditionalProcessor(inner, item.filter) if inner else None

            # Otherwise, check the processor itself.
            return None if isinstance(item, excluded_processors) else item

        filtered_processor = filter_processor(
            merge_processors(main_control.input_processors or [])
        )

        new_processors = [HighlightIncrementalSearchProcessor()]
        if filtered_processor:
            new_processors.insert(0, filtered_processor)

        from .controls import SearchBufferControl

        assert isinstance(ti.buffer_control, SearchBufferControl)

        buffer_control = BufferControl(
            buffer=main_control.buffer,
            input_processors=new_processors,
            include_default_input_processors=False,
            lexer=main_control.lexer,
            preview_search=True,
            search_buffer_control=ti.buffer_control,
        )

        return buffer_control.create_content(ti.width, ti.height, preview_search=True)

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        On line 0, when a main control is found, build
        "(<direction>)`<search text>': <line at the cursor of the main buffer>".
        Otherwise the fragments pass through unchanged.
        """
        from .controls import SearchBufferControl

        assert isinstance(
            ti.buffer_control, SearchBufferControl
        ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."

        main_control = self._get_main_buffer(ti.buffer_control)

        if ti.lineno != 0 or not main_control:
            return Transformation(
                ti.fragments,
                source_to_display=None,
                display_to_source=None,
            )

        content = self._content(main_control, ti)

        # Get the line from the original document for this search.
        line_fragments = content.get_line(content.cursor_position.y)

        searching_forward = (
            main_control.search_state.direction == SearchDirection.FORWARD
        )
        direction_text = "i-search" if searching_forward else "reverse-i-search"

        fragments_before: StyleAndTextTuples = [
            ("class:prompt.search", "("),
            ("class:prompt.search", direction_text),
            ("class:prompt.search", ")`"),
        ]
        search_text_fragments: StyleAndTextTuples = [
            ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
            ("", "': "),
        ]
        fragments = fragments_before + search_text_fragments + line_fragments

        # The search text starts right after `fragments_before`.
        shift_position = fragment_list_len(fragments_before)

        return Transformation(
            fragments,
            source_to_display=lambda i: i + shift_position,
            display_to_source=lambda i: i - shift_position,
        )

892 

893 

class ConditionalProcessor(Processor):
    """
    Processor that applies another processor, according to a certain condition.
    Example::

        # Create a function that returns whether or not the processor should
        # currently be applied.
        def highlight_enabled():
            return true_or_false

        # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
        BufferControl(input_processors=[
            ConditionalProcessor(HighlightSearchProcessor(),
                                 Condition(highlight_enabled))])

    :param processor: :class:`.Processor` instance.
    :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
    """

    def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
        self.processor = processor
        # Normalised through `to_filter`, so that it can be called below.
        self.filter = to_filter(filter)

    def apply_transformation(
        self, transformation_input: TransformationInput
    ) -> Transformation:
        """
        Delegate to the wrapped processor while the filter evaluates true;
        otherwise return the fragments unchanged.
        """
        if not self.filter():
            return Transformation(transformation_input.fragments)

        return self.processor.apply_transformation(transformation_input)

    def __repr__(self) -> str:
        name = self.__class__.__name__
        return f"{name}(processor={self.processor!r}, filter={self.filter!r})"

932 

933 

class DynamicProcessor(Processor):
    """
    Processor class that dynamically returns any Processor.

    :param get_processor: Callable that returns a :class:`.Processor` instance
        (or `None`, in which case nothing is transformed).
    """

    def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
        self.get_processor = get_processor

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Ask `get_processor` for the processor to use right now and delegate
        to it; a falsy result is replaced by a `DummyProcessor`.
        """
        chosen = self.get_processor()
        if not chosen:
            chosen = DummyProcessor()
        return chosen.apply_transformation(ti)

947 

948 

def merge_processors(processors: list[Processor]) -> Processor:
    """
    Merge multiple `Processor` objects into one.

    An empty list gives a `DummyProcessor`, a single processor is returned
    as is, and anything longer is wrapped in a `_MergedProcessor`.
    """
    if not processors:
        return DummyProcessor()

    if len(processors) > 1:
        return _MergedProcessor(processors)

    # Exactly one processor: nothing to merge.
    return processors[0]

960 

961 

class _MergedProcessor(Processor):
    """
    Processor that groups multiple other `Processor` objects, but exposes an
    API as if it is one `Processor`.
    """

    def __init__(self, processors: list[Processor]):
        self.processors = processors

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Run all processors in order, feeding each one the fragments produced
        by the previous one, and return the final fragments together with the
        composed position mappings.
        """
        # Forward mappings applied so far; starts with the mapping we
        # received ourselves. Backward mappings are collected in run order.
        forward_chain = [ti.source_to_display]
        backward_chain = []
        fragments = ti.fragments

        def source_to_display(i: int) -> int:
            """Translate x position from the buffer to the x position in the
            processor fragments list."""
            # Reads `forward_chain` at call time, so it always covers every
            # processor that has run up to that moment.
            for mapping in forward_chain:
                i = mapping(i)
            return i

        for processor in self.processors:
            step_input = TransformationInput(
                ti.buffer_control,
                ti.document,
                ti.lineno,
                source_to_display,
                fragments,
                ti.width,
                ti.height,
            )
            transformation = processor.apply_transformation(step_input)

            fragments = transformation.fragments
            backward_chain.append(transformation.display_to_source)
            forward_chain.append(transformation.source_to_display)

        def display_to_source(i: int) -> int:
            # Undo the processors in the opposite order of application.
            for mapping in reversed(backward_chain):
                i = mapping(i)
            return i

        # In the case of a nested _MergedProcessor, each processor wants to
        # receive a 'source_to_display' function (as part of the
        # TransformationInput) that has everything in the chain before
        # included, because it can be called as part of the
        # `apply_transformation` function. However, this first
        # `source_to_display` should not be part of the output that we are
        # returning. (This is the most consistent with `display_to_source`.)
        del forward_chain[:1]

        return Transformation(fragments, source_to_display, display_to_source)