1""" 
    2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from 
    3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance. 
    4 
    5The `KeyProcessor` will according to the implemented keybindings call the 
    6correct callbacks when new key presses are feed through `feed`. 
    7""" 
    8 
    9from __future__ import annotations 
    10 
    11import weakref 
    12from asyncio import Task, sleep 
    13from collections import deque 
    14from typing import TYPE_CHECKING, Any, Generator 
    15 
    16from prompt_toolkit.application.current import get_app 
    17from prompt_toolkit.enums import EditingMode 
    18from prompt_toolkit.filters.app import vi_navigation_mode 
    19from prompt_toolkit.keys import Keys 
    20from prompt_toolkit.utils import Event 
    21 
    22from .key_bindings import Binding, KeyBindingsBase 
    23 
    24if TYPE_CHECKING: 
    25    from prompt_toolkit.application import Application 
    26    from prompt_toolkit.buffer import Buffer 
    27 
    28 
    29__all__ = [ 
    30    "KeyProcessor", 
    31    "KeyPress", 
    32    "KeyPressEvent", 
    33] 
    34 
    35 
    36class KeyPress: 
    37    """ 
    38    :param key: A `Keys` instance or text (one character). 
    39    :param data: The received string on stdin. (Often vt100 escape codes.) 
    40    """ 
    41 
    42    def __init__(self, key: Keys | str, data: str | None = None) -> None: 
    43        assert isinstance(key, Keys) or len(key) == 1 
    44 
    45        if data is None: 
    46            if isinstance(key, Keys): 
    47                data = key.value 
    48            else: 
    49                data = key  # 'key' is a one character string. 
    50 
    51        self.key = key 
    52        self.data = data 
    53 
    54    def __repr__(self) -> str: 
    55        return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})" 
    56 
    57    def __eq__(self, other: object) -> bool: 
    58        if not isinstance(other, KeyPress): 
    59            return False 
    60        return self.key == other.key and self.data == other.data 
    61 
    62 
    63""" 
    64Helper object to indicate flush operation in the KeyProcessor. 
    65NOTE: the implementation is very similar to the VT100 parser. 
    66""" 
    67_Flush = KeyPress("?", data="_Flush") 
    68 
    69 
    70class KeyProcessor: 
    71    """ 
    72    Statemachine that receives :class:`KeyPress` instances and according to the 
    73    key bindings in the given :class:`KeyBindings`, calls the matching handlers. 
    74 
    75    :: 
    76 
    77        p = KeyProcessor(key_bindings) 
    78 
    79        # Send keys into the processor. 
    80        p.feed(KeyPress(Keys.ControlX, '\x18')) 
    81        p.feed(KeyPress(Keys.ControlC, '\x03') 
    82 
    83        # Process all the keys in the queue. 
    84        p.process_keys() 
    85 
    86        # Now the ControlX-ControlC callback will be called if this sequence is 
    87        # registered in the key bindings. 
    88 
    89    :param key_bindings: `KeyBindingsBase` instance. 
    90    """ 
    91 
    92    def __init__(self, key_bindings: KeyBindingsBase) -> None: 
    93        self._bindings = key_bindings 
    94 
    95        self.before_key_press = Event(self) 
    96        self.after_key_press = Event(self) 
    97 
    98        self._flush_wait_task: Task[None] | None = None 
    99 
    100        self.reset() 
    101 
    102    def reset(self) -> None: 
    103        self._previous_key_sequence: list[KeyPress] = [] 
    104        self._previous_handler: Binding | None = None 
    105 
    106        # The queue of keys not yet send to our _process generator/state machine. 
    107        self.input_queue: deque[KeyPress] = deque() 
    108 
    109        # The key buffer that is matched in the generator state machine. 
    110        # (This is at at most the amount of keys that make up for one key binding.) 
    111        self.key_buffer: list[KeyPress] = [] 
    112 
    113        #: Readline argument (for repetition of commands.) 
    114        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html 
    115        self.arg: str | None = None 
    116 
    117        # Start the processor coroutine. 
    118        self._process_coroutine = self._process() 
    119        self._process_coroutine.send(None)  # type: ignore 
    120 
    121    def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]: 
    122        """ 
    123        For a list of :class:`KeyPress` instances. Give the matching handlers 
    124        that would handle this. 
    125        """ 
    126        keys = tuple(k.key for k in key_presses) 
    127 
    128        # Try match, with mode flag 
    129        return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()] 
    130 
    131    def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool: 
    132        """ 
    133        For a list of :class:`KeyPress` instances. Return True if there is any 
    134        handler that is bound to a suffix of this keys. 
    135        """ 
    136        keys = tuple(k.key for k in key_presses) 
    137 
    138        # Get the filters for all the key bindings that have a longer match. 
    139        # Note that we transform it into a `set`, because we don't care about 
    140        # the actual bindings and executing it more than once doesn't make 
    141        # sense. (Many key bindings share the same filter.) 
    142        filters = { 
    143            b.filter for b in self._bindings.get_bindings_starting_with_keys(keys) 
    144        } 
    145 
    146        # When any key binding is active, return True. 
    147        return any(f() for f in filters) 
    148 
    149    def _process(self) -> Generator[None, KeyPress, None]: 
    150        """ 
    151        Coroutine implementing the key match algorithm. Key strokes are sent 
    152        into this generator, and it calls the appropriate handlers. 
    153        """ 
    154        buffer = self.key_buffer 
    155        retry = False 
    156 
    157        while True: 
    158            flush = False 
    159 
    160            if retry: 
    161                retry = False 
    162            else: 
    163                key = yield 
    164                if key is _Flush: 
    165                    flush = True 
    166                else: 
    167                    buffer.append(key) 
    168 
    169            # If we have some key presses, check for matches. 
    170            if buffer: 
    171                matches = self._get_matches(buffer) 
    172 
    173                if flush: 
    174                    is_prefix_of_longer_match = False 
    175                else: 
    176                    is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer) 
    177 
    178                # When eager matches were found, give priority to them and also 
    179                # ignore all the longer matches. 
    180                eager_matches = [m for m in matches if m.eager()] 
    181 
    182                if eager_matches: 
    183                    matches = eager_matches 
    184                    is_prefix_of_longer_match = False 
    185 
    186                # Exact matches found, call handler. 
    187                if not is_prefix_of_longer_match and matches: 
    188                    self._call_handler(matches[-1], key_sequence=buffer[:]) 
    189                    del buffer[:]  # Keep reference. 
    190 
    191                # No match found. 
    192                elif not is_prefix_of_longer_match and not matches: 
    193                    retry = True 
    194                    found = False 
    195 
    196                    # Loop over the input, try longest match first and shift. 
    197                    for i in range(len(buffer), 0, -1): 
    198                        matches = self._get_matches(buffer[:i]) 
    199                        if matches: 
    200                            self._call_handler(matches[-1], key_sequence=buffer[:i]) 
    201                            del buffer[:i] 
    202                            found = True 
    203                            break 
    204 
    205                    if not found: 
    206                        del buffer[:1] 
    207 
    208    def feed(self, key_press: KeyPress, first: bool = False) -> None: 
    209        """ 
    210        Add a new :class:`KeyPress` to the input queue. 
    211        (Don't forget to call `process_keys` in order to process the queue.) 
    212 
    213        :param first: If true, insert before everything else. 
    214        """ 
    215        if first: 
    216            self.input_queue.appendleft(key_press) 
    217        else: 
    218            self.input_queue.append(key_press) 
    219 
    220    def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None: 
    221        """ 
    222        :param first: If true, insert before everything else. 
    223        """ 
    224        if first: 
    225            self.input_queue.extendleft(reversed(key_presses)) 
    226        else: 
    227            self.input_queue.extend(key_presses) 
    228 
    229    def process_keys(self) -> None: 
    230        """ 
    231        Process all the keys in the `input_queue`. 
    232        (To be called after `feed`.) 
    233 
    234        Note: because of the `feed`/`process_keys` separation, it is 
    235              possible to call `feed` from inside a key binding. 
    236              This function keeps looping until the queue is empty. 
    237        """ 
    238        app = get_app() 
    239 
    240        def not_empty() -> bool: 
    241            # When the application result is set, stop processing keys.  (E.g. 
    242            # if ENTER was received, followed by a few additional key strokes, 
    243            # leave the other keys in the queue.) 
    244            if app.is_done: 
    245                # But if there are still CPRResponse keys in the queue, these 
    246                # need to be processed. 
    247                return any(k for k in self.input_queue if k.key == Keys.CPRResponse) 
    248            else: 
    249                return bool(self.input_queue) 
    250 
    251        def get_next() -> KeyPress: 
    252            if app.is_done: 
    253                # Only process CPR responses. Everything else is typeahead. 
    254                cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0] 
    255                self.input_queue.remove(cpr) 
    256                return cpr 
    257            else: 
    258                return self.input_queue.popleft() 
    259 
    260        is_flush = False 
    261 
    262        while not_empty(): 
    263            # Process next key. 
    264            key_press = get_next() 
    265 
    266            is_flush = key_press is _Flush 
    267            is_cpr = key_press.key == Keys.CPRResponse 
    268 
    269            if not is_flush and not is_cpr: 
    270                self.before_key_press.fire() 
    271 
    272            try: 
    273                self._process_coroutine.send(key_press) 
    274            except Exception: 
    275                # If for some reason something goes wrong in the parser, (maybe 
    276                # an exception was raised) restart the processor for next time. 
    277                self.reset() 
    278                self.empty_queue() 
    279                raise 
    280 
    281            if not is_flush and not is_cpr: 
    282                self.after_key_press.fire() 
    283 
    284        # Skip timeout if the last key was flush. 
    285        if not is_flush: 
    286            self._start_timeout() 
    287 
    288    def empty_queue(self) -> list[KeyPress]: 
    289        """ 
    290        Empty the input queue. Return the unprocessed input. 
    291        """ 
    292        key_presses = list(self.input_queue) 
    293        self.input_queue.clear() 
    294 
    295        # Filter out CPRs. We don't want to return these. 
    296        key_presses = [k for k in key_presses if k.key != Keys.CPRResponse] 
    297        return key_presses 
    298 
    299    def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None: 
    300        app = get_app() 
    301        was_recording_emacs = app.emacs_state.is_recording 
    302        was_recording_vi = bool(app.vi_state.recording_register) 
    303        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode 
    304        arg = self.arg 
    305        self.arg = None 
    306 
    307        event = KeyPressEvent( 
    308            weakref.ref(self), 
    309            arg=arg, 
    310            key_sequence=key_sequence, 
    311            previous_key_sequence=self._previous_key_sequence, 
    312            is_repeat=(handler == self._previous_handler), 
    313        ) 
    314 
    315        # Save the state of the current buffer. 
    316        if handler.save_before(event): 
    317            event.app.current_buffer.save_to_undo_stack() 
    318 
    319        # Call handler. 
    320        from prompt_toolkit.buffer import EditReadOnlyBuffer 
    321 
    322        try: 
    323            handler.call(event) 
    324            self._fix_vi_cursor_position(event) 
    325 
    326        except EditReadOnlyBuffer: 
    327            # When a key binding does an attempt to change a buffer which is 
    328            # read-only, we can ignore that. We sound a bell and go on. 
    329            app.output.bell() 
    330 
    331        if was_temporary_navigation_mode: 
    332            self._leave_vi_temp_navigation_mode(event) 
    333 
    334        self._previous_key_sequence = key_sequence 
    335        self._previous_handler = handler 
    336 
    337        # Record the key sequence in our macro. (Only if we're in macro mode 
    338        # before and after executing the key.) 
    339        if handler.record_in_macro(): 
    340            if app.emacs_state.is_recording and was_recording_emacs: 
    341                recording = app.emacs_state.current_recording 
    342                if recording is not None:  # Should always be true, given that 
    343                    # `was_recording_emacs` is set. 
    344                    recording.extend(key_sequence) 
    345 
    346            if app.vi_state.recording_register and was_recording_vi: 
    347                for k in key_sequence: 
    348                    app.vi_state.current_recording += k.data 
    349 
    350    def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None: 
    351        """ 
    352        After every command, make sure that if we are in Vi navigation mode, we 
    353        never put the cursor after the last character of a line. (Unless it's 
    354        an empty line.) 
    355        """ 
    356        app = event.app 
    357        buff = app.current_buffer 
    358        preferred_column = buff.preferred_column 
    359 
    360        if ( 
    361            vi_navigation_mode() 
    362            and buff.document.is_cursor_at_the_end_of_line 
    363            and len(buff.document.current_line) > 0 
    364        ): 
    365            buff.cursor_position -= 1 
    366 
    367            # Set the preferred_column for arrow up/down again. 
    368            # (This was cleared after changing the cursor position.) 
    369            buff.preferred_column = preferred_column 
    370 
    371    def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None: 
    372        """ 
    373        If we're in Vi temporary navigation (normal) mode, return to 
    374        insert/replace mode after executing one action. 
    375        """ 
    376        app = event.app 
    377 
    378        if app.editing_mode == EditingMode.VI: 
    379            # Not waiting for a text object and no argument has been given. 
    380            if app.vi_state.operator_func is None and self.arg is None: 
    381                app.vi_state.temporary_navigation_mode = False 
    382 
    383    def _start_timeout(self) -> None: 
    384        """ 
    385        Start auto flush timeout. Similar to Vim's `timeoutlen` option. 
    386 
    387        Start a background coroutine with a timer. When this timeout expires 
    388        and no key was pressed in the meantime, we flush all data in the queue 
    389        and call the appropriate key binding handlers. 
    390        """ 
    391        app = get_app() 
    392        timeout = app.timeoutlen 
    393 
    394        if timeout is None: 
    395            return 
    396 
    397        async def wait() -> None: 
    398            "Wait for timeout." 
    399            # This sleep can be cancelled. In that case we don't flush. 
    400            await sleep(timeout) 
    401 
    402            if len(self.key_buffer) > 0: 
    403                # (No keys pressed in the meantime.) 
    404                flush_keys() 
    405 
    406        def flush_keys() -> None: 
    407            "Flush keys." 
    408            self.feed(_Flush) 
    409            self.process_keys() 
    410 
    411        # Automatically flush keys. 
    412        if self._flush_wait_task: 
    413            self._flush_wait_task.cancel() 
    414        self._flush_wait_task = app.create_background_task(wait()) 
    415 
    416    def send_sigint(self) -> None: 
    417        """ 
    418        Send SIGINT. Immediately call the SIGINT key handler. 
    419        """ 
    420        self.feed(KeyPress(key=Keys.SIGINT), first=True) 
    421        self.process_keys() 
    422 
    423 
    424class KeyPressEvent: 
    425    """ 
    426    Key press event, delivered to key bindings. 
    427 
    428    :param key_processor_ref: Weak reference to the `KeyProcessor`. 
    429    :param arg: Repetition argument. 
    430    :param key_sequence: List of `KeyPress` instances. 
    431    :param previouskey_sequence: Previous list of `KeyPress` instances. 
    432    :param is_repeat: True when the previous event was delivered to the same handler. 
    433    """ 
    434 
    435    def __init__( 
    436        self, 
    437        key_processor_ref: weakref.ReferenceType[KeyProcessor], 
    438        arg: str | None, 
    439        key_sequence: list[KeyPress], 
    440        previous_key_sequence: list[KeyPress], 
    441        is_repeat: bool, 
    442    ) -> None: 
    443        self._key_processor_ref = key_processor_ref 
    444        self.key_sequence = key_sequence 
    445        self.previous_key_sequence = previous_key_sequence 
    446 
    447        #: True when the previous key sequence was handled by the same handler. 
    448        self.is_repeat = is_repeat 
    449 
    450        self._arg = arg 
    451        self._app = get_app() 
    452 
    453    def __repr__(self) -> str: 
    454        return f"KeyPressEvent(arg={self.arg!r}, key_sequence={self.key_sequence!r}, is_repeat={self.is_repeat!r})" 
    455 
    456    @property 
    457    def data(self) -> str: 
    458        return self.key_sequence[-1].data 
    459 
    460    @property 
    461    def key_processor(self) -> KeyProcessor: 
    462        processor = self._key_processor_ref() 
    463        if processor is None: 
    464            raise Exception("KeyProcessor was lost. This should not happen.") 
    465        return processor 
    466 
    467    @property 
    468    def app(self) -> Application[Any]: 
    469        """ 
    470        The current `Application` object. 
    471        """ 
    472        return self._app 
    473 
    474    @property 
    475    def current_buffer(self) -> Buffer: 
    476        """ 
    477        The current buffer. 
    478        """ 
    479        return self.app.current_buffer 
    480 
    481    @property 
    482    def arg(self) -> int: 
    483        """ 
    484        Repetition argument. 
    485        """ 
    486        if self._arg == "-": 
    487            return -1 
    488 
    489        result = int(self._arg or 1) 
    490 
    491        # Don't exceed a million. 
    492        if int(result) >= 1000000: 
    493            result = 1 
    494 
    495        return result 
    496 
    497    @property 
    498    def arg_present(self) -> bool: 
    499        """ 
    500        True if repetition argument was explicitly provided. 
    501        """ 
    502        return self._arg is not None 
    503 
    504    def append_to_arg_count(self, data: str) -> None: 
    505        """ 
    506        Add digit to the input argument. 
    507 
    508        :param data: the typed digit as string 
    509        """ 
    510        assert data in "-0123456789" 
    511        current = self._arg 
    512 
    513        if data == "-": 
    514            assert current is None or current == "-" 
    515            result = data 
    516        elif current is None: 
    517            result = data 
    518        else: 
    519            result = f"{current}{data}" 
    520 
    521        self.key_processor.arg = result 
    522 
    523    @property 
    524    def cli(self) -> Application[Any]: 
    525        "For backward-compatibility." 
    526        return self.app