1"""
2Processors are little transformation blocks that transform the fragments list
3from a buffer before the BufferControl will render it to the screen.
4
5They can insert fragments before or after, or highlight fragments by replacing the
6fragment types.
7"""
8
9from __future__ import annotations
10
11import re
12from abc import ABCMeta, abstractmethod
13from collections.abc import Callable, Hashable
14from typing import TYPE_CHECKING, cast
15
16from prompt_toolkit.application.current import get_app
17from prompt_toolkit.cache import SimpleCache
18from prompt_toolkit.document import Document
19from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode
20from prompt_toolkit.formatted_text import (
21 AnyFormattedText,
22 StyleAndTextTuples,
23 to_formatted_text,
24)
25from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text
26from prompt_toolkit.search import SearchDirection
27from prompt_toolkit.utils import to_int, to_str
28
29from .utils import explode_text_fragments
30
31if TYPE_CHECKING:
32 from .controls import BufferControl, UIContent
33
34__all__ = [
35 "Processor",
36 "TransformationInput",
37 "Transformation",
38 "DummyProcessor",
39 "HighlightSearchProcessor",
40 "HighlightIncrementalSearchProcessor",
41 "HighlightSelectionProcessor",
42 "PasswordProcessor",
43 "HighlightMatchingBracketProcessor",
44 "DisplayMultipleCursors",
45 "BeforeInput",
46 "ShowArg",
47 "AfterInput",
48 "AppendAutoSuggestion",
49 "ConditionalProcessor",
50 "ShowLeadingWhiteSpaceProcessor",
51 "ShowTrailingWhiteSpaceProcessor",
52 "TabsProcessor",
53 "ReverseSearchProcessor",
54 "DynamicProcessor",
55 "merge_processors",
56]
57
58
59class Processor(metaclass=ABCMeta):
60 """
61 Manipulate the fragments for a given line in a
62 :class:`~prompt_toolkit.layout.controls.BufferControl`.
63 """
64
65 @abstractmethod
66 def apply_transformation(
67 self, transformation_input: TransformationInput
68 ) -> Transformation:
69 """
70 Apply transformation. Returns a :class:`.Transformation` instance.
71
72 :param transformation_input: :class:`.TransformationInput` object.
73 """
74 return Transformation(transformation_input.fragments)
75
76
77SourceToDisplay = Callable[[int], int]
78DisplayToSource = Callable[[int], int]
79
80
81class TransformationInput:
82 """
83 :param buffer_control: :class:`.BufferControl` instance.
84 :param lineno: The number of the line to which we apply the processor.
85 :param source_to_display: A function that returns the position in the
86 `fragments` for any position in the source string. (This takes
87 previous processors into account.)
88 :param fragments: List of fragments that we can transform. (Received from the
89 previous processor.)
90 :param get_line: Optional ; a callable that returns the fragments of another
91 line in the current buffer; This can be used to create processors capable
92 of affecting transforms across multiple lines.
93 """
94
95 def __init__(
96 self,
97 buffer_control: BufferControl,
98 document: Document,
99 lineno: int,
100 source_to_display: SourceToDisplay,
101 fragments: StyleAndTextTuples,
102 width: int,
103 height: int,
104 get_line: Callable[[int], StyleAndTextTuples] | None = None,
105 ) -> None:
106 self.buffer_control = buffer_control
107 self.document = document
108 self.lineno = lineno
109 self.source_to_display = source_to_display
110 self.fragments = fragments
111 self.width = width
112 self.height = height
113 self.get_line = get_line
114
115 def unpack(
116 self,
117 ) -> tuple[
118 BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
119 ]:
120 return (
121 self.buffer_control,
122 self.document,
123 self.lineno,
124 self.source_to_display,
125 self.fragments,
126 self.width,
127 self.height,
128 )
129
130
131class Transformation:
132 """
133 Transformation result, as returned by :meth:`.Processor.apply_transformation`.
134
135 Important: Always make sure that the length of `document.text` is equal to
136 the length of all the text in `fragments`!
137
138 :param fragments: The transformed fragments. To be displayed, or to pass to
139 the next processor.
140 :param source_to_display: Cursor position transformation from original
141 string to transformed string.
142 :param display_to_source: Cursor position transformed from source string to
143 original string.
144 """
145
146 def __init__(
147 self,
148 fragments: StyleAndTextTuples,
149 source_to_display: SourceToDisplay | None = None,
150 display_to_source: DisplayToSource | None = None,
151 ) -> None:
152 self.fragments = fragments
153 self.source_to_display = source_to_display or (lambda i: i)
154 self.display_to_source = display_to_source or (lambda i: i)
155
156
157class DummyProcessor(Processor):
158 """
159 A `Processor` that doesn't do anything.
160 """
161
162 def apply_transformation(
163 self, transformation_input: TransformationInput
164 ) -> Transformation:
165 return Transformation(transformation_input.fragments)
166
167
168class HighlightSearchProcessor(Processor):
169 """
170 Processor that highlights search matches in the document.
171 Note that this doesn't support multiline search matches yet.
172
173 The style classes 'search' and 'search.current' will be applied to the
174 content.
175 """
176
177 _classname = "search"
178 _classname_current = "search.current"
179
180 def _get_search_text(self, buffer_control: BufferControl) -> str:
181 """
182 The text we are searching for.
183 """
184 return buffer_control.search_state.text
185
186 def apply_transformation(
187 self, transformation_input: TransformationInput
188 ) -> Transformation:
189 (
190 buffer_control,
191 document,
192 lineno,
193 source_to_display,
194 fragments,
195 _,
196 _,
197 ) = transformation_input.unpack()
198
199 search_text = self._get_search_text(buffer_control)
200 searchmatch_fragment = f" class:{self._classname} "
201 searchmatch_current_fragment = f" class:{self._classname_current} "
202
203 if search_text and not get_app().is_done:
204 # For each search match, replace the style string.
205 line_text = fragment_list_to_text(fragments)
206 fragments = explode_text_fragments(fragments)
207
208 if buffer_control.search_state.ignore_case():
209 flags = re.IGNORECASE
210 else:
211 flags = re.RegexFlag(0)
212
213 # Get cursor column.
214 cursor_column: int | None
215 if document.cursor_position_row == lineno:
216 cursor_column = source_to_display(document.cursor_position_col)
217 else:
218 cursor_column = None
219
220 for match in re.finditer(re.escape(search_text), line_text, flags=flags):
221 if cursor_column is not None:
222 on_cursor = match.start() <= cursor_column < match.end()
223 else:
224 on_cursor = False
225
226 for i in range(match.start(), match.end()):
227 old_fragment, text, *_ = fragments[i]
228 if on_cursor:
229 fragments[i] = (
230 old_fragment + searchmatch_current_fragment,
231 fragments[i][1],
232 )
233 else:
234 fragments[i] = (
235 old_fragment + searchmatch_fragment,
236 fragments[i][1],
237 )
238
239 return Transformation(fragments)
240
241
242class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
243 """
244 Highlight the search terms that are used for highlighting the incremental
245 search. The style class 'incsearch' will be applied to the content.
246
247 Important: this requires the `preview_search=True` flag to be set for the
248 `BufferControl`. Otherwise, the cursor position won't be set to the search
249 match while searching, and nothing happens.
250 """
251
252 _classname = "incsearch"
253 _classname_current = "incsearch.current"
254
255 def _get_search_text(self, buffer_control: BufferControl) -> str:
256 """
257 The text we are searching for.
258 """
259 # When the search buffer has focus, take that text.
260 search_buffer = buffer_control.search_buffer
261 if search_buffer is not None and search_buffer.text:
262 return search_buffer.text
263 return ""
264
265
266class HighlightSelectionProcessor(Processor):
267 """
268 Processor that highlights the selection in the document.
269 """
270
271 def apply_transformation(
272 self, transformation_input: TransformationInput
273 ) -> Transformation:
274 (
275 buffer_control,
276 document,
277 lineno,
278 source_to_display,
279 fragments,
280 _,
281 _,
282 ) = transformation_input.unpack()
283
284 selected_fragment = " class:selected "
285
286 # In case of selection, highlight all matches.
287 selection_at_line = document.selection_range_at_line(lineno)
288
289 if selection_at_line:
290 from_, to = selection_at_line
291 from_ = source_to_display(from_)
292 to = source_to_display(to)
293
294 fragments = explode_text_fragments(fragments)
295
296 if from_ == 0 and to == 0 and len(fragments) == 0:
297 # When this is an empty line, insert a space in order to
298 # visualize the selection.
299 return Transformation([(selected_fragment, " ")])
300 else:
301 for i in range(from_, to):
302 if i < len(fragments):
303 old_fragment, old_text, *_ = fragments[i]
304 fragments[i] = (old_fragment + selected_fragment, old_text)
305 elif i == len(fragments):
306 fragments.append((selected_fragment, " "))
307
308 return Transformation(fragments)
309
310
311class PasswordProcessor(Processor):
312 """
313 Processor that masks the input. (For passwords.)
314
315 :param char: (string) Character to be used. "*" by default.
316 """
317
318 def __init__(self, char: str = "*") -> None:
319 self.char = char
320
321 def apply_transformation(self, ti: TransformationInput) -> Transformation:
322 fragments: StyleAndTextTuples = cast(
323 StyleAndTextTuples,
324 [
325 (style, self.char * len(text), *handler)
326 for style, text, *handler in ti.fragments
327 ],
328 )
329
330 return Transformation(fragments)
331
332
333class HighlightMatchingBracketProcessor(Processor):
334 """
335 When the cursor is on or right after a bracket, it highlights the matching
336 bracket.
337
338 :param max_cursor_distance: Only highlight matching brackets when the
339 cursor is within this distance. (From inside a `Processor`, we can't
340 know which lines will be visible on the screen. But we also don't want
341 to scan the whole document for matching brackets on each key press, so
342 we limit to this value.)
343 """
344
345 _closing_braces = "])}>"
346
347 def __init__(
348 self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
349 ) -> None:
350 self.chars = chars
351 self.max_cursor_distance = max_cursor_distance
352
353 self._positions_cache: SimpleCache[Hashable, list[tuple[int, int]]] = (
354 SimpleCache(maxsize=8)
355 )
356
357 def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
358 """
359 Return a list of (row, col) tuples that need to be highlighted.
360 """
361 pos: int | None
362
363 # Try for the character under the cursor.
364 if document.current_char and document.current_char in self.chars:
365 pos = document.find_matching_bracket_position(
366 start_pos=document.cursor_position - self.max_cursor_distance,
367 end_pos=document.cursor_position + self.max_cursor_distance,
368 )
369
370 # Try for the character before the cursor.
371 elif (
372 document.char_before_cursor
373 and document.char_before_cursor in self._closing_braces
374 and document.char_before_cursor in self.chars
375 ):
376 document = Document(document.text, document.cursor_position - 1)
377
378 pos = document.find_matching_bracket_position(
379 start_pos=document.cursor_position - self.max_cursor_distance,
380 end_pos=document.cursor_position + self.max_cursor_distance,
381 )
382 else:
383 pos = None
384
385 # Return a list of (row, col) tuples that need to be highlighted.
386 if pos:
387 pos += document.cursor_position # pos is relative.
388 row, col = document.translate_index_to_position(pos)
389 return [
390 (row, col),
391 (document.cursor_position_row, document.cursor_position_col),
392 ]
393 else:
394 return []
395
396 def apply_transformation(
397 self, transformation_input: TransformationInput
398 ) -> Transformation:
399 (
400 buffer_control,
401 document,
402 lineno,
403 source_to_display,
404 fragments,
405 _,
406 _,
407 ) = transformation_input.unpack()
408
409 # When the application is in the 'done' state, don't highlight.
410 if get_app().is_done:
411 return Transformation(fragments)
412
413 # Get the highlight positions.
414 key = (get_app().render_counter, document.text, document.cursor_position)
415 positions = self._positions_cache.get(
416 key, lambda: self._get_positions_to_highlight(document)
417 )
418
419 # Apply if positions were found at this line.
420 if positions:
421 for row, col in positions:
422 if row == lineno:
423 col = source_to_display(col)
424 fragments = explode_text_fragments(fragments)
425 style, text, *_ = fragments[col]
426
427 if col == document.cursor_position_col:
428 style += " class:matching-bracket.cursor "
429 else:
430 style += " class:matching-bracket.other "
431
432 fragments[col] = (style, text)
433
434 return Transformation(fragments)
435
436
437class DisplayMultipleCursors(Processor):
438 """
439 When we're in Vi block insert mode, display all the cursors.
440 """
441
442 def apply_transformation(
443 self, transformation_input: TransformationInput
444 ) -> Transformation:
445 (
446 buffer_control,
447 document,
448 lineno,
449 source_to_display,
450 fragments,
451 _,
452 _,
453 ) = transformation_input.unpack()
454
455 buff = buffer_control.buffer
456
457 if vi_insert_multiple_mode():
458 cursor_positions = buff.multiple_cursor_positions
459 fragments = explode_text_fragments(fragments)
460
461 # If any cursor appears on the current line, highlight that.
462 start_pos = document.translate_row_col_to_index(lineno, 0)
463 end_pos = start_pos + len(document.lines[lineno])
464
465 fragment_suffix = " class:multiple-cursors"
466
467 for p in cursor_positions:
468 if start_pos <= p <= end_pos:
469 column = source_to_display(p - start_pos)
470
471 # Replace fragment.
472 try:
473 style, text, *_ = fragments[column]
474 except IndexError:
475 # Cursor needs to be displayed after the current text.
476 fragments.append((fragment_suffix, " "))
477 else:
478 style += fragment_suffix
479 fragments[column] = (style, text)
480
481 return Transformation(fragments)
482 else:
483 return Transformation(fragments)
484
485
486class BeforeInput(Processor):
487 """
488 Insert text before the input.
489
490 :param text: This can be either plain text or formatted text
491 (or a callable that returns any of those).
492 :param style: style to be applied to this prompt/prefix.
493 """
494
495 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
496 self.text = text
497 self.style = style
498
499 def apply_transformation(self, ti: TransformationInput) -> Transformation:
500 source_to_display: SourceToDisplay | None
501 display_to_source: DisplayToSource | None
502
503 if ti.lineno == 0:
504 # Get fragments.
505 fragments_before = to_formatted_text(self.text, self.style)
506 fragments = fragments_before + ti.fragments
507
508 shift_position = fragment_list_len(fragments_before)
509 source_to_display = lambda i: i + shift_position
510 display_to_source = lambda i: i - shift_position
511 else:
512 fragments = ti.fragments
513 source_to_display = None
514 display_to_source = None
515
516 return Transformation(
517 fragments,
518 source_to_display=source_to_display,
519 display_to_source=display_to_source,
520 )
521
522 def __repr__(self) -> str:
523 return f"BeforeInput({self.text!r}, {self.style!r})"
524
525
526class ShowArg(BeforeInput):
527 """
528 Display the 'arg' in front of the input.
529
530 This was used by the `PromptSession`, but now it uses the
531 `Window.get_line_prefix` function instead.
532 """
533
534 def __init__(self) -> None:
535 super().__init__(self._get_text_fragments)
536
537 def _get_text_fragments(self) -> StyleAndTextTuples:
538 app = get_app()
539 if app.key_processor.arg is None:
540 return []
541 else:
542 arg = app.key_processor.arg
543
544 return [
545 ("class:prompt.arg", "(arg: "),
546 ("class:prompt.arg.text", str(arg)),
547 ("class:prompt.arg", ") "),
548 ]
549
550 def __repr__(self) -> str:
551 return "ShowArg()"
552
553
554class AfterInput(Processor):
555 """
556 Insert text after the input.
557
558 :param text: This can be either plain text or formatted text
559 (or a callable that returns any of those).
560 :param style: style to be applied to this prompt/prefix.
561 """
562
563 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
564 self.text = text
565 self.style = style
566
567 def apply_transformation(self, ti: TransformationInput) -> Transformation:
568 # Insert fragments after the last line.
569 if ti.lineno == ti.document.line_count - 1:
570 # Get fragments.
571 fragments_after = to_formatted_text(self.text, self.style)
572 return Transformation(fragments=ti.fragments + fragments_after)
573 else:
574 return Transformation(fragments=ti.fragments)
575
576 def __repr__(self) -> str:
577 return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"
578
579
580class AppendAutoSuggestion(Processor):
581 """
582 Append the auto suggestion to the input.
583 (The user can then press the right arrow the insert the suggestion.)
584 """
585
586 def __init__(self, style: str = "class:auto-suggestion") -> None:
587 self.style = style
588
589 def apply_transformation(self, ti: TransformationInput) -> Transformation:
590 # Insert fragments after the last line.
591 if ti.lineno == ti.document.line_count - 1:
592 buffer = ti.buffer_control.buffer
593
594 if buffer.suggestion and ti.document.is_cursor_at_the_end:
595 suggestion = buffer.suggestion.text
596 else:
597 suggestion = ""
598
599 return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
600 else:
601 return Transformation(fragments=ti.fragments)
602
603
604class ShowLeadingWhiteSpaceProcessor(Processor):
605 """
606 Make leading whitespace visible.
607
608 :param get_char: Callable that returns one character.
609 """
610
611 def __init__(
612 self,
613 get_char: Callable[[], str] | None = None,
614 style: str = "class:leading-whitespace",
615 ) -> None:
616 def default_get_char() -> str:
617 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
618 return "."
619 else:
620 return "\xb7"
621
622 self.style = style
623 self.get_char = get_char or default_get_char
624
625 def apply_transformation(self, ti: TransformationInput) -> Transformation:
626 fragments = ti.fragments
627
628 # Walk through all te fragments.
629 if fragments and fragment_list_to_text(fragments).startswith(" "):
630 t = (self.style, self.get_char())
631 fragments = explode_text_fragments(fragments)
632
633 for i in range(len(fragments)):
634 if fragments[i][1] == " ":
635 fragments[i] = t
636 else:
637 break
638
639 return Transformation(fragments)
640
641
642class ShowTrailingWhiteSpaceProcessor(Processor):
643 """
644 Make trailing whitespace visible.
645
646 :param get_char: Callable that returns one character.
647 """
648
649 def __init__(
650 self,
651 get_char: Callable[[], str] | None = None,
652 style: str = "class:training-whitespace",
653 ) -> None:
654 def default_get_char() -> str:
655 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
656 return "."
657 else:
658 return "\xb7"
659
660 self.style = style
661 self.get_char = get_char or default_get_char
662
663 def apply_transformation(self, ti: TransformationInput) -> Transformation:
664 fragments = ti.fragments
665
666 if fragments and fragments[-1][1].endswith(" "):
667 t = (self.style, self.get_char())
668 fragments = explode_text_fragments(fragments)
669
670 # Walk backwards through all te fragments and replace whitespace.
671 for i in range(len(fragments) - 1, -1, -1):
672 char = fragments[i][1]
673 if char == " ":
674 fragments[i] = t
675 else:
676 break
677
678 return Transformation(fragments)
679
680
681class TabsProcessor(Processor):
682 """
683 Render tabs as spaces (instead of ^I) or make them visible (for instance,
684 by replacing them with dots.)
685
686 :param tabstop: Horizontal space taken by a tab. (`int` or callable that
687 returns an `int`).
688 :param char1: Character or callable that returns a character (text of
689 length one). This one is used for the first space taken by the tab.
690 :param char2: Like `char1`, but for the rest of the space.
691 """
692
693 def __init__(
694 self,
695 tabstop: int | Callable[[], int] = 4,
696 char1: str | Callable[[], str] = "|",
697 char2: str | Callable[[], str] = "\u2508",
698 style: str = "class:tab",
699 ) -> None:
700 self.char1 = char1
701 self.char2 = char2
702 self.tabstop = tabstop
703 self.style = style
704
705 def apply_transformation(self, ti: TransformationInput) -> Transformation:
706 tabstop = to_int(self.tabstop)
707 style = self.style
708
709 # Create separator for tabs.
710 separator1 = to_str(self.char1)
711 separator2 = to_str(self.char2)
712
713 # Transform fragments.
714 fragments = explode_text_fragments(ti.fragments)
715
716 position_mappings = {}
717 result_fragments: StyleAndTextTuples = []
718 pos = 0
719
720 for i, fragment_and_text in enumerate(fragments):
721 position_mappings[i] = pos
722
723 if fragment_and_text[1] == "\t":
724 # Calculate how many characters we have to insert.
725 count = tabstop - (pos % tabstop)
726 if count == 0:
727 count = tabstop
728
729 # Insert tab.
730 result_fragments.append((style, separator1))
731 result_fragments.append((style, separator2 * (count - 1)))
732 pos += count
733 else:
734 result_fragments.append(fragment_and_text)
735 pos += 1
736
737 position_mappings[len(fragments)] = pos
738 # Add `pos+1` to mapping, because the cursor can be right after the
739 # line as well.
740 position_mappings[len(fragments) + 1] = pos + 1
741
742 def source_to_display(from_position: int) -> int:
743 "Maps original cursor position to the new one."
744 return position_mappings[from_position]
745
746 def display_to_source(display_pos: int) -> int:
747 "Maps display cursor position to the original one."
748 position_mappings_reversed = {v: k for k, v in position_mappings.items()}
749
750 while display_pos >= 0:
751 try:
752 return position_mappings_reversed[display_pos]
753 except KeyError:
754 display_pos -= 1
755 return 0
756
757 return Transformation(
758 result_fragments,
759 source_to_display=source_to_display,
760 display_to_source=display_to_source,
761 )
762
763
764class ReverseSearchProcessor(Processor):
765 """
766 Process to display the "(reverse-i-search)`...`:..." stuff around
767 the search buffer.
768
769 Note: This processor is meant to be applied to the BufferControl that
770 contains the search buffer, it's not meant for the original input.
771 """
772
773 _excluded_input_processors: list[type[Processor]] = [
774 HighlightSearchProcessor,
775 HighlightSelectionProcessor,
776 BeforeInput,
777 AfterInput,
778 ]
779
780 def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
781 from prompt_toolkit.layout.controls import BufferControl
782
783 prev_control = get_app().layout.search_target_buffer_control
784 if (
785 isinstance(prev_control, BufferControl)
786 and prev_control.search_buffer_control == buffer_control
787 ):
788 return prev_control
789 return None
790
791 def _content(
792 self, main_control: BufferControl, ti: TransformationInput
793 ) -> UIContent:
794 from prompt_toolkit.layout.controls import BufferControl
795
796 # Emulate the BufferControl through which we are searching.
797 # For this we filter out some of the input processors.
798 excluded_processors = tuple(self._excluded_input_processors)
799
800 def filter_processor(item: Processor) -> Processor | None:
801 """Filter processors from the main control that we want to disable
802 here. This returns either an accepted processor or None."""
803 # For a `_MergedProcessor`, check each individual processor, recursively.
804 if isinstance(item, _MergedProcessor):
805 accepted_processors = [filter_processor(p) for p in item.processors]
806 return merge_processors(
807 [p for p in accepted_processors if p is not None]
808 )
809
810 # For a `ConditionalProcessor`, check the body.
811 elif isinstance(item, ConditionalProcessor):
812 p = filter_processor(item.processor)
813 if p:
814 return ConditionalProcessor(p, item.filter)
815
816 # Otherwise, check the processor itself.
817 else:
818 if not isinstance(item, excluded_processors):
819 return item
820
821 return None
822
823 filtered_processor = filter_processor(
824 merge_processors(main_control.input_processors or [])
825 )
826 highlight_processor = HighlightIncrementalSearchProcessor()
827
828 if filtered_processor:
829 new_processors = [filtered_processor, highlight_processor]
830 else:
831 new_processors = [highlight_processor]
832
833 from .controls import SearchBufferControl
834
835 assert isinstance(ti.buffer_control, SearchBufferControl)
836
837 buffer_control = BufferControl(
838 buffer=main_control.buffer,
839 input_processors=new_processors,
840 include_default_input_processors=False,
841 lexer=main_control.lexer,
842 preview_search=True,
843 search_buffer_control=ti.buffer_control,
844 )
845
846 return buffer_control.create_content(ti.width, ti.height, preview_search=True)
847
848 def apply_transformation(self, ti: TransformationInput) -> Transformation:
849 from .controls import SearchBufferControl
850
851 assert isinstance(ti.buffer_control, SearchBufferControl), (
852 "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."
853 )
854
855 source_to_display: SourceToDisplay | None
856 display_to_source: DisplayToSource | None
857
858 main_control = self._get_main_buffer(ti.buffer_control)
859
860 if ti.lineno == 0 and main_control:
861 content = self._content(main_control, ti)
862
863 # Get the line from the original document for this search.
864 line_fragments = content.get_line(content.cursor_position.y)
865
866 if main_control.search_state.direction == SearchDirection.FORWARD:
867 direction_text = "i-search"
868 else:
869 direction_text = "reverse-i-search"
870
871 fragments_before: StyleAndTextTuples = [
872 ("class:prompt.search", "("),
873 ("class:prompt.search", direction_text),
874 ("class:prompt.search", ")`"),
875 ]
876
877 fragments = (
878 fragments_before
879 + [
880 ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
881 ("", "': "),
882 ]
883 + line_fragments
884 )
885
886 shift_position = fragment_list_len(fragments_before)
887 source_to_display = lambda i: i + shift_position
888 display_to_source = lambda i: i - shift_position
889 else:
890 source_to_display = None
891 display_to_source = None
892 fragments = ti.fragments
893
894 return Transformation(
895 fragments,
896 source_to_display=source_to_display,
897 display_to_source=display_to_source,
898 )
899
900
901class ConditionalProcessor(Processor):
902 """
903 Processor that applies another processor, according to a certain condition.
904 Example::
905
906 # Create a function that returns whether or not the processor should
907 # currently be applied.
908 def highlight_enabled():
909 return true_or_false
910
911 # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
912 BufferControl(input_processors=[
913 ConditionalProcessor(HighlightSearchProcessor(),
914 Condition(highlight_enabled))])
915
916 :param processor: :class:`.Processor` instance.
917 :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
918 """
919
920 def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
921 self.processor = processor
922 self.filter = to_filter(filter)
923
924 def apply_transformation(
925 self, transformation_input: TransformationInput
926 ) -> Transformation:
927 # Run processor when enabled.
928 if self.filter():
929 return self.processor.apply_transformation(transformation_input)
930 else:
931 return Transformation(transformation_input.fragments)
932
933 def __repr__(self) -> str:
934 return f"{self.__class__.__name__}(processor={self.processor!r}, filter={self.filter!r})"
935
936
937class DynamicProcessor(Processor):
938 """
939 Processor class that dynamically returns any Processor.
940
941 :param get_processor: Callable that returns a :class:`.Processor` instance.
942 """
943
944 def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
945 self.get_processor = get_processor
946
947 def apply_transformation(self, ti: TransformationInput) -> Transformation:
948 processor = self.get_processor() or DummyProcessor()
949 return processor.apply_transformation(ti)
950
951
952def merge_processors(processors: list[Processor]) -> Processor:
953 """
954 Merge multiple `Processor` objects into one.
955 """
956 if len(processors) == 0:
957 return DummyProcessor()
958
959 if len(processors) == 1:
960 return processors[0] # Nothing to merge.
961
962 return _MergedProcessor(processors)
963
964
965class _MergedProcessor(Processor):
966 """
967 Processor that groups multiple other `Processor` objects, but exposes an
968 API as if it is one `Processor`.
969 """
970
971 def __init__(self, processors: list[Processor]):
972 self.processors = processors
973
974 def apply_transformation(self, ti: TransformationInput) -> Transformation:
975 source_to_display_functions = [ti.source_to_display]
976 display_to_source_functions = []
977 fragments = ti.fragments
978
979 def source_to_display(i: int) -> int:
980 """Translate x position from the buffer to the x position in the
981 processor fragments list."""
982 for f in source_to_display_functions:
983 i = f(i)
984 return i
985
986 for p in self.processors:
987 transformation = p.apply_transformation(
988 TransformationInput(
989 ti.buffer_control,
990 ti.document,
991 ti.lineno,
992 source_to_display,
993 fragments,
994 ti.width,
995 ti.height,
996 ti.get_line,
997 )
998 )
999 fragments = transformation.fragments
1000 display_to_source_functions.append(transformation.display_to_source)
1001 source_to_display_functions.append(transformation.source_to_display)
1002
1003 def display_to_source(i: int) -> int:
1004 for f in reversed(display_to_source_functions):
1005 i = f(i)
1006 return i
1007
1008 # In the case of a nested _MergedProcessor, each processor wants to
1009 # receive a 'source_to_display' function (as part of the
1010 # TransformationInput) that has everything in the chain before
1011 # included, because it can be called as part of the
1012 # `apply_transformation` function. However, this first
1013 # `source_to_display` should not be part of the output that we are
1014 # returning. (This is the most consistent with `display_to_source`.)
1015 del source_to_display_functions[:1]
1016
1017 return Transformation(fragments, source_to_display, display_to_source)