1"""
2Processors are little transformation blocks that transform the fragments list
3from a buffer before the BufferControl will render it to the screen.
4
5They can insert fragments before or after, or highlight fragments by replacing the
6fragment types.
7"""
8
9from __future__ import annotations
10
11import re
12from abc import ABCMeta, abstractmethod
13from typing import TYPE_CHECKING, Callable, Hashable, cast
14
15from prompt_toolkit.application.current import get_app
16from prompt_toolkit.cache import SimpleCache
17from prompt_toolkit.document import Document
18from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode
19from prompt_toolkit.formatted_text import (
20 AnyFormattedText,
21 StyleAndTextTuples,
22 to_formatted_text,
23)
24from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text
25from prompt_toolkit.search import SearchDirection
26from prompt_toolkit.utils import to_int, to_str
27
28from .utils import explode_text_fragments
29
30if TYPE_CHECKING:
31 from .controls import BufferControl, UIContent
32
33__all__ = [
34 "Processor",
35 "TransformationInput",
36 "Transformation",
37 "DummyProcessor",
38 "HighlightSearchProcessor",
39 "HighlightIncrementalSearchProcessor",
40 "HighlightSelectionProcessor",
41 "PasswordProcessor",
42 "HighlightMatchingBracketProcessor",
43 "DisplayMultipleCursors",
44 "BeforeInput",
45 "ShowArg",
46 "AfterInput",
47 "AppendAutoSuggestion",
48 "ConditionalProcessor",
49 "ShowLeadingWhiteSpaceProcessor",
50 "ShowTrailingWhiteSpaceProcessor",
51 "TabsProcessor",
52 "ReverseSearchProcessor",
53 "DynamicProcessor",
54 "merge_processors",
55]
56
57
58class Processor(metaclass=ABCMeta):
59 """
60 Manipulate the fragments for a given line in a
61 :class:`~prompt_toolkit.layout.controls.BufferControl`.
62 """
63
64 @abstractmethod
65 def apply_transformation(
66 self, transformation_input: TransformationInput
67 ) -> Transformation:
68 """
69 Apply transformation. Returns a :class:`.Transformation` instance.
70
71 :param transformation_input: :class:`.TransformationInput` object.
72 """
73 return Transformation(transformation_input.fragments)
74
75
76SourceToDisplay = Callable[[int], int]
77DisplayToSource = Callable[[int], int]
78
79
80class TransformationInput:
81 """
82 :param buffer_control: :class:`.BufferControl` instance.
83 :param lineno: The number of the line to which we apply the processor.
84 :param source_to_display: A function that returns the position in the
85 `fragments` for any position in the source string. (This takes
86 previous processors into account.)
87 :param fragments: List of fragments that we can transform. (Received from the
88 previous processor.)
89 :param get_line: Optional ; a callable that returns the fragments of another
90 line in the current buffer; This can be used to create processors capable
91 of affecting transforms across multiple lines.
92 """
93
94 def __init__(
95 self,
96 buffer_control: BufferControl,
97 document: Document,
98 lineno: int,
99 source_to_display: SourceToDisplay,
100 fragments: StyleAndTextTuples,
101 width: int,
102 height: int,
103 get_line: Callable[[int], StyleAndTextTuples] | None = None,
104 ) -> None:
105 self.buffer_control = buffer_control
106 self.document = document
107 self.lineno = lineno
108 self.source_to_display = source_to_display
109 self.fragments = fragments
110 self.width = width
111 self.height = height
112 self.get_line = get_line
113
114 def unpack(
115 self,
116 ) -> tuple[
117 BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
118 ]:
119 return (
120 self.buffer_control,
121 self.document,
122 self.lineno,
123 self.source_to_display,
124 self.fragments,
125 self.width,
126 self.height,
127 )
128
129
130class Transformation:
131 """
132 Transformation result, as returned by :meth:`.Processor.apply_transformation`.
133
134 Important: Always make sure that the length of `document.text` is equal to
135 the length of all the text in `fragments`!
136
137 :param fragments: The transformed fragments. To be displayed, or to pass to
138 the next processor.
139 :param source_to_display: Cursor position transformation from original
140 string to transformed string.
141 :param display_to_source: Cursor position transformed from source string to
142 original string.
143 """
144
145 def __init__(
146 self,
147 fragments: StyleAndTextTuples,
148 source_to_display: SourceToDisplay | None = None,
149 display_to_source: DisplayToSource | None = None,
150 ) -> None:
151 self.fragments = fragments
152 self.source_to_display = source_to_display or (lambda i: i)
153 self.display_to_source = display_to_source or (lambda i: i)
154
155
156class DummyProcessor(Processor):
157 """
158 A `Processor` that doesn't do anything.
159 """
160
161 def apply_transformation(
162 self, transformation_input: TransformationInput
163 ) -> Transformation:
164 return Transformation(transformation_input.fragments)
165
166
167class HighlightSearchProcessor(Processor):
168 """
169 Processor that highlights search matches in the document.
170 Note that this doesn't support multiline search matches yet.
171
172 The style classes 'search' and 'search.current' will be applied to the
173 content.
174 """
175
176 _classname = "search"
177 _classname_current = "search.current"
178
179 def _get_search_text(self, buffer_control: BufferControl) -> str:
180 """
181 The text we are searching for.
182 """
183 return buffer_control.search_state.text
184
185 def apply_transformation(
186 self, transformation_input: TransformationInput
187 ) -> Transformation:
188 (
189 buffer_control,
190 document,
191 lineno,
192 source_to_display,
193 fragments,
194 _,
195 _,
196 ) = transformation_input.unpack()
197
198 search_text = self._get_search_text(buffer_control)
199 searchmatch_fragment = f" class:{self._classname} "
200 searchmatch_current_fragment = f" class:{self._classname_current} "
201
202 if search_text and not get_app().is_done:
203 # For each search match, replace the style string.
204 line_text = fragment_list_to_text(fragments)
205 fragments = explode_text_fragments(fragments)
206
207 if buffer_control.search_state.ignore_case():
208 flags = re.IGNORECASE
209 else:
210 flags = re.RegexFlag(0)
211
212 # Get cursor column.
213 cursor_column: int | None
214 if document.cursor_position_row == lineno:
215 cursor_column = source_to_display(document.cursor_position_col)
216 else:
217 cursor_column = None
218
219 for match in re.finditer(re.escape(search_text), line_text, flags=flags):
220 if cursor_column is not None:
221 on_cursor = match.start() <= cursor_column < match.end()
222 else:
223 on_cursor = False
224
225 for i in range(match.start(), match.end()):
226 old_fragment, text, *_ = fragments[i]
227 if on_cursor:
228 fragments[i] = (
229 old_fragment + searchmatch_current_fragment,
230 fragments[i][1],
231 )
232 else:
233 fragments[i] = (
234 old_fragment + searchmatch_fragment,
235 fragments[i][1],
236 )
237
238 return Transformation(fragments)
239
240
241class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
242 """
243 Highlight the search terms that are used for highlighting the incremental
244 search. The style class 'incsearch' will be applied to the content.
245
246 Important: this requires the `preview_search=True` flag to be set for the
247 `BufferControl`. Otherwise, the cursor position won't be set to the search
248 match while searching, and nothing happens.
249 """
250
251 _classname = "incsearch"
252 _classname_current = "incsearch.current"
253
254 def _get_search_text(self, buffer_control: BufferControl) -> str:
255 """
256 The text we are searching for.
257 """
258 # When the search buffer has focus, take that text.
259 search_buffer = buffer_control.search_buffer
260 if search_buffer is not None and search_buffer.text:
261 return search_buffer.text
262 return ""
263
264
265class HighlightSelectionProcessor(Processor):
266 """
267 Processor that highlights the selection in the document.
268 """
269
270 def apply_transformation(
271 self, transformation_input: TransformationInput
272 ) -> Transformation:
273 (
274 buffer_control,
275 document,
276 lineno,
277 source_to_display,
278 fragments,
279 _,
280 _,
281 ) = transformation_input.unpack()
282
283 selected_fragment = " class:selected "
284
285 # In case of selection, highlight all matches.
286 selection_at_line = document.selection_range_at_line(lineno)
287
288 if selection_at_line:
289 from_, to = selection_at_line
290 from_ = source_to_display(from_)
291 to = source_to_display(to)
292
293 fragments = explode_text_fragments(fragments)
294
295 if from_ == 0 and to == 0 and len(fragments) == 0:
296 # When this is an empty line, insert a space in order to
297 # visualize the selection.
298 return Transformation([(selected_fragment, " ")])
299 else:
300 for i in range(from_, to):
301 if i < len(fragments):
302 old_fragment, old_text, *_ = fragments[i]
303 fragments[i] = (old_fragment + selected_fragment, old_text)
304 elif i == len(fragments):
305 fragments.append((selected_fragment, " "))
306
307 return Transformation(fragments)
308
309
310class PasswordProcessor(Processor):
311 """
312 Processor that masks the input. (For passwords.)
313
314 :param char: (string) Character to be used. "*" by default.
315 """
316
317 def __init__(self, char: str = "*") -> None:
318 self.char = char
319
320 def apply_transformation(self, ti: TransformationInput) -> Transformation:
321 fragments: StyleAndTextTuples = cast(
322 StyleAndTextTuples,
323 [
324 (style, self.char * len(text), *handler)
325 for style, text, *handler in ti.fragments
326 ],
327 )
328
329 return Transformation(fragments)
330
331
332class HighlightMatchingBracketProcessor(Processor):
333 """
334 When the cursor is on or right after a bracket, it highlights the matching
335 bracket.
336
337 :param max_cursor_distance: Only highlight matching brackets when the
338 cursor is within this distance. (From inside a `Processor`, we can't
339 know which lines will be visible on the screen. But we also don't want
340 to scan the whole document for matching brackets on each key press, so
341 we limit to this value.)
342 """
343
344 _closing_braces = "])}>"
345
346 def __init__(
347 self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
348 ) -> None:
349 self.chars = chars
350 self.max_cursor_distance = max_cursor_distance
351
352 self._positions_cache: SimpleCache[Hashable, list[tuple[int, int]]] = (
353 SimpleCache(maxsize=8)
354 )
355
356 def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
357 """
358 Return a list of (row, col) tuples that need to be highlighted.
359 """
360 pos: int | None
361
362 # Try for the character under the cursor.
363 if document.current_char and document.current_char in self.chars:
364 pos = document.find_matching_bracket_position(
365 start_pos=document.cursor_position - self.max_cursor_distance,
366 end_pos=document.cursor_position + self.max_cursor_distance,
367 )
368
369 # Try for the character before the cursor.
370 elif (
371 document.char_before_cursor
372 and document.char_before_cursor in self._closing_braces
373 and document.char_before_cursor in self.chars
374 ):
375 document = Document(document.text, document.cursor_position - 1)
376
377 pos = document.find_matching_bracket_position(
378 start_pos=document.cursor_position - self.max_cursor_distance,
379 end_pos=document.cursor_position + self.max_cursor_distance,
380 )
381 else:
382 pos = None
383
384 # Return a list of (row, col) tuples that need to be highlighted.
385 if pos:
386 pos += document.cursor_position # pos is relative.
387 row, col = document.translate_index_to_position(pos)
388 return [
389 (row, col),
390 (document.cursor_position_row, document.cursor_position_col),
391 ]
392 else:
393 return []
394
395 def apply_transformation(
396 self, transformation_input: TransformationInput
397 ) -> Transformation:
398 (
399 buffer_control,
400 document,
401 lineno,
402 source_to_display,
403 fragments,
404 _,
405 _,
406 ) = transformation_input.unpack()
407
408 # When the application is in the 'done' state, don't highlight.
409 if get_app().is_done:
410 return Transformation(fragments)
411
412 # Get the highlight positions.
413 key = (get_app().render_counter, document.text, document.cursor_position)
414 positions = self._positions_cache.get(
415 key, lambda: self._get_positions_to_highlight(document)
416 )
417
418 # Apply if positions were found at this line.
419 if positions:
420 for row, col in positions:
421 if row == lineno:
422 col = source_to_display(col)
423 fragments = explode_text_fragments(fragments)
424 style, text, *_ = fragments[col]
425
426 if col == document.cursor_position_col:
427 style += " class:matching-bracket.cursor "
428 else:
429 style += " class:matching-bracket.other "
430
431 fragments[col] = (style, text)
432
433 return Transformation(fragments)
434
435
436class DisplayMultipleCursors(Processor):
437 """
438 When we're in Vi block insert mode, display all the cursors.
439 """
440
441 def apply_transformation(
442 self, transformation_input: TransformationInput
443 ) -> Transformation:
444 (
445 buffer_control,
446 document,
447 lineno,
448 source_to_display,
449 fragments,
450 _,
451 _,
452 ) = transformation_input.unpack()
453
454 buff = buffer_control.buffer
455
456 if vi_insert_multiple_mode():
457 cursor_positions = buff.multiple_cursor_positions
458 fragments = explode_text_fragments(fragments)
459
460 # If any cursor appears on the current line, highlight that.
461 start_pos = document.translate_row_col_to_index(lineno, 0)
462 end_pos = start_pos + len(document.lines[lineno])
463
464 fragment_suffix = " class:multiple-cursors"
465
466 for p in cursor_positions:
467 if start_pos <= p <= end_pos:
468 column = source_to_display(p - start_pos)
469
470 # Replace fragment.
471 try:
472 style, text, *_ = fragments[column]
473 except IndexError:
474 # Cursor needs to be displayed after the current text.
475 fragments.append((fragment_suffix, " "))
476 else:
477 style += fragment_suffix
478 fragments[column] = (style, text)
479
480 return Transformation(fragments)
481 else:
482 return Transformation(fragments)
483
484
485class BeforeInput(Processor):
486 """
487 Insert text before the input.
488
489 :param text: This can be either plain text or formatted text
490 (or a callable that returns any of those).
491 :param style: style to be applied to this prompt/prefix.
492 """
493
494 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
495 self.text = text
496 self.style = style
497
498 def apply_transformation(self, ti: TransformationInput) -> Transformation:
499 source_to_display: SourceToDisplay | None
500 display_to_source: DisplayToSource | None
501
502 if ti.lineno == 0:
503 # Get fragments.
504 fragments_before = to_formatted_text(self.text, self.style)
505 fragments = fragments_before + ti.fragments
506
507 shift_position = fragment_list_len(fragments_before)
508 source_to_display = lambda i: i + shift_position
509 display_to_source = lambda i: i - shift_position
510 else:
511 fragments = ti.fragments
512 source_to_display = None
513 display_to_source = None
514
515 return Transformation(
516 fragments,
517 source_to_display=source_to_display,
518 display_to_source=display_to_source,
519 )
520
521 def __repr__(self) -> str:
522 return f"BeforeInput({self.text!r}, {self.style!r})"
523
524
525class ShowArg(BeforeInput):
526 """
527 Display the 'arg' in front of the input.
528
529 This was used by the `PromptSession`, but now it uses the
530 `Window.get_line_prefix` function instead.
531 """
532
533 def __init__(self) -> None:
534 super().__init__(self._get_text_fragments)
535
536 def _get_text_fragments(self) -> StyleAndTextTuples:
537 app = get_app()
538 if app.key_processor.arg is None:
539 return []
540 else:
541 arg = app.key_processor.arg
542
543 return [
544 ("class:prompt.arg", "(arg: "),
545 ("class:prompt.arg.text", str(arg)),
546 ("class:prompt.arg", ") "),
547 ]
548
549 def __repr__(self) -> str:
550 return "ShowArg()"
551
552
553class AfterInput(Processor):
554 """
555 Insert text after the input.
556
557 :param text: This can be either plain text or formatted text
558 (or a callable that returns any of those).
559 :param style: style to be applied to this prompt/prefix.
560 """
561
562 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
563 self.text = text
564 self.style = style
565
566 def apply_transformation(self, ti: TransformationInput) -> Transformation:
567 # Insert fragments after the last line.
568 if ti.lineno == ti.document.line_count - 1:
569 # Get fragments.
570 fragments_after = to_formatted_text(self.text, self.style)
571 return Transformation(fragments=ti.fragments + fragments_after)
572 else:
573 return Transformation(fragments=ti.fragments)
574
575 def __repr__(self) -> str:
576 return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"
577
578
579class AppendAutoSuggestion(Processor):
580 """
581 Append the auto suggestion to the input.
582 (The user can then press the right arrow the insert the suggestion.)
583 """
584
585 def __init__(self, style: str = "class:auto-suggestion") -> None:
586 self.style = style
587
588 def apply_transformation(self, ti: TransformationInput) -> Transformation:
589 # Insert fragments after the last line.
590 if ti.lineno == ti.document.line_count - 1:
591 buffer = ti.buffer_control.buffer
592
593 if buffer.suggestion and ti.document.is_cursor_at_the_end:
594 suggestion = buffer.suggestion.text
595 else:
596 suggestion = ""
597
598 return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
599 else:
600 return Transformation(fragments=ti.fragments)
601
602
603class ShowLeadingWhiteSpaceProcessor(Processor):
604 """
605 Make leading whitespace visible.
606
607 :param get_char: Callable that returns one character.
608 """
609
610 def __init__(
611 self,
612 get_char: Callable[[], str] | None = None,
613 style: str = "class:leading-whitespace",
614 ) -> None:
615 def default_get_char() -> str:
616 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
617 return "."
618 else:
619 return "\xb7"
620
621 self.style = style
622 self.get_char = get_char or default_get_char
623
624 def apply_transformation(self, ti: TransformationInput) -> Transformation:
625 fragments = ti.fragments
626
627 # Walk through all te fragments.
628 if fragments and fragment_list_to_text(fragments).startswith(" "):
629 t = (self.style, self.get_char())
630 fragments = explode_text_fragments(fragments)
631
632 for i in range(len(fragments)):
633 if fragments[i][1] == " ":
634 fragments[i] = t
635 else:
636 break
637
638 return Transformation(fragments)
639
640
641class ShowTrailingWhiteSpaceProcessor(Processor):
642 """
643 Make trailing whitespace visible.
644
645 :param get_char: Callable that returns one character.
646 """
647
648 def __init__(
649 self,
650 get_char: Callable[[], str] | None = None,
651 style: str = "class:training-whitespace",
652 ) -> None:
653 def default_get_char() -> str:
654 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
655 return "."
656 else:
657 return "\xb7"
658
659 self.style = style
660 self.get_char = get_char or default_get_char
661
662 def apply_transformation(self, ti: TransformationInput) -> Transformation:
663 fragments = ti.fragments
664
665 if fragments and fragments[-1][1].endswith(" "):
666 t = (self.style, self.get_char())
667 fragments = explode_text_fragments(fragments)
668
669 # Walk backwards through all te fragments and replace whitespace.
670 for i in range(len(fragments) - 1, -1, -1):
671 char = fragments[i][1]
672 if char == " ":
673 fragments[i] = t
674 else:
675 break
676
677 return Transformation(fragments)
678
679
680class TabsProcessor(Processor):
681 """
682 Render tabs as spaces (instead of ^I) or make them visible (for instance,
683 by replacing them with dots.)
684
685 :param tabstop: Horizontal space taken by a tab. (`int` or callable that
686 returns an `int`).
687 :param char1: Character or callable that returns a character (text of
688 length one). This one is used for the first space taken by the tab.
689 :param char2: Like `char1`, but for the rest of the space.
690 """
691
692 def __init__(
693 self,
694 tabstop: int | Callable[[], int] = 4,
695 char1: str | Callable[[], str] = "|",
696 char2: str | Callable[[], str] = "\u2508",
697 style: str = "class:tab",
698 ) -> None:
699 self.char1 = char1
700 self.char2 = char2
701 self.tabstop = tabstop
702 self.style = style
703
704 def apply_transformation(self, ti: TransformationInput) -> Transformation:
705 tabstop = to_int(self.tabstop)
706 style = self.style
707
708 # Create separator for tabs.
709 separator1 = to_str(self.char1)
710 separator2 = to_str(self.char2)
711
712 # Transform fragments.
713 fragments = explode_text_fragments(ti.fragments)
714
715 position_mappings = {}
716 result_fragments: StyleAndTextTuples = []
717 pos = 0
718
719 for i, fragment_and_text in enumerate(fragments):
720 position_mappings[i] = pos
721
722 if fragment_and_text[1] == "\t":
723 # Calculate how many characters we have to insert.
724 count = tabstop - (pos % tabstop)
725 if count == 0:
726 count = tabstop
727
728 # Insert tab.
729 result_fragments.append((style, separator1))
730 result_fragments.append((style, separator2 * (count - 1)))
731 pos += count
732 else:
733 result_fragments.append(fragment_and_text)
734 pos += 1
735
736 position_mappings[len(fragments)] = pos
737 # Add `pos+1` to mapping, because the cursor can be right after the
738 # line as well.
739 position_mappings[len(fragments) + 1] = pos + 1
740
741 def source_to_display(from_position: int) -> int:
742 "Maps original cursor position to the new one."
743 return position_mappings[from_position]
744
745 def display_to_source(display_pos: int) -> int:
746 "Maps display cursor position to the original one."
747 position_mappings_reversed = {v: k for k, v in position_mappings.items()}
748
749 while display_pos >= 0:
750 try:
751 return position_mappings_reversed[display_pos]
752 except KeyError:
753 display_pos -= 1
754 return 0
755
756 return Transformation(
757 result_fragments,
758 source_to_display=source_to_display,
759 display_to_source=display_to_source,
760 )
761
762
763class ReverseSearchProcessor(Processor):
764 """
765 Process to display the "(reverse-i-search)`...`:..." stuff around
766 the search buffer.
767
768 Note: This processor is meant to be applied to the BufferControl that
769 contains the search buffer, it's not meant for the original input.
770 """
771
772 _excluded_input_processors: list[type[Processor]] = [
773 HighlightSearchProcessor,
774 HighlightSelectionProcessor,
775 BeforeInput,
776 AfterInput,
777 ]
778
779 def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
780 from prompt_toolkit.layout.controls import BufferControl
781
782 prev_control = get_app().layout.search_target_buffer_control
783 if (
784 isinstance(prev_control, BufferControl)
785 and prev_control.search_buffer_control == buffer_control
786 ):
787 return prev_control
788 return None
789
790 def _content(
791 self, main_control: BufferControl, ti: TransformationInput
792 ) -> UIContent:
793 from prompt_toolkit.layout.controls import BufferControl
794
795 # Emulate the BufferControl through which we are searching.
796 # For this we filter out some of the input processors.
797 excluded_processors = tuple(self._excluded_input_processors)
798
799 def filter_processor(item: Processor) -> Processor | None:
800 """Filter processors from the main control that we want to disable
801 here. This returns either an accepted processor or None."""
802 # For a `_MergedProcessor`, check each individual processor, recursively.
803 if isinstance(item, _MergedProcessor):
804 accepted_processors = [filter_processor(p) for p in item.processors]
805 return merge_processors(
806 [p for p in accepted_processors if p is not None]
807 )
808
809 # For a `ConditionalProcessor`, check the body.
810 elif isinstance(item, ConditionalProcessor):
811 p = filter_processor(item.processor)
812 if p:
813 return ConditionalProcessor(p, item.filter)
814
815 # Otherwise, check the processor itself.
816 else:
817 if not isinstance(item, excluded_processors):
818 return item
819
820 return None
821
822 filtered_processor = filter_processor(
823 merge_processors(main_control.input_processors or [])
824 )
825 highlight_processor = HighlightIncrementalSearchProcessor()
826
827 if filtered_processor:
828 new_processors = [filtered_processor, highlight_processor]
829 else:
830 new_processors = [highlight_processor]
831
832 from .controls import SearchBufferControl
833
834 assert isinstance(ti.buffer_control, SearchBufferControl)
835
836 buffer_control = BufferControl(
837 buffer=main_control.buffer,
838 input_processors=new_processors,
839 include_default_input_processors=False,
840 lexer=main_control.lexer,
841 preview_search=True,
842 search_buffer_control=ti.buffer_control,
843 )
844
845 return buffer_control.create_content(ti.width, ti.height, preview_search=True)
846
847 def apply_transformation(self, ti: TransformationInput) -> Transformation:
848 from .controls import SearchBufferControl
849
850 assert isinstance(ti.buffer_control, SearchBufferControl), (
851 "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."
852 )
853
854 source_to_display: SourceToDisplay | None
855 display_to_source: DisplayToSource | None
856
857 main_control = self._get_main_buffer(ti.buffer_control)
858
859 if ti.lineno == 0 and main_control:
860 content = self._content(main_control, ti)
861
862 # Get the line from the original document for this search.
863 line_fragments = content.get_line(content.cursor_position.y)
864
865 if main_control.search_state.direction == SearchDirection.FORWARD:
866 direction_text = "i-search"
867 else:
868 direction_text = "reverse-i-search"
869
870 fragments_before: StyleAndTextTuples = [
871 ("class:prompt.search", "("),
872 ("class:prompt.search", direction_text),
873 ("class:prompt.search", ")`"),
874 ]
875
876 fragments = (
877 fragments_before
878 + [
879 ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
880 ("", "': "),
881 ]
882 + line_fragments
883 )
884
885 shift_position = fragment_list_len(fragments_before)
886 source_to_display = lambda i: i + shift_position
887 display_to_source = lambda i: i - shift_position
888 else:
889 source_to_display = None
890 display_to_source = None
891 fragments = ti.fragments
892
893 return Transformation(
894 fragments,
895 source_to_display=source_to_display,
896 display_to_source=display_to_source,
897 )
898
899
900class ConditionalProcessor(Processor):
901 """
902 Processor that applies another processor, according to a certain condition.
903 Example::
904
905 # Create a function that returns whether or not the processor should
906 # currently be applied.
907 def highlight_enabled():
908 return true_or_false
909
910 # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
911 BufferControl(input_processors=[
912 ConditionalProcessor(HighlightSearchProcessor(),
913 Condition(highlight_enabled))])
914
915 :param processor: :class:`.Processor` instance.
916 :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
917 """
918
919 def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
920 self.processor = processor
921 self.filter = to_filter(filter)
922
923 def apply_transformation(
924 self, transformation_input: TransformationInput
925 ) -> Transformation:
926 # Run processor when enabled.
927 if self.filter():
928 return self.processor.apply_transformation(transformation_input)
929 else:
930 return Transformation(transformation_input.fragments)
931
932 def __repr__(self) -> str:
933 return f"{self.__class__.__name__}(processor={self.processor!r}, filter={self.filter!r})"
934
935
936class DynamicProcessor(Processor):
937 """
938 Processor class that dynamically returns any Processor.
939
940 :param get_processor: Callable that returns a :class:`.Processor` instance.
941 """
942
943 def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
944 self.get_processor = get_processor
945
946 def apply_transformation(self, ti: TransformationInput) -> Transformation:
947 processor = self.get_processor() or DummyProcessor()
948 return processor.apply_transformation(ti)
949
950
951def merge_processors(processors: list[Processor]) -> Processor:
952 """
953 Merge multiple `Processor` objects into one.
954 """
955 if len(processors) == 0:
956 return DummyProcessor()
957
958 if len(processors) == 1:
959 return processors[0] # Nothing to merge.
960
961 return _MergedProcessor(processors)
962
963
964class _MergedProcessor(Processor):
965 """
966 Processor that groups multiple other `Processor` objects, but exposes an
967 API as if it is one `Processor`.
968 """
969
970 def __init__(self, processors: list[Processor]):
971 self.processors = processors
972
973 def apply_transformation(self, ti: TransformationInput) -> Transformation:
974 source_to_display_functions = [ti.source_to_display]
975 display_to_source_functions = []
976 fragments = ti.fragments
977
978 def source_to_display(i: int) -> int:
979 """Translate x position from the buffer to the x position in the
980 processor fragments list."""
981 for f in source_to_display_functions:
982 i = f(i)
983 return i
984
985 for p in self.processors:
986 transformation = p.apply_transformation(
987 TransformationInput(
988 ti.buffer_control,
989 ti.document,
990 ti.lineno,
991 source_to_display,
992 fragments,
993 ti.width,
994 ti.height,
995 ti.get_line,
996 )
997 )
998 fragments = transformation.fragments
999 display_to_source_functions.append(transformation.display_to_source)
1000 source_to_display_functions.append(transformation.source_to_display)
1001
1002 def display_to_source(i: int) -> int:
1003 for f in reversed(display_to_source_functions):
1004 i = f(i)
1005 return i
1006
1007 # In the case of a nested _MergedProcessor, each processor wants to
1008 # receive a 'source_to_display' function (as part of the
1009 # TransformationInput) that has everything in the chain before
1010 # included, because it can be called as part of the
1011 # `apply_transformation` function. However, this first
1012 # `source_to_display` should not be part of the output that we are
1013 # returning. (This is the most consistent with `display_to_source`.)
1014 del source_to_display_functions[:1]
1015
1016 return Transformation(fragments, source_to_display, display_to_source)