Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/layout/processors.py: 23%
379 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Processors are little transformation blocks that transform the fragments list
3from a buffer before the BufferControl will render it to the screen.
5They can insert fragments before or after, or highlight fragments by replacing the
6fragment types.
7"""
8from __future__ import annotations
10import re
11from abc import ABCMeta, abstractmethod
12from typing import TYPE_CHECKING, Callable, Hashable, cast
14from prompt_toolkit.application.current import get_app
15from prompt_toolkit.cache import SimpleCache
16from prompt_toolkit.document import Document
17from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode
18from prompt_toolkit.formatted_text import (
19 AnyFormattedText,
20 StyleAndTextTuples,
21 to_formatted_text,
22)
23from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text
24from prompt_toolkit.search import SearchDirection
25from prompt_toolkit.utils import to_int, to_str
27from .utils import explode_text_fragments
29if TYPE_CHECKING:
30 from .controls import BufferControl, UIContent
32__all__ = [
33 "Processor",
34 "TransformationInput",
35 "Transformation",
36 "DummyProcessor",
37 "HighlightSearchProcessor",
38 "HighlightIncrementalSearchProcessor",
39 "HighlightSelectionProcessor",
40 "PasswordProcessor",
41 "HighlightMatchingBracketProcessor",
42 "DisplayMultipleCursors",
43 "BeforeInput",
44 "ShowArg",
45 "AfterInput",
46 "AppendAutoSuggestion",
47 "ConditionalProcessor",
48 "ShowLeadingWhiteSpaceProcessor",
49 "ShowTrailingWhiteSpaceProcessor",
50 "TabsProcessor",
51 "ReverseSearchProcessor",
52 "DynamicProcessor",
53 "merge_processors",
54]
57class Processor(metaclass=ABCMeta):
58 """
59 Manipulate the fragments for a given line in a
60 :class:`~prompt_toolkit.layout.controls.BufferControl`.
61 """
63 @abstractmethod
64 def apply_transformation(
65 self, transformation_input: TransformationInput
66 ) -> Transformation:
67 """
68 Apply transformation. Returns a :class:`.Transformation` instance.
70 :param transformation_input: :class:`.TransformationInput` object.
71 """
72 return Transformation(transformation_input.fragments)
75SourceToDisplay = Callable[[int], int]
76DisplayToSource = Callable[[int], int]
79class TransformationInput:
80 """
81 :param buffer_control: :class:`.BufferControl` instance.
82 :param lineno: The number of the line to which we apply the processor.
83 :param source_to_display: A function that returns the position in the
84 `fragments` for any position in the source string. (This takes
85 previous processors into account.)
86 :param fragments: List of fragments that we can transform. (Received from the
87 previous processor.)
88 """
90 def __init__(
91 self,
92 buffer_control: BufferControl,
93 document: Document,
94 lineno: int,
95 source_to_display: SourceToDisplay,
96 fragments: StyleAndTextTuples,
97 width: int,
98 height: int,
99 ) -> None:
100 self.buffer_control = buffer_control
101 self.document = document
102 self.lineno = lineno
103 self.source_to_display = source_to_display
104 self.fragments = fragments
105 self.width = width
106 self.height = height
108 def unpack(
109 self,
110 ) -> tuple[
111 BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
112 ]:
113 return (
114 self.buffer_control,
115 self.document,
116 self.lineno,
117 self.source_to_display,
118 self.fragments,
119 self.width,
120 self.height,
121 )
124class Transformation:
125 """
126 Transformation result, as returned by :meth:`.Processor.apply_transformation`.
128 Important: Always make sure that the length of `document.text` is equal to
129 the length of all the text in `fragments`!
131 :param fragments: The transformed fragments. To be displayed, or to pass to
132 the next processor.
133 :param source_to_display: Cursor position transformation from original
134 string to transformed string.
135 :param display_to_source: Cursor position transformed from source string to
136 original string.
137 """
139 def __init__(
140 self,
141 fragments: StyleAndTextTuples,
142 source_to_display: SourceToDisplay | None = None,
143 display_to_source: DisplayToSource | None = None,
144 ) -> None:
145 self.fragments = fragments
146 self.source_to_display = source_to_display or (lambda i: i)
147 self.display_to_source = display_to_source or (lambda i: i)
150class DummyProcessor(Processor):
151 """
152 A `Processor` that doesn't do anything.
153 """
155 def apply_transformation(
156 self, transformation_input: TransformationInput
157 ) -> Transformation:
158 return Transformation(transformation_input.fragments)
161class HighlightSearchProcessor(Processor):
162 """
163 Processor that highlights search matches in the document.
164 Note that this doesn't support multiline search matches yet.
166 The style classes 'search' and 'search.current' will be applied to the
167 content.
168 """
170 _classname = "search"
171 _classname_current = "search.current"
173 def _get_search_text(self, buffer_control: BufferControl) -> str:
174 """
175 The text we are searching for.
176 """
177 return buffer_control.search_state.text
179 def apply_transformation(
180 self, transformation_input: TransformationInput
181 ) -> Transformation:
182 (
183 buffer_control,
184 document,
185 lineno,
186 source_to_display,
187 fragments,
188 _,
189 _,
190 ) = transformation_input.unpack()
192 search_text = self._get_search_text(buffer_control)
193 searchmatch_fragment = f" class:{self._classname} "
194 searchmatch_current_fragment = f" class:{self._classname_current} "
196 if search_text and not get_app().is_done:
197 # For each search match, replace the style string.
198 line_text = fragment_list_to_text(fragments)
199 fragments = explode_text_fragments(fragments)
201 if buffer_control.search_state.ignore_case():
202 flags = re.IGNORECASE
203 else:
204 flags = re.RegexFlag(0)
206 # Get cursor column.
207 cursor_column: int | None
208 if document.cursor_position_row == lineno:
209 cursor_column = source_to_display(document.cursor_position_col)
210 else:
211 cursor_column = None
213 for match in re.finditer(re.escape(search_text), line_text, flags=flags):
214 if cursor_column is not None:
215 on_cursor = match.start() <= cursor_column < match.end()
216 else:
217 on_cursor = False
219 for i in range(match.start(), match.end()):
220 old_fragment, text, *_ = fragments[i]
221 if on_cursor:
222 fragments[i] = (
223 old_fragment + searchmatch_current_fragment,
224 fragments[i][1],
225 )
226 else:
227 fragments[i] = (
228 old_fragment + searchmatch_fragment,
229 fragments[i][1],
230 )
232 return Transformation(fragments)
235class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
236 """
237 Highlight the search terms that are used for highlighting the incremental
238 search. The style class 'incsearch' will be applied to the content.
240 Important: this requires the `preview_search=True` flag to be set for the
241 `BufferControl`. Otherwise, the cursor position won't be set to the search
242 match while searching, and nothing happens.
243 """
245 _classname = "incsearch"
246 _classname_current = "incsearch.current"
248 def _get_search_text(self, buffer_control: BufferControl) -> str:
249 """
250 The text we are searching for.
251 """
252 # When the search buffer has focus, take that text.
253 search_buffer = buffer_control.search_buffer
254 if search_buffer is not None and search_buffer.text:
255 return search_buffer.text
256 return ""
259class HighlightSelectionProcessor(Processor):
260 """
261 Processor that highlights the selection in the document.
262 """
264 def apply_transformation(
265 self, transformation_input: TransformationInput
266 ) -> Transformation:
267 (
268 buffer_control,
269 document,
270 lineno,
271 source_to_display,
272 fragments,
273 _,
274 _,
275 ) = transformation_input.unpack()
277 selected_fragment = " class:selected "
279 # In case of selection, highlight all matches.
280 selection_at_line = document.selection_range_at_line(lineno)
282 if selection_at_line:
283 from_, to = selection_at_line
284 from_ = source_to_display(from_)
285 to = source_to_display(to)
287 fragments = explode_text_fragments(fragments)
289 if from_ == 0 and to == 0 and len(fragments) == 0:
290 # When this is an empty line, insert a space in order to
291 # visualize the selection.
292 return Transformation([(selected_fragment, " ")])
293 else:
294 for i in range(from_, to):
295 if i < len(fragments):
296 old_fragment, old_text, *_ = fragments[i]
297 fragments[i] = (old_fragment + selected_fragment, old_text)
298 elif i == len(fragments):
299 fragments.append((selected_fragment, " "))
301 return Transformation(fragments)
304class PasswordProcessor(Processor):
305 """
306 Processor that masks the input. (For passwords.)
308 :param char: (string) Character to be used. "*" by default.
309 """
311 def __init__(self, char: str = "*") -> None:
312 self.char = char
314 def apply_transformation(self, ti: TransformationInput) -> Transformation:
315 fragments: StyleAndTextTuples = cast(
316 StyleAndTextTuples,
317 [
318 (style, self.char * len(text), *handler)
319 for style, text, *handler in ti.fragments
320 ],
321 )
323 return Transformation(fragments)
326class HighlightMatchingBracketProcessor(Processor):
327 """
328 When the cursor is on or right after a bracket, it highlights the matching
329 bracket.
331 :param max_cursor_distance: Only highlight matching brackets when the
332 cursor is within this distance. (From inside a `Processor`, we can't
333 know which lines will be visible on the screen. But we also don't want
334 to scan the whole document for matching brackets on each key press, so
335 we limit to this value.)
336 """
338 _closing_braces = "])}>"
340 def __init__(
341 self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
342 ) -> None:
343 self.chars = chars
344 self.max_cursor_distance = max_cursor_distance
346 self._positions_cache: SimpleCache[
347 Hashable, list[tuple[int, int]]
348 ] = SimpleCache(maxsize=8)
350 def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
351 """
352 Return a list of (row, col) tuples that need to be highlighted.
353 """
354 pos: int | None
356 # Try for the character under the cursor.
357 if document.current_char and document.current_char in self.chars:
358 pos = document.find_matching_bracket_position(
359 start_pos=document.cursor_position - self.max_cursor_distance,
360 end_pos=document.cursor_position + self.max_cursor_distance,
361 )
363 # Try for the character before the cursor.
364 elif (
365 document.char_before_cursor
366 and document.char_before_cursor in self._closing_braces
367 and document.char_before_cursor in self.chars
368 ):
369 document = Document(document.text, document.cursor_position - 1)
371 pos = document.find_matching_bracket_position(
372 start_pos=document.cursor_position - self.max_cursor_distance,
373 end_pos=document.cursor_position + self.max_cursor_distance,
374 )
375 else:
376 pos = None
378 # Return a list of (row, col) tuples that need to be highlighted.
379 if pos:
380 pos += document.cursor_position # pos is relative.
381 row, col = document.translate_index_to_position(pos)
382 return [
383 (row, col),
384 (document.cursor_position_row, document.cursor_position_col),
385 ]
386 else:
387 return []
389 def apply_transformation(
390 self, transformation_input: TransformationInput
391 ) -> Transformation:
392 (
393 buffer_control,
394 document,
395 lineno,
396 source_to_display,
397 fragments,
398 _,
399 _,
400 ) = transformation_input.unpack()
402 # When the application is in the 'done' state, don't highlight.
403 if get_app().is_done:
404 return Transformation(fragments)
406 # Get the highlight positions.
407 key = (get_app().render_counter, document.text, document.cursor_position)
408 positions = self._positions_cache.get(
409 key, lambda: self._get_positions_to_highlight(document)
410 )
412 # Apply if positions were found at this line.
413 if positions:
414 for row, col in positions:
415 if row == lineno:
416 col = source_to_display(col)
417 fragments = explode_text_fragments(fragments)
418 style, text, *_ = fragments[col]
420 if col == document.cursor_position_col:
421 style += " class:matching-bracket.cursor "
422 else:
423 style += " class:matching-bracket.other "
425 fragments[col] = (style, text)
427 return Transformation(fragments)
430class DisplayMultipleCursors(Processor):
431 """
432 When we're in Vi block insert mode, display all the cursors.
433 """
435 def apply_transformation(
436 self, transformation_input: TransformationInput
437 ) -> Transformation:
438 (
439 buffer_control,
440 document,
441 lineno,
442 source_to_display,
443 fragments,
444 _,
445 _,
446 ) = transformation_input.unpack()
448 buff = buffer_control.buffer
450 if vi_insert_multiple_mode():
451 cursor_positions = buff.multiple_cursor_positions
452 fragments = explode_text_fragments(fragments)
454 # If any cursor appears on the current line, highlight that.
455 start_pos = document.translate_row_col_to_index(lineno, 0)
456 end_pos = start_pos + len(document.lines[lineno])
458 fragment_suffix = " class:multiple-cursors"
460 for p in cursor_positions:
461 if start_pos <= p <= end_pos:
462 column = source_to_display(p - start_pos)
464 # Replace fragment.
465 try:
466 style, text, *_ = fragments[column]
467 except IndexError:
468 # Cursor needs to be displayed after the current text.
469 fragments.append((fragment_suffix, " "))
470 else:
471 style += fragment_suffix
472 fragments[column] = (style, text)
474 return Transformation(fragments)
475 else:
476 return Transformation(fragments)
479class BeforeInput(Processor):
480 """
481 Insert text before the input.
483 :param text: This can be either plain text or formatted text
484 (or a callable that returns any of those).
485 :param style: style to be applied to this prompt/prefix.
486 """
488 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
489 self.text = text
490 self.style = style
492 def apply_transformation(self, ti: TransformationInput) -> Transformation:
493 source_to_display: SourceToDisplay | None
494 display_to_source: DisplayToSource | None
496 if ti.lineno == 0:
497 # Get fragments.
498 fragments_before = to_formatted_text(self.text, self.style)
499 fragments = fragments_before + ti.fragments
501 shift_position = fragment_list_len(fragments_before)
502 source_to_display = lambda i: i + shift_position
503 display_to_source = lambda i: i - shift_position
504 else:
505 fragments = ti.fragments
506 source_to_display = None
507 display_to_source = None
509 return Transformation(
510 fragments,
511 source_to_display=source_to_display,
512 display_to_source=display_to_source,
513 )
515 def __repr__(self) -> str:
516 return f"BeforeInput({self.text!r}, {self.style!r})"
519class ShowArg(BeforeInput):
520 """
521 Display the 'arg' in front of the input.
523 This was used by the `PromptSession`, but now it uses the
524 `Window.get_line_prefix` function instead.
525 """
527 def __init__(self) -> None:
528 super().__init__(self._get_text_fragments)
530 def _get_text_fragments(self) -> StyleAndTextTuples:
531 app = get_app()
532 if app.key_processor.arg is None:
533 return []
534 else:
535 arg = app.key_processor.arg
537 return [
538 ("class:prompt.arg", "(arg: "),
539 ("class:prompt.arg.text", str(arg)),
540 ("class:prompt.arg", ") "),
541 ]
543 def __repr__(self) -> str:
544 return "ShowArg()"
547class AfterInput(Processor):
548 """
549 Insert text after the input.
551 :param text: This can be either plain text or formatted text
552 (or a callable that returns any of those).
553 :param style: style to be applied to this prompt/prefix.
554 """
556 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
557 self.text = text
558 self.style = style
560 def apply_transformation(self, ti: TransformationInput) -> Transformation:
561 # Insert fragments after the last line.
562 if ti.lineno == ti.document.line_count - 1:
563 # Get fragments.
564 fragments_after = to_formatted_text(self.text, self.style)
565 return Transformation(fragments=ti.fragments + fragments_after)
566 else:
567 return Transformation(fragments=ti.fragments)
569 def __repr__(self) -> str:
570 return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"
573class AppendAutoSuggestion(Processor):
574 """
575 Append the auto suggestion to the input.
576 (The user can then press the right arrow the insert the suggestion.)
577 """
579 def __init__(self, style: str = "class:auto-suggestion") -> None:
580 self.style = style
582 def apply_transformation(self, ti: TransformationInput) -> Transformation:
583 # Insert fragments after the last line.
584 if ti.lineno == ti.document.line_count - 1:
585 buffer = ti.buffer_control.buffer
587 if buffer.suggestion and ti.document.is_cursor_at_the_end:
588 suggestion = buffer.suggestion.text
589 else:
590 suggestion = ""
592 return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
593 else:
594 return Transformation(fragments=ti.fragments)
597class ShowLeadingWhiteSpaceProcessor(Processor):
598 """
599 Make leading whitespace visible.
601 :param get_char: Callable that returns one character.
602 """
604 def __init__(
605 self,
606 get_char: Callable[[], str] | None = None,
607 style: str = "class:leading-whitespace",
608 ) -> None:
609 def default_get_char() -> str:
610 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
611 return "."
612 else:
613 return "\xb7"
615 self.style = style
616 self.get_char = get_char or default_get_char
618 def apply_transformation(self, ti: TransformationInput) -> Transformation:
619 fragments = ti.fragments
621 # Walk through all te fragments.
622 if fragments and fragment_list_to_text(fragments).startswith(" "):
623 t = (self.style, self.get_char())
624 fragments = explode_text_fragments(fragments)
626 for i in range(len(fragments)):
627 if fragments[i][1] == " ":
628 fragments[i] = t
629 else:
630 break
632 return Transformation(fragments)
635class ShowTrailingWhiteSpaceProcessor(Processor):
636 """
637 Make trailing whitespace visible.
639 :param get_char: Callable that returns one character.
640 """
642 def __init__(
643 self,
644 get_char: Callable[[], str] | None = None,
645 style: str = "class:training-whitespace",
646 ) -> None:
647 def default_get_char() -> str:
648 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
649 return "."
650 else:
651 return "\xb7"
653 self.style = style
654 self.get_char = get_char or default_get_char
656 def apply_transformation(self, ti: TransformationInput) -> Transformation:
657 fragments = ti.fragments
659 if fragments and fragments[-1][1].endswith(" "):
660 t = (self.style, self.get_char())
661 fragments = explode_text_fragments(fragments)
663 # Walk backwards through all te fragments and replace whitespace.
664 for i in range(len(fragments) - 1, -1, -1):
665 char = fragments[i][1]
666 if char == " ":
667 fragments[i] = t
668 else:
669 break
671 return Transformation(fragments)
674class TabsProcessor(Processor):
675 """
676 Render tabs as spaces (instead of ^I) or make them visible (for instance,
677 by replacing them with dots.)
679 :param tabstop: Horizontal space taken by a tab. (`int` or callable that
680 returns an `int`).
681 :param char1: Character or callable that returns a character (text of
682 length one). This one is used for the first space taken by the tab.
683 :param char2: Like `char1`, but for the rest of the space.
684 """
686 def __init__(
687 self,
688 tabstop: int | Callable[[], int] = 4,
689 char1: str | Callable[[], str] = "|",
690 char2: str | Callable[[], str] = "\u2508",
691 style: str = "class:tab",
692 ) -> None:
693 self.char1 = char1
694 self.char2 = char2
695 self.tabstop = tabstop
696 self.style = style
698 def apply_transformation(self, ti: TransformationInput) -> Transformation:
699 tabstop = to_int(self.tabstop)
700 style = self.style
702 # Create separator for tabs.
703 separator1 = to_str(self.char1)
704 separator2 = to_str(self.char2)
706 # Transform fragments.
707 fragments = explode_text_fragments(ti.fragments)
709 position_mappings = {}
710 result_fragments: StyleAndTextTuples = []
711 pos = 0
713 for i, fragment_and_text in enumerate(fragments):
714 position_mappings[i] = pos
716 if fragment_and_text[1] == "\t":
717 # Calculate how many characters we have to insert.
718 count = tabstop - (pos % tabstop)
719 if count == 0:
720 count = tabstop
722 # Insert tab.
723 result_fragments.append((style, separator1))
724 result_fragments.append((style, separator2 * (count - 1)))
725 pos += count
726 else:
727 result_fragments.append(fragment_and_text)
728 pos += 1
730 position_mappings[len(fragments)] = pos
731 # Add `pos+1` to mapping, because the cursor can be right after the
732 # line as well.
733 position_mappings[len(fragments) + 1] = pos + 1
735 def source_to_display(from_position: int) -> int:
736 "Maps original cursor position to the new one."
737 return position_mappings[from_position]
739 def display_to_source(display_pos: int) -> int:
740 "Maps display cursor position to the original one."
741 position_mappings_reversed = {v: k for k, v in position_mappings.items()}
743 while display_pos >= 0:
744 try:
745 return position_mappings_reversed[display_pos]
746 except KeyError:
747 display_pos -= 1
748 return 0
750 return Transformation(
751 result_fragments,
752 source_to_display=source_to_display,
753 display_to_source=display_to_source,
754 )
757class ReverseSearchProcessor(Processor):
758 """
759 Process to display the "(reverse-i-search)`...`:..." stuff around
760 the search buffer.
762 Note: This processor is meant to be applied to the BufferControl that
763 contains the search buffer, it's not meant for the original input.
764 """
766 _excluded_input_processors: list[type[Processor]] = [
767 HighlightSearchProcessor,
768 HighlightSelectionProcessor,
769 BeforeInput,
770 AfterInput,
771 ]
773 def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
774 from prompt_toolkit.layout.controls import BufferControl
776 prev_control = get_app().layout.search_target_buffer_control
777 if (
778 isinstance(prev_control, BufferControl)
779 and prev_control.search_buffer_control == buffer_control
780 ):
781 return prev_control
782 return None
784 def _content(
785 self, main_control: BufferControl, ti: TransformationInput
786 ) -> UIContent:
787 from prompt_toolkit.layout.controls import BufferControl
789 # Emulate the BufferControl through which we are searching.
790 # For this we filter out some of the input processors.
791 excluded_processors = tuple(self._excluded_input_processors)
793 def filter_processor(item: Processor) -> Processor | None:
794 """Filter processors from the main control that we want to disable
795 here. This returns either an accepted processor or None."""
796 # For a `_MergedProcessor`, check each individual processor, recursively.
797 if isinstance(item, _MergedProcessor):
798 accepted_processors = [filter_processor(p) for p in item.processors]
799 return merge_processors(
800 [p for p in accepted_processors if p is not None]
801 )
803 # For a `ConditionalProcessor`, check the body.
804 elif isinstance(item, ConditionalProcessor):
805 p = filter_processor(item.processor)
806 if p:
807 return ConditionalProcessor(p, item.filter)
809 # Otherwise, check the processor itself.
810 else:
811 if not isinstance(item, excluded_processors):
812 return item
814 return None
816 filtered_processor = filter_processor(
817 merge_processors(main_control.input_processors or [])
818 )
819 highlight_processor = HighlightIncrementalSearchProcessor()
821 if filtered_processor:
822 new_processors = [filtered_processor, highlight_processor]
823 else:
824 new_processors = [highlight_processor]
826 from .controls import SearchBufferControl
828 assert isinstance(ti.buffer_control, SearchBufferControl)
830 buffer_control = BufferControl(
831 buffer=main_control.buffer,
832 input_processors=new_processors,
833 include_default_input_processors=False,
834 lexer=main_control.lexer,
835 preview_search=True,
836 search_buffer_control=ti.buffer_control,
837 )
839 return buffer_control.create_content(ti.width, ti.height, preview_search=True)
841 def apply_transformation(self, ti: TransformationInput) -> Transformation:
842 from .controls import SearchBufferControl
844 assert isinstance(
845 ti.buffer_control, SearchBufferControl
846 ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."
848 source_to_display: SourceToDisplay | None
849 display_to_source: DisplayToSource | None
851 main_control = self._get_main_buffer(ti.buffer_control)
853 if ti.lineno == 0 and main_control:
854 content = self._content(main_control, ti)
856 # Get the line from the original document for this search.
857 line_fragments = content.get_line(content.cursor_position.y)
859 if main_control.search_state.direction == SearchDirection.FORWARD:
860 direction_text = "i-search"
861 else:
862 direction_text = "reverse-i-search"
864 fragments_before: StyleAndTextTuples = [
865 ("class:prompt.search", "("),
866 ("class:prompt.search", direction_text),
867 ("class:prompt.search", ")`"),
868 ]
870 fragments = (
871 fragments_before
872 + [
873 ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
874 ("", "': "),
875 ]
876 + line_fragments
877 )
879 shift_position = fragment_list_len(fragments_before)
880 source_to_display = lambda i: i + shift_position
881 display_to_source = lambda i: i - shift_position
882 else:
883 source_to_display = None
884 display_to_source = None
885 fragments = ti.fragments
887 return Transformation(
888 fragments,
889 source_to_display=source_to_display,
890 display_to_source=display_to_source,
891 )
894class ConditionalProcessor(Processor):
895 """
896 Processor that applies another processor, according to a certain condition.
897 Example::
899 # Create a function that returns whether or not the processor should
900 # currently be applied.
901 def highlight_enabled():
902 return true_or_false
904 # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
905 BufferControl(input_processors=[
906 ConditionalProcessor(HighlightSearchProcessor(),
907 Condition(highlight_enabled))])
909 :param processor: :class:`.Processor` instance.
910 :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
911 """
913 def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
914 self.processor = processor
915 self.filter = to_filter(filter)
917 def apply_transformation(
918 self, transformation_input: TransformationInput
919 ) -> Transformation:
920 # Run processor when enabled.
921 if self.filter():
922 return self.processor.apply_transformation(transformation_input)
923 else:
924 return Transformation(transformation_input.fragments)
926 def __repr__(self) -> str:
927 return "{}(processor={!r}, filter={!r})".format(
928 self.__class__.__name__,
929 self.processor,
930 self.filter,
931 )
934class DynamicProcessor(Processor):
935 """
936 Processor class that dynamically returns any Processor.
938 :param get_processor: Callable that returns a :class:`.Processor` instance.
939 """
941 def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
942 self.get_processor = get_processor
944 def apply_transformation(self, ti: TransformationInput) -> Transformation:
945 processor = self.get_processor() or DummyProcessor()
946 return processor.apply_transformation(ti)
949def merge_processors(processors: list[Processor]) -> Processor:
950 """
951 Merge multiple `Processor` objects into one.
952 """
953 if len(processors) == 0:
954 return DummyProcessor()
956 if len(processors) == 1:
957 return processors[0] # Nothing to merge.
959 return _MergedProcessor(processors)
962class _MergedProcessor(Processor):
963 """
964 Processor that groups multiple other `Processor` objects, but exposes an
965 API as if it is one `Processor`.
966 """
968 def __init__(self, processors: list[Processor]):
969 self.processors = processors
971 def apply_transformation(self, ti: TransformationInput) -> Transformation:
972 source_to_display_functions = [ti.source_to_display]
973 display_to_source_functions = []
974 fragments = ti.fragments
976 def source_to_display(i: int) -> int:
977 """Translate x position from the buffer to the x position in the
978 processor fragments list."""
979 for f in source_to_display_functions:
980 i = f(i)
981 return i
983 for p in self.processors:
984 transformation = p.apply_transformation(
985 TransformationInput(
986 ti.buffer_control,
987 ti.document,
988 ti.lineno,
989 source_to_display,
990 fragments,
991 ti.width,
992 ti.height,
993 )
994 )
995 fragments = transformation.fragments
996 display_to_source_functions.append(transformation.display_to_source)
997 source_to_display_functions.append(transformation.source_to_display)
999 def display_to_source(i: int) -> int:
1000 for f in reversed(display_to_source_functions):
1001 i = f(i)
1002 return i
1004 # In the case of a nested _MergedProcessor, each processor wants to
1005 # receive a 'source_to_display' function (as part of the
1006 # TransformationInput) that has everything in the chain before
1007 # included, because it can be called as part of the
1008 # `apply_transformation` function. However, this first
1009 # `source_to_display` should not be part of the output that we are
1010 # returning. (This is the most consistent with `display_to_source`.)
1011 del source_to_display_functions[:1]
1013 return Transformation(fragments, source_to_display, display_to_source)