Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/layout/processors.py: 23%
379 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1"""
2Processors are little transformation blocks that transform the fragments list
3from a buffer before the BufferControl will render it to the screen.
5They can insert fragments before or after, or highlight fragments by replacing the
6fragment types.
7"""
8from __future__ import annotations
10import re
11from abc import ABCMeta, abstractmethod
12from typing import (
13 TYPE_CHECKING,
14 Callable,
15 Hashable,
16 List,
17 Optional,
18 Tuple,
19 Type,
20 Union,
21 cast,
22)
24from prompt_toolkit.application.current import get_app
25from prompt_toolkit.cache import SimpleCache
26from prompt_toolkit.document import Document
27from prompt_toolkit.filters import FilterOrBool, to_filter, vi_insert_multiple_mode
28from prompt_toolkit.formatted_text import (
29 AnyFormattedText,
30 StyleAndTextTuples,
31 to_formatted_text,
32)
33from prompt_toolkit.formatted_text.utils import fragment_list_len, fragment_list_to_text
34from prompt_toolkit.search import SearchDirection
35from prompt_toolkit.utils import to_int, to_str
37from .utils import explode_text_fragments
39if TYPE_CHECKING:
40 from .controls import BufferControl, UIContent
42__all__ = [
43 "Processor",
44 "TransformationInput",
45 "Transformation",
46 "DummyProcessor",
47 "HighlightSearchProcessor",
48 "HighlightIncrementalSearchProcessor",
49 "HighlightSelectionProcessor",
50 "PasswordProcessor",
51 "HighlightMatchingBracketProcessor",
52 "DisplayMultipleCursors",
53 "BeforeInput",
54 "ShowArg",
55 "AfterInput",
56 "AppendAutoSuggestion",
57 "ConditionalProcessor",
58 "ShowLeadingWhiteSpaceProcessor",
59 "ShowTrailingWhiteSpaceProcessor",
60 "TabsProcessor",
61 "ReverseSearchProcessor",
62 "DynamicProcessor",
63 "merge_processors",
64]
67class Processor(metaclass=ABCMeta):
68 """
69 Manipulate the fragments for a given line in a
70 :class:`~prompt_toolkit.layout.controls.BufferControl`.
71 """
73 @abstractmethod
74 def apply_transformation(
75 self, transformation_input: TransformationInput
76 ) -> Transformation:
77 """
78 Apply transformation. Returns a :class:`.Transformation` instance.
80 :param transformation_input: :class:`.TransformationInput` object.
81 """
82 return Transformation(transformation_input.fragments)
85SourceToDisplay = Callable[[int], int]
86DisplayToSource = Callable[[int], int]
89class TransformationInput:
90 """
91 :param buffer_control: :class:`.BufferControl` instance.
92 :param lineno: The number of the line to which we apply the processor.
93 :param source_to_display: A function that returns the position in the
94 `fragments` for any position in the source string. (This takes
95 previous processors into account.)
96 :param fragments: List of fragments that we can transform. (Received from the
97 previous processor.)
98 """
100 def __init__(
101 self,
102 buffer_control: BufferControl,
103 document: Document,
104 lineno: int,
105 source_to_display: SourceToDisplay,
106 fragments: StyleAndTextTuples,
107 width: int,
108 height: int,
109 ) -> None:
110 self.buffer_control = buffer_control
111 self.document = document
112 self.lineno = lineno
113 self.source_to_display = source_to_display
114 self.fragments = fragments
115 self.width = width
116 self.height = height
118 def unpack(
119 self,
120 ) -> tuple[
121 BufferControl, Document, int, SourceToDisplay, StyleAndTextTuples, int, int
122 ]:
123 return (
124 self.buffer_control,
125 self.document,
126 self.lineno,
127 self.source_to_display,
128 self.fragments,
129 self.width,
130 self.height,
131 )
134class Transformation:
135 """
136 Transformation result, as returned by :meth:`.Processor.apply_transformation`.
138 Important: Always make sure that the length of `document.text` is equal to
139 the length of all the text in `fragments`!
141 :param fragments: The transformed fragments. To be displayed, or to pass to
142 the next processor.
143 :param source_to_display: Cursor position transformation from original
144 string to transformed string.
145 :param display_to_source: Cursor position transformed from source string to
146 original string.
147 """
149 def __init__(
150 self,
151 fragments: StyleAndTextTuples,
152 source_to_display: SourceToDisplay | None = None,
153 display_to_source: DisplayToSource | None = None,
154 ) -> None:
155 self.fragments = fragments
156 self.source_to_display = source_to_display or (lambda i: i)
157 self.display_to_source = display_to_source or (lambda i: i)
160class DummyProcessor(Processor):
161 """
162 A `Processor` that doesn't do anything.
163 """
165 def apply_transformation(
166 self, transformation_input: TransformationInput
167 ) -> Transformation:
168 return Transformation(transformation_input.fragments)
171class HighlightSearchProcessor(Processor):
172 """
173 Processor that highlights search matches in the document.
174 Note that this doesn't support multiline search matches yet.
176 The style classes 'search' and 'search.current' will be applied to the
177 content.
178 """
180 _classname = "search"
181 _classname_current = "search.current"
183 def _get_search_text(self, buffer_control: BufferControl) -> str:
184 """
185 The text we are searching for.
186 """
187 return buffer_control.search_state.text
189 def apply_transformation(
190 self, transformation_input: TransformationInput
191 ) -> Transformation:
192 (
193 buffer_control,
194 document,
195 lineno,
196 source_to_display,
197 fragments,
198 _,
199 _,
200 ) = transformation_input.unpack()
202 search_text = self._get_search_text(buffer_control)
203 searchmatch_fragment = f" class:{self._classname} "
204 searchmatch_current_fragment = f" class:{self._classname_current} "
206 if search_text and not get_app().is_done:
207 # For each search match, replace the style string.
208 line_text = fragment_list_to_text(fragments)
209 fragments = explode_text_fragments(fragments)
211 if buffer_control.search_state.ignore_case():
212 flags = re.IGNORECASE
213 else:
214 flags = re.RegexFlag(0)
216 # Get cursor column.
217 cursor_column: int | None
218 if document.cursor_position_row == lineno:
219 cursor_column = source_to_display(document.cursor_position_col)
220 else:
221 cursor_column = None
223 for match in re.finditer(re.escape(search_text), line_text, flags=flags):
224 if cursor_column is not None:
225 on_cursor = match.start() <= cursor_column < match.end()
226 else:
227 on_cursor = False
229 for i in range(match.start(), match.end()):
230 old_fragment, text, *_ = fragments[i]
231 if on_cursor:
232 fragments[i] = (
233 old_fragment + searchmatch_current_fragment,
234 fragments[i][1],
235 )
236 else:
237 fragments[i] = (
238 old_fragment + searchmatch_fragment,
239 fragments[i][1],
240 )
242 return Transformation(fragments)
245class HighlightIncrementalSearchProcessor(HighlightSearchProcessor):
246 """
247 Highlight the search terms that are used for highlighting the incremental
248 search. The style class 'incsearch' will be applied to the content.
250 Important: this requires the `preview_search=True` flag to be set for the
251 `BufferControl`. Otherwise, the cursor position won't be set to the search
252 match while searching, and nothing happens.
253 """
255 _classname = "incsearch"
256 _classname_current = "incsearch.current"
258 def _get_search_text(self, buffer_control: BufferControl) -> str:
259 """
260 The text we are searching for.
261 """
262 # When the search buffer has focus, take that text.
263 search_buffer = buffer_control.search_buffer
264 if search_buffer is not None and search_buffer.text:
265 return search_buffer.text
266 return ""
269class HighlightSelectionProcessor(Processor):
270 """
271 Processor that highlights the selection in the document.
272 """
274 def apply_transformation(
275 self, transformation_input: TransformationInput
276 ) -> Transformation:
277 (
278 buffer_control,
279 document,
280 lineno,
281 source_to_display,
282 fragments,
283 _,
284 _,
285 ) = transformation_input.unpack()
287 selected_fragment = " class:selected "
289 # In case of selection, highlight all matches.
290 selection_at_line = document.selection_range_at_line(lineno)
292 if selection_at_line:
293 from_, to = selection_at_line
294 from_ = source_to_display(from_)
295 to = source_to_display(to)
297 fragments = explode_text_fragments(fragments)
299 if from_ == 0 and to == 0 and len(fragments) == 0:
300 # When this is an empty line, insert a space in order to
301 # visualise the selection.
302 return Transformation([(selected_fragment, " ")])
303 else:
304 for i in range(from_, to):
305 if i < len(fragments):
306 old_fragment, old_text, *_ = fragments[i]
307 fragments[i] = (old_fragment + selected_fragment, old_text)
308 elif i == len(fragments):
309 fragments.append((selected_fragment, " "))
311 return Transformation(fragments)
314class PasswordProcessor(Processor):
315 """
316 Processor that masks the input. (For passwords.)
318 :param char: (string) Character to be used. "*" by default.
319 """
321 def __init__(self, char: str = "*") -> None:
322 self.char = char
324 def apply_transformation(self, ti: TransformationInput) -> Transformation:
325 fragments: StyleAndTextTuples = cast(
326 StyleAndTextTuples,
327 [
328 (style, self.char * len(text), *handler)
329 for style, text, *handler in ti.fragments
330 ],
331 )
333 return Transformation(fragments)
336class HighlightMatchingBracketProcessor(Processor):
337 """
338 When the cursor is on or right after a bracket, it highlights the matching
339 bracket.
341 :param max_cursor_distance: Only highlight matching brackets when the
342 cursor is within this distance. (From inside a `Processor`, we can't
343 know which lines will be visible on the screen. But we also don't want
344 to scan the whole document for matching brackets on each key press, so
345 we limit to this value.)
346 """
348 _closing_braces = "])}>"
350 def __init__(
351 self, chars: str = "[](){}<>", max_cursor_distance: int = 1000
352 ) -> None:
353 self.chars = chars
354 self.max_cursor_distance = max_cursor_distance
356 self._positions_cache: SimpleCache[
357 Hashable, list[tuple[int, int]]
358 ] = SimpleCache(maxsize=8)
360 def _get_positions_to_highlight(self, document: Document) -> list[tuple[int, int]]:
361 """
362 Return a list of (row, col) tuples that need to be highlighted.
363 """
364 pos: int | None
366 # Try for the character under the cursor.
367 if document.current_char and document.current_char in self.chars:
368 pos = document.find_matching_bracket_position(
369 start_pos=document.cursor_position - self.max_cursor_distance,
370 end_pos=document.cursor_position + self.max_cursor_distance,
371 )
373 # Try for the character before the cursor.
374 elif (
375 document.char_before_cursor
376 and document.char_before_cursor in self._closing_braces
377 and document.char_before_cursor in self.chars
378 ):
379 document = Document(document.text, document.cursor_position - 1)
381 pos = document.find_matching_bracket_position(
382 start_pos=document.cursor_position - self.max_cursor_distance,
383 end_pos=document.cursor_position + self.max_cursor_distance,
384 )
385 else:
386 pos = None
388 # Return a list of (row, col) tuples that need to be highlighted.
389 if pos:
390 pos += document.cursor_position # pos is relative.
391 row, col = document.translate_index_to_position(pos)
392 return [
393 (row, col),
394 (document.cursor_position_row, document.cursor_position_col),
395 ]
396 else:
397 return []
399 def apply_transformation(
400 self, transformation_input: TransformationInput
401 ) -> Transformation:
402 (
403 buffer_control,
404 document,
405 lineno,
406 source_to_display,
407 fragments,
408 _,
409 _,
410 ) = transformation_input.unpack()
412 # When the application is in the 'done' state, don't highlight.
413 if get_app().is_done:
414 return Transformation(fragments)
416 # Get the highlight positions.
417 key = (get_app().render_counter, document.text, document.cursor_position)
418 positions = self._positions_cache.get(
419 key, lambda: self._get_positions_to_highlight(document)
420 )
422 # Apply if positions were found at this line.
423 if positions:
424 for row, col in positions:
425 if row == lineno:
426 col = source_to_display(col)
427 fragments = explode_text_fragments(fragments)
428 style, text, *_ = fragments[col]
430 if col == document.cursor_position_col:
431 style += " class:matching-bracket.cursor "
432 else:
433 style += " class:matching-bracket.other "
435 fragments[col] = (style, text)
437 return Transformation(fragments)
440class DisplayMultipleCursors(Processor):
441 """
442 When we're in Vi block insert mode, display all the cursors.
443 """
445 def apply_transformation(
446 self, transformation_input: TransformationInput
447 ) -> Transformation:
448 (
449 buffer_control,
450 document,
451 lineno,
452 source_to_display,
453 fragments,
454 _,
455 _,
456 ) = transformation_input.unpack()
458 buff = buffer_control.buffer
460 if vi_insert_multiple_mode():
461 cursor_positions = buff.multiple_cursor_positions
462 fragments = explode_text_fragments(fragments)
464 # If any cursor appears on the current line, highlight that.
465 start_pos = document.translate_row_col_to_index(lineno, 0)
466 end_pos = start_pos + len(document.lines[lineno])
468 fragment_suffix = " class:multiple-cursors"
470 for p in cursor_positions:
471 if start_pos <= p <= end_pos:
472 column = source_to_display(p - start_pos)
474 # Replace fragment.
475 try:
476 style, text, *_ = fragments[column]
477 except IndexError:
478 # Cursor needs to be displayed after the current text.
479 fragments.append((fragment_suffix, " "))
480 else:
481 style += fragment_suffix
482 fragments[column] = (style, text)
484 return Transformation(fragments)
485 else:
486 return Transformation(fragments)
489class BeforeInput(Processor):
490 """
491 Insert text before the input.
493 :param text: This can be either plain text or formatted text
494 (or a callable that returns any of those).
495 :param style: style to be applied to this prompt/prefix.
496 """
498 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
499 self.text = text
500 self.style = style
502 def apply_transformation(self, ti: TransformationInput) -> Transformation:
503 source_to_display: SourceToDisplay | None
504 display_to_source: DisplayToSource | None
506 if ti.lineno == 0:
507 # Get fragments.
508 fragments_before = to_formatted_text(self.text, self.style)
509 fragments = fragments_before + ti.fragments
511 shift_position = fragment_list_len(fragments_before)
512 source_to_display = lambda i: i + shift_position
513 display_to_source = lambda i: i - shift_position
514 else:
515 fragments = ti.fragments
516 source_to_display = None
517 display_to_source = None
519 return Transformation(
520 fragments,
521 source_to_display=source_to_display,
522 display_to_source=display_to_source,
523 )
525 def __repr__(self) -> str:
526 return f"BeforeInput({self.text!r}, {self.style!r})"
529class ShowArg(BeforeInput):
530 """
531 Display the 'arg' in front of the input.
533 This was used by the `PromptSession`, but now it uses the
534 `Window.get_line_prefix` function instead.
535 """
537 def __init__(self) -> None:
538 super().__init__(self._get_text_fragments)
540 def _get_text_fragments(self) -> StyleAndTextTuples:
541 app = get_app()
542 if app.key_processor.arg is None:
543 return []
544 else:
545 arg = app.key_processor.arg
547 return [
548 ("class:prompt.arg", "(arg: "),
549 ("class:prompt.arg.text", str(arg)),
550 ("class:prompt.arg", ") "),
551 ]
553 def __repr__(self) -> str:
554 return "ShowArg()"
557class AfterInput(Processor):
558 """
559 Insert text after the input.
561 :param text: This can be either plain text or formatted text
562 (or a callable that returns any of those).
563 :param style: style to be applied to this prompt/prefix.
564 """
566 def __init__(self, text: AnyFormattedText, style: str = "") -> None:
567 self.text = text
568 self.style = style
570 def apply_transformation(self, ti: TransformationInput) -> Transformation:
571 # Insert fragments after the last line.
572 if ti.lineno == ti.document.line_count - 1:
573 # Get fragments.
574 fragments_after = to_formatted_text(self.text, self.style)
575 return Transformation(fragments=ti.fragments + fragments_after)
576 else:
577 return Transformation(fragments=ti.fragments)
579 def __repr__(self) -> str:
580 return f"{self.__class__.__name__}({self.text!r}, style={self.style!r})"
583class AppendAutoSuggestion(Processor):
584 """
585 Append the auto suggestion to the input.
586 (The user can then press the right arrow the insert the suggestion.)
587 """
589 def __init__(self, style: str = "class:auto-suggestion") -> None:
590 self.style = style
592 def apply_transformation(self, ti: TransformationInput) -> Transformation:
593 # Insert fragments after the last line.
594 if ti.lineno == ti.document.line_count - 1:
595 buffer = ti.buffer_control.buffer
597 if buffer.suggestion and ti.document.is_cursor_at_the_end:
598 suggestion = buffer.suggestion.text
599 else:
600 suggestion = ""
602 return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
603 else:
604 return Transformation(fragments=ti.fragments)
607class ShowLeadingWhiteSpaceProcessor(Processor):
608 """
609 Make leading whitespace visible.
611 :param get_char: Callable that returns one character.
612 """
614 def __init__(
615 self,
616 get_char: Callable[[], str] | None = None,
617 style: str = "class:leading-whitespace",
618 ) -> None:
619 def default_get_char() -> str:
620 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
621 return "."
622 else:
623 return "\xb7"
625 self.style = style
626 self.get_char = get_char or default_get_char
628 def apply_transformation(self, ti: TransformationInput) -> Transformation:
629 fragments = ti.fragments
631 # Walk through all te fragments.
632 if fragments and fragment_list_to_text(fragments).startswith(" "):
633 t = (self.style, self.get_char())
634 fragments = explode_text_fragments(fragments)
636 for i in range(len(fragments)):
637 if fragments[i][1] == " ":
638 fragments[i] = t
639 else:
640 break
642 return Transformation(fragments)
645class ShowTrailingWhiteSpaceProcessor(Processor):
646 """
647 Make trailing whitespace visible.
649 :param get_char: Callable that returns one character.
650 """
652 def __init__(
653 self,
654 get_char: Callable[[], str] | None = None,
655 style: str = "class:training-whitespace",
656 ) -> None:
657 def default_get_char() -> str:
658 if "\xb7".encode(get_app().output.encoding(), "replace") == b"?":
659 return "."
660 else:
661 return "\xb7"
663 self.style = style
664 self.get_char = get_char or default_get_char
666 def apply_transformation(self, ti: TransformationInput) -> Transformation:
667 fragments = ti.fragments
669 if fragments and fragments[-1][1].endswith(" "):
670 t = (self.style, self.get_char())
671 fragments = explode_text_fragments(fragments)
673 # Walk backwards through all te fragments and replace whitespace.
674 for i in range(len(fragments) - 1, -1, -1):
675 char = fragments[i][1]
676 if char == " ":
677 fragments[i] = t
678 else:
679 break
681 return Transformation(fragments)
684class TabsProcessor(Processor):
685 """
686 Render tabs as spaces (instead of ^I) or make them visible (for instance,
687 by replacing them with dots.)
689 :param tabstop: Horizontal space taken by a tab. (`int` or callable that
690 returns an `int`).
691 :param char1: Character or callable that returns a character (text of
692 length one). This one is used for the first space taken by the tab.
693 :param char2: Like `char1`, but for the rest of the space.
694 """
696 def __init__(
697 self,
698 tabstop: int | Callable[[], int] = 4,
699 char1: str | Callable[[], str] = "|",
700 char2: str | Callable[[], str] = "\u2508",
701 style: str = "class:tab",
702 ) -> None:
703 self.char1 = char1
704 self.char2 = char2
705 self.tabstop = tabstop
706 self.style = style
708 def apply_transformation(self, ti: TransformationInput) -> Transformation:
709 tabstop = to_int(self.tabstop)
710 style = self.style
712 # Create separator for tabs.
713 separator1 = to_str(self.char1)
714 separator2 = to_str(self.char2)
716 # Transform fragments.
717 fragments = explode_text_fragments(ti.fragments)
719 position_mappings = {}
720 result_fragments: StyleAndTextTuples = []
721 pos = 0
723 for i, fragment_and_text in enumerate(fragments):
724 position_mappings[i] = pos
726 if fragment_and_text[1] == "\t":
727 # Calculate how many characters we have to insert.
728 count = tabstop - (pos % tabstop)
729 if count == 0:
730 count = tabstop
732 # Insert tab.
733 result_fragments.append((style, separator1))
734 result_fragments.append((style, separator2 * (count - 1)))
735 pos += count
736 else:
737 result_fragments.append(fragment_and_text)
738 pos += 1
740 position_mappings[len(fragments)] = pos
741 # Add `pos+1` to mapping, because the cursor can be right after the
742 # line as well.
743 position_mappings[len(fragments) + 1] = pos + 1
745 def source_to_display(from_position: int) -> int:
746 "Maps original cursor position to the new one."
747 return position_mappings[from_position]
749 def display_to_source(display_pos: int) -> int:
750 "Maps display cursor position to the original one."
751 position_mappings_reversed = {v: k for k, v in position_mappings.items()}
753 while display_pos >= 0:
754 try:
755 return position_mappings_reversed[display_pos]
756 except KeyError:
757 display_pos -= 1
758 return 0
760 return Transformation(
761 result_fragments,
762 source_to_display=source_to_display,
763 display_to_source=display_to_source,
764 )
767class ReverseSearchProcessor(Processor):
768 """
769 Process to display the "(reverse-i-search)`...`:..." stuff around
770 the search buffer.
772 Note: This processor is meant to be applied to the BufferControl that
773 contains the search buffer, it's not meant for the original input.
774 """
776 _excluded_input_processors: list[type[Processor]] = [
777 HighlightSearchProcessor,
778 HighlightSelectionProcessor,
779 BeforeInput,
780 AfterInput,
781 ]
783 def _get_main_buffer(self, buffer_control: BufferControl) -> BufferControl | None:
784 from prompt_toolkit.layout.controls import BufferControl
786 prev_control = get_app().layout.search_target_buffer_control
787 if (
788 isinstance(prev_control, BufferControl)
789 and prev_control.search_buffer_control == buffer_control
790 ):
791 return prev_control
792 return None
794 def _content(
795 self, main_control: BufferControl, ti: TransformationInput
796 ) -> UIContent:
797 from prompt_toolkit.layout.controls import BufferControl
799 # Emulate the BufferControl through which we are searching.
800 # For this we filter out some of the input processors.
801 excluded_processors = tuple(self._excluded_input_processors)
803 def filter_processor(item: Processor) -> Processor | None:
804 """Filter processors from the main control that we want to disable
805 here. This returns either an accepted processor or None."""
806 # For a `_MergedProcessor`, check each individual processor, recursively.
807 if isinstance(item, _MergedProcessor):
808 accepted_processors = [filter_processor(p) for p in item.processors]
809 return merge_processors(
810 [p for p in accepted_processors if p is not None]
811 )
813 # For a `ConditionalProcessor`, check the body.
814 elif isinstance(item, ConditionalProcessor):
815 p = filter_processor(item.processor)
816 if p:
817 return ConditionalProcessor(p, item.filter)
819 # Otherwise, check the processor itself.
820 else:
821 if not isinstance(item, excluded_processors):
822 return item
824 return None
826 filtered_processor = filter_processor(
827 merge_processors(main_control.input_processors or [])
828 )
829 highlight_processor = HighlightIncrementalSearchProcessor()
831 if filtered_processor:
832 new_processors = [filtered_processor, highlight_processor]
833 else:
834 new_processors = [highlight_processor]
836 from .controls import SearchBufferControl
838 assert isinstance(ti.buffer_control, SearchBufferControl)
840 buffer_control = BufferControl(
841 buffer=main_control.buffer,
842 input_processors=new_processors,
843 include_default_input_processors=False,
844 lexer=main_control.lexer,
845 preview_search=True,
846 search_buffer_control=ti.buffer_control,
847 )
849 return buffer_control.create_content(ti.width, ti.height, preview_search=True)
851 def apply_transformation(self, ti: TransformationInput) -> Transformation:
852 from .controls import SearchBufferControl
854 assert isinstance(
855 ti.buffer_control, SearchBufferControl
856 ), "`ReverseSearchProcessor` should be applied to a `SearchBufferControl` only."
858 source_to_display: SourceToDisplay | None
859 display_to_source: DisplayToSource | None
861 main_control = self._get_main_buffer(ti.buffer_control)
863 if ti.lineno == 0 and main_control:
864 content = self._content(main_control, ti)
866 # Get the line from the original document for this search.
867 line_fragments = content.get_line(content.cursor_position.y)
869 if main_control.search_state.direction == SearchDirection.FORWARD:
870 direction_text = "i-search"
871 else:
872 direction_text = "reverse-i-search"
874 fragments_before: StyleAndTextTuples = [
875 ("class:prompt.search", "("),
876 ("class:prompt.search", direction_text),
877 ("class:prompt.search", ")`"),
878 ]
880 fragments = (
881 fragments_before
882 + [
883 ("class:prompt.search.text", fragment_list_to_text(ti.fragments)),
884 ("", "': "),
885 ]
886 + line_fragments
887 )
889 shift_position = fragment_list_len(fragments_before)
890 source_to_display = lambda i: i + shift_position
891 display_to_source = lambda i: i - shift_position
892 else:
893 source_to_display = None
894 display_to_source = None
895 fragments = ti.fragments
897 return Transformation(
898 fragments,
899 source_to_display=source_to_display,
900 display_to_source=display_to_source,
901 )
904class ConditionalProcessor(Processor):
905 """
906 Processor that applies another processor, according to a certain condition.
907 Example::
909 # Create a function that returns whether or not the processor should
910 # currently be applied.
911 def highlight_enabled():
912 return true_or_false
914 # Wrapped it in a `ConditionalProcessor` for usage in a `BufferControl`.
915 BufferControl(input_processors=[
916 ConditionalProcessor(HighlightSearchProcessor(),
917 Condition(highlight_enabled))])
919 :param processor: :class:`.Processor` instance.
920 :param filter: :class:`~prompt_toolkit.filters.Filter` instance.
921 """
923 def __init__(self, processor: Processor, filter: FilterOrBool) -> None:
924 self.processor = processor
925 self.filter = to_filter(filter)
927 def apply_transformation(
928 self, transformation_input: TransformationInput
929 ) -> Transformation:
930 # Run processor when enabled.
931 if self.filter():
932 return self.processor.apply_transformation(transformation_input)
933 else:
934 return Transformation(transformation_input.fragments)
936 def __repr__(self) -> str:
937 return "{}(processor={!r}, filter={!r})".format(
938 self.__class__.__name__,
939 self.processor,
940 self.filter,
941 )
944class DynamicProcessor(Processor):
945 """
946 Processor class that dynamically returns any Processor.
948 :param get_processor: Callable that returns a :class:`.Processor` instance.
949 """
951 def __init__(self, get_processor: Callable[[], Processor | None]) -> None:
952 self.get_processor = get_processor
954 def apply_transformation(self, ti: TransformationInput) -> Transformation:
955 processor = self.get_processor() or DummyProcessor()
956 return processor.apply_transformation(ti)
959def merge_processors(processors: list[Processor]) -> Processor:
960 """
961 Merge multiple `Processor` objects into one.
962 """
963 if len(processors) == 0:
964 return DummyProcessor()
966 if len(processors) == 1:
967 return processors[0] # Nothing to merge.
969 return _MergedProcessor(processors)
972class _MergedProcessor(Processor):
973 """
974 Processor that groups multiple other `Processor` objects, but exposes an
975 API as if it is one `Processor`.
976 """
978 def __init__(self, processors: list[Processor]):
979 self.processors = processors
981 def apply_transformation(self, ti: TransformationInput) -> Transformation:
982 source_to_display_functions = [ti.source_to_display]
983 display_to_source_functions = []
984 fragments = ti.fragments
986 def source_to_display(i: int) -> int:
987 """Translate x position from the buffer to the x position in the
988 processor fragments list."""
989 for f in source_to_display_functions:
990 i = f(i)
991 return i
993 for p in self.processors:
994 transformation = p.apply_transformation(
995 TransformationInput(
996 ti.buffer_control,
997 ti.document,
998 ti.lineno,
999 source_to_display,
1000 fragments,
1001 ti.width,
1002 ti.height,
1003 )
1004 )
1005 fragments = transformation.fragments
1006 display_to_source_functions.append(transformation.display_to_source)
1007 source_to_display_functions.append(transformation.source_to_display)
1009 def display_to_source(i: int) -> int:
1010 for f in reversed(display_to_source_functions):
1011 i = f(i)
1012 return i
1014 # In the case of a nested _MergedProcessor, each processor wants to
1015 # receive a 'source_to_display' function (as part of the
1016 # TransformationInput) that has everything in the chain before
1017 # included, because it can be called as part of the
1018 # `apply_transformation` function. However, this first
1019 # `source_to_display` should not be part of the output that we are
1020 # returning. (This is the most consistent with `display_to_source`.)
1021 del source_to_display_functions[:1]
1023 return Transformation(fragments, source_to_display, display_to_source)