1""" """ 
    2 
    3from __future__ import annotations 
    4 
    5from abc import ABCMeta, abstractmethod 
    6from typing import AsyncGenerator, Callable, Iterable, Sequence 
    7 
    8from prompt_toolkit.document import Document 
    9from prompt_toolkit.eventloop import aclosing, generator_to_async_generator 
    10from prompt_toolkit.filters import FilterOrBool, to_filter 
    11from prompt_toolkit.formatted_text import AnyFormattedText, StyleAndTextTuples 
    12 
# Names exported by `from <this module> import *`.
__all__ = [
    "Completion",
    "Completer",
    "ThreadedCompleter",
    "DummyCompleter",
    "DynamicCompleter",
    "CompleteEvent",
    "ConditionalCompleter",
    "merge_completers",
    "get_common_complete_suffix",
]
    24 
    25 
    26class Completion: 
    27    """ 
    28    :param text: The new string that will be inserted into the document. 
    29    :param start_position: Position relative to the cursor_position where the 
    30        new text will start. The text will be inserted between the 
    31        start_position and the original cursor position. 
    32    :param display: (optional string or formatted text) If the completion has 
    33        to be displayed differently in the completion menu. 
    34    :param display_meta: (Optional string or formatted text) Meta information 
    35        about the completion, e.g. the path or source where it's coming from. 
    36        This can also be a callable that returns a string. 
    37    :param style: Style string. 
    38    :param selected_style: Style string, used for a selected completion. 
    39        This can override the `style` parameter. 
    40    """ 
    41 
    42    def __init__( 
    43        self, 
    44        text: str, 
    45        start_position: int = 0, 
    46        display: AnyFormattedText | None = None, 
    47        display_meta: AnyFormattedText | None = None, 
    48        style: str = "", 
    49        selected_style: str = "", 
    50    ) -> None: 
    51        from prompt_toolkit.formatted_text import to_formatted_text 
    52 
    53        self.text = text 
    54        self.start_position = start_position 
    55        self._display_meta = display_meta 
    56 
    57        if display is None: 
    58            display = text 
    59 
    60        self.display = to_formatted_text(display) 
    61 
    62        self.style = style 
    63        self.selected_style = selected_style 
    64 
    65        assert self.start_position <= 0 
    66 
    67    def __repr__(self) -> str: 
    68        if isinstance(self.display, str) and self.display == self.text: 
    69            return f"{self.__class__.__name__}(text={self.text!r}, start_position={self.start_position!r})" 
    70        else: 
    71            return f"{self.__class__.__name__}(text={self.text!r}, start_position={self.start_position!r}, display={self.display!r})" 
    72 
    73    def __eq__(self, other: object) -> bool: 
    74        if not isinstance(other, Completion): 
    75            return False 
    76        return ( 
    77            self.text == other.text 
    78            and self.start_position == other.start_position 
    79            and self.display == other.display 
    80            and self._display_meta == other._display_meta 
    81        ) 
    82 
    83    def __hash__(self) -> int: 
    84        return hash((self.text, self.start_position, self.display, self._display_meta)) 
    85 
    86    @property 
    87    def display_text(self) -> str: 
    88        "The 'display' field as plain text." 
    89        from prompt_toolkit.formatted_text import fragment_list_to_text 
    90 
    91        return fragment_list_to_text(self.display) 
    92 
    93    @property 
    94    def display_meta(self) -> StyleAndTextTuples: 
    95        "Return meta-text. (This is lazy when using a callable)." 
    96        from prompt_toolkit.formatted_text import to_formatted_text 
    97 
    98        return to_formatted_text(self._display_meta or "") 
    99 
    100    @property 
    101    def display_meta_text(self) -> str: 
    102        "The 'meta' field as plain text." 
    103        from prompt_toolkit.formatted_text import fragment_list_to_text 
    104 
    105        return fragment_list_to_text(self.display_meta) 
    106 
    107    def new_completion_from_position(self, position: int) -> Completion: 
    108        """ 
    109        (Only for internal use!) 
    110        Get a new completion by splitting this one. Used by `Application` when 
    111        it needs to have a list of new completions after inserting the common 
    112        prefix. 
    113        """ 
    114        assert position - self.start_position >= 0 
    115 
    116        return Completion( 
    117            text=self.text[position - self.start_position :], 
    118            display=self.display, 
    119            display_meta=self._display_meta, 
    120        ) 
    121 
    122 
    123class CompleteEvent: 
    124    """ 
    125    Event that called the completer. 
    126 
    127    :param text_inserted: When True, it means that completions are requested 
    128        because of a text insert. (`Buffer.complete_while_typing`.) 
    129    :param completion_requested: When True, it means that the user explicitly 
    130        pressed the `Tab` key in order to view the completions. 
    131 
    132    These two flags can be used for instance to implement a completer that 
    133    shows some completions when ``Tab`` has been pressed, but not 
    134    automatically when the user presses a space. (Because of 
    135    `complete_while_typing`.) 
    136    """ 
    137 
    138    def __init__( 
    139        self, text_inserted: bool = False, completion_requested: bool = False 
    140    ) -> None: 
    141        assert not (text_inserted and completion_requested) 
    142 
    143        #: Automatic completion while typing. 
    144        self.text_inserted = text_inserted 
    145 
    146        #: Used explicitly requested completion by pressing 'tab'. 
    147        self.completion_requested = completion_requested 
    148 
    149    def __repr__(self) -> str: 
    150        return f"{self.__class__.__name__}(text_inserted={self.text_inserted!r}, completion_requested={self.completion_requested!r})" 
    151 
    152 
    153class Completer(metaclass=ABCMeta): 
    154    """ 
    155    Base class for completer implementations. 
    156    """ 
    157 
    158    @abstractmethod 
    159    def get_completions( 
    160        self, document: Document, complete_event: CompleteEvent 
    161    ) -> Iterable[Completion]: 
    162        """ 
    163        This should be a generator that yields :class:`.Completion` instances. 
    164 
    165        If the generation of completions is something expensive (that takes a 
    166        lot of time), consider wrapping this `Completer` class in a 
    167        `ThreadedCompleter`. In that case, the completer algorithm runs in a 
    168        background thread and completions will be displayed as soon as they 
    169        arrive. 
    170 
    171        :param document: :class:`~prompt_toolkit.document.Document` instance. 
    172        :param complete_event: :class:`.CompleteEvent` instance. 
    173        """ 
    174        while False: 
    175            yield 
    176 
    177    async def get_completions_async( 
    178        self, document: Document, complete_event: CompleteEvent 
    179    ) -> AsyncGenerator[Completion, None]: 
    180        """ 
    181        Asynchronous generator for completions. (Probably, you won't have to 
    182        override this.) 
    183 
    184        Asynchronous generator of :class:`.Completion` objects. 
    185        """ 
    186        for item in self.get_completions(document, complete_event): 
    187            yield item 
    188 
    189 
    190class ThreadedCompleter(Completer): 
    191    """ 
    192    Wrapper that runs the `get_completions` generator in a thread. 
    193 
    194    (Use this to prevent the user interface from becoming unresponsive if the 
    195    generation of completions takes too much time.) 
    196 
    197    The completions will be displayed as soon as they are produced. The user 
    198    can already select a completion, even if not all completions are displayed. 
    199    """ 
    200 
    201    def __init__(self, completer: Completer) -> None: 
    202        self.completer = completer 
    203 
    204    def get_completions( 
    205        self, document: Document, complete_event: CompleteEvent 
    206    ) -> Iterable[Completion]: 
    207        return self.completer.get_completions(document, complete_event) 
    208 
    209    async def get_completions_async( 
    210        self, document: Document, complete_event: CompleteEvent 
    211    ) -> AsyncGenerator[Completion, None]: 
    212        """ 
    213        Asynchronous generator of completions. 
    214        """ 
    215        # NOTE: Right now, we are consuming the `get_completions` generator in 
    216        #       a synchronous background thread, then passing the results one 
    217        #       at a time over a queue, and consuming this queue in the main 
    218        #       thread (that's what `generator_to_async_generator` does). That 
    219        #       means that if the completer is *very* slow, we'll be showing 
    220        #       completions in the UI once they are computed. 
    221 
    222        #       It's very tempting to replace this implementation with the 
    223        #       commented code below for several reasons: 
    224 
    225        #       - `generator_to_async_generator` is not perfect and hard to get 
    226        #         right. It's a lot of complexity for little gain. The 
    227        #         implementation needs a huge buffer for it to be efficient 
    228        #         when there are many completions (like 50k+). 
    229        #       - Normally, a completer is supposed to be fast, users can have 
    230        #         "complete while typing" enabled, and want to see the 
    231        #         completions within a second. Handling one completion at a 
    232        #         time, and rendering once we get it here doesn't make any 
    233        #         sense if this is quick anyway. 
    234        #       - Completers like `FuzzyCompleter` prepare all completions 
    235        #         anyway so that they can be sorted by accuracy before they are 
    236        #         yielded. At the point that we start yielding completions 
    237        #         here, we already have all completions. 
    238        #       - The `Buffer` class has complex logic to invalidate the UI 
    239        #         while it is consuming the completions. We don't want to 
    240        #         invalidate the UI for every completion (if there are many), 
    241        #         but we want to do it often enough so that completions are 
    242        #         being displayed while they are produced. 
    243 
    244        #       We keep the current behavior mainly for backward-compatibility. 
    245        #       Similarly, it would be better for this function to not return 
    246        #       an async generator, but simply be a coroutine that returns a 
    247        #       list of `Completion` objects, containing all completions at 
    248        #       once. 
    249 
    250        #       Note that this argument doesn't mean we shouldn't use 
    251        #       `ThreadedCompleter`. It still makes sense to produce 
    252        #       completions in a background thread, because we don't want to 
    253        #       freeze the UI while the user is typing. But sending the 
    254        #       completions one at a time to the UI maybe isn't worth it. 
    255 
    256        # def get_all_in_thread() -> List[Completion]: 
    257        #   return list(self.get_completions(document, complete_event)) 
    258 
    259        # completions = await get_running_loop().run_in_executor(None, get_all_in_thread) 
    260        # for completion in completions: 
    261        #   yield completion 
    262 
    263        async with aclosing( 
    264            generator_to_async_generator( 
    265                lambda: self.completer.get_completions(document, complete_event) 
    266            ) 
    267        ) as async_generator: 
    268            async for completion in async_generator: 
    269                yield completion 
    270 
    271    def __repr__(self) -> str: 
    272        return f"ThreadedCompleter({self.completer!r})" 
    273 
    274 
    275class DummyCompleter(Completer): 
    276    """ 
    277    A completer that doesn't return any completion. 
    278    """ 
    279 
    280    def get_completions( 
    281        self, document: Document, complete_event: CompleteEvent 
    282    ) -> Iterable[Completion]: 
    283        return [] 
    284 
    285    def __repr__(self) -> str: 
    286        return "DummyCompleter()" 
    287 
    288 
    289class DynamicCompleter(Completer): 
    290    """ 
    291    Completer class that can dynamically returns any Completer. 
    292 
    293    :param get_completer: Callable that returns a :class:`.Completer` instance. 
    294    """ 
    295 
    296    def __init__(self, get_completer: Callable[[], Completer | None]) -> None: 
    297        self.get_completer = get_completer 
    298 
    299    def get_completions( 
    300        self, document: Document, complete_event: CompleteEvent 
    301    ) -> Iterable[Completion]: 
    302        completer = self.get_completer() or DummyCompleter() 
    303        return completer.get_completions(document, complete_event) 
    304 
    305    async def get_completions_async( 
    306        self, document: Document, complete_event: CompleteEvent 
    307    ) -> AsyncGenerator[Completion, None]: 
    308        completer = self.get_completer() or DummyCompleter() 
    309 
    310        async for completion in completer.get_completions_async( 
    311            document, complete_event 
    312        ): 
    313            yield completion 
    314 
    315    def __repr__(self) -> str: 
    316        return f"DynamicCompleter({self.get_completer!r} -> {self.get_completer()!r})" 
    317 
    318 
    319class ConditionalCompleter(Completer): 
    320    """ 
    321    Wrapper around any other completer that will enable/disable the completions 
    322    depending on whether the received condition is satisfied. 
    323 
    324    :param completer: :class:`.Completer` instance. 
    325    :param filter: :class:`.Filter` instance. 
    326    """ 
    327 
    328    def __init__(self, completer: Completer, filter: FilterOrBool) -> None: 
    329        self.completer = completer 
    330        self.filter = to_filter(filter) 
    331 
    332    def __repr__(self) -> str: 
    333        return f"ConditionalCompleter({self.completer!r}, filter={self.filter!r})" 
    334 
    335    def get_completions( 
    336        self, document: Document, complete_event: CompleteEvent 
    337    ) -> Iterable[Completion]: 
    338        # Get all completions in a blocking way. 
    339        if self.filter(): 
    340            yield from self.completer.get_completions(document, complete_event) 
    341 
    342    async def get_completions_async( 
    343        self, document: Document, complete_event: CompleteEvent 
    344    ) -> AsyncGenerator[Completion, None]: 
    345        # Get all completions in a non-blocking way. 
    346        if self.filter(): 
    347            async with aclosing( 
    348                self.completer.get_completions_async(document, complete_event) 
    349            ) as async_generator: 
    350                async for item in async_generator: 
    351                    yield item 
    352 
    353 
    354class _MergedCompleter(Completer): 
    355    """ 
    356    Combine several completers into one. 
    357    """ 
    358 
    359    def __init__(self, completers: Sequence[Completer]) -> None: 
    360        self.completers = completers 
    361 
    362    def get_completions( 
    363        self, document: Document, complete_event: CompleteEvent 
    364    ) -> Iterable[Completion]: 
    365        # Get all completions from the other completers in a blocking way. 
    366        for completer in self.completers: 
    367            yield from completer.get_completions(document, complete_event) 
    368 
    369    async def get_completions_async( 
    370        self, document: Document, complete_event: CompleteEvent 
    371    ) -> AsyncGenerator[Completion, None]: 
    372        # Get all completions from the other completers in a non-blocking way. 
    373        for completer in self.completers: 
    374            async with aclosing( 
    375                completer.get_completions_async(document, complete_event) 
    376            ) as async_generator: 
    377                async for item in async_generator: 
    378                    yield item 
    379 
    380 
    381def merge_completers( 
    382    completers: Sequence[Completer], deduplicate: bool = False 
    383) -> Completer: 
    384    """ 
    385    Combine several completers into one. 
    386 
    387    :param deduplicate: If `True`, wrap the result in a `DeduplicateCompleter` 
    388        so that completions that would result in the same text will be 
    389        deduplicated. 
    390    """ 
    391    if deduplicate: 
    392        from .deduplicate import DeduplicateCompleter 
    393 
    394        return DeduplicateCompleter(_MergedCompleter(completers)) 
    395 
    396    return _MergedCompleter(completers) 
    397 
    398 
    399def get_common_complete_suffix( 
    400    document: Document, completions: Sequence[Completion] 
    401) -> str: 
    402    """ 
    403    Return the common prefix for all completions. 
    404    """ 
    405 
    406    # Take only completions that don't change the text before the cursor. 
    407    def doesnt_change_before_cursor(completion: Completion) -> bool: 
    408        end = completion.text[: -completion.start_position] 
    409        return document.text_before_cursor.endswith(end) 
    410 
    411    completions2 = [c for c in completions if doesnt_change_before_cursor(c)] 
    412 
    413    # When there is at least one completion that changes the text before the 
    414    # cursor, don't return any common part. 
    415    if len(completions2) != len(completions): 
    416        return "" 
    417 
    418    # Return the common prefix. 
    419    def get_suffix(completion: Completion) -> str: 
    420        return completion.text[-completion.start_position :] 
    421 
    422    return _commonprefix([get_suffix(c) for c in completions2]) 
    423 
    424 
    425def _commonprefix(strings: Iterable[str]) -> str: 
    426    # Similar to os.path.commonprefix 
    427    if not strings: 
    428        return "" 
    429 
    430    else: 
    431        s1 = min(strings) 
    432        s2 = max(strings) 
    433 
    434        for i, c in enumerate(s1): 
    435            if c != s2[i]: 
    436                return s1[:i] 
    437 
    438        return s1