1""" 
    2Parser for VT100 input stream. 
    3""" 
    4 
    5from __future__ import annotations 
    6 
    7import re 
    8from typing import Callable, Dict, Generator 
    9 
    10from ..key_binding.key_processor import KeyPress 
    11from ..keys import Keys 
    12from .ansi_escape_sequences import ANSI_SEQUENCES 
    13 
    14__all__ = [ 
    15    "Vt100Parser", 
    16] 
    17 
    18 
    19# Regex matching any CPR response 
    20# (Note that we use '\Z' instead of '$', because '$' could include a trailing 
    21# newline.) 
    22_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") 
    23 
    24# Mouse events: 
    25# Typical: "Esc[MaB*"  Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" 
    26_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") 
    27 
    28# Regex matching any valid prefix of a CPR response. 
    29# (Note that it doesn't contain the last character, the 'R'. The prefix has to 
    30# be shorter.) 
    31_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") 
    32 
    33_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") 
    34 
    35 
    36class _Flush: 
    37    """Helper object to indicate flush operation to the parser.""" 
    38 
    39    pass 
    40 
    41 
    42class _IsPrefixOfLongerMatchCache(Dict[str, bool]): 
    43    """ 
    44    Dictionary that maps input sequences to a boolean indicating whether there is 
    45    any key that start with this characters. 
    46    """ 
    47 
    48    def __missing__(self, prefix: str) -> bool: 
    49        # (hard coded) If this could be a prefix of a CPR response, return 
    50        # True. 
    51        if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( 
    52            prefix 
    53        ): 
    54            result = True 
    55        else: 
    56            # If this could be a prefix of anything else, also return True. 
    57            result = any( 
    58                v 
    59                for k, v in ANSI_SEQUENCES.items() 
    60                if k.startswith(prefix) and k != prefix 
    61            ) 
    62 
    63        self[prefix] = result 
    64        return result 
    65 
    66 
# Module-level cache instance. The parser coroutine looks up every buffered
# prefix in here, so the answer for a given prefix is computed only once.
# Entries are never removed.
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
    68 
    69 
    70class Vt100Parser: 
    71    """ 
    72    Parser for VT100 input stream. 
    73    Data can be fed through the `feed` method and the given callback will be 
    74    called with KeyPress objects. 
    75 
    76    :: 
    77 
    78        def callback(key): 
    79            pass 
    80        i = Vt100Parser(callback) 
    81        i.feed('data\x01...') 
    82 
    83    :attr feed_key_callback: Function that will be called when a key is parsed. 
    84    """ 
    85 
    86    # Lookup table of ANSI escape sequences for a VT100 terminal 
    87    # Hint: in order to know what sequences your terminal writes to stdin, run 
    88    #       "od -c" and start typing. 
    89    def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: 
    90        self.feed_key_callback = feed_key_callback 
    91        self.reset() 
    92 
    93    def reset(self, request: bool = False) -> None: 
    94        self._in_bracketed_paste = False 
    95        self._start_parser() 
    96 
    97    def _start_parser(self) -> None: 
    98        """ 
    99        Start the parser coroutine. 
    100        """ 
    101        self._input_parser = self._input_parser_generator() 
    102        self._input_parser.send(None)  # type: ignore 
    103 
    104    def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]: 
    105        """ 
    106        Return the key (or keys) that maps to this prefix. 
    107        """ 
    108        # (hard coded) If we match a CPR response, return Keys.CPRResponse. 
    109        # (This one doesn't fit in the ANSI_SEQUENCES, because it contains 
    110        # integer variables.) 
    111        if _cpr_response_re.match(prefix): 
    112            return Keys.CPRResponse 
    113 
    114        elif _mouse_event_re.match(prefix): 
    115            return Keys.Vt100MouseEvent 
    116 
    117        # Otherwise, use the mappings. 
    118        try: 
    119            return ANSI_SEQUENCES[prefix] 
    120        except KeyError: 
    121            return None 
    122 
    123    def _input_parser_generator(self) -> Generator[None, str | _Flush, None]: 
    124        """ 
    125        Coroutine (state machine) for the input parser. 
    126        """ 
    127        prefix = "" 
    128        retry = False 
    129        flush = False 
    130 
    131        while True: 
    132            flush = False 
    133 
    134            if retry: 
    135                retry = False 
    136            else: 
    137                # Get next character. 
    138                c = yield 
    139 
    140                if isinstance(c, _Flush): 
    141                    flush = True 
    142                else: 
    143                    prefix += c 
    144 
    145            # If we have some data, check for matches. 
    146            if prefix: 
    147                is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] 
    148                match = self._get_match(prefix) 
    149 
    150                # Exact matches found, call handlers.. 
    151                if (flush or not is_prefix_of_longer_match) and match: 
    152                    self._call_handler(match, prefix) 
    153                    prefix = "" 
    154 
    155                # No exact match found. 
    156                elif (flush or not is_prefix_of_longer_match) and not match: 
    157                    found = False 
    158                    retry = True 
    159 
    160                    # Loop over the input, try the longest match first and 
    161                    # shift. 
    162                    for i in range(len(prefix), 0, -1): 
    163                        match = self._get_match(prefix[:i]) 
    164                        if match: 
    165                            self._call_handler(match, prefix[:i]) 
    166                            prefix = prefix[i:] 
    167                            found = True 
    168 
    169                    if not found: 
    170                        self._call_handler(prefix[0], prefix[0]) 
    171                        prefix = prefix[1:] 
    172 
    173    def _call_handler( 
    174        self, key: str | Keys | tuple[Keys, ...], insert_text: str 
    175    ) -> None: 
    176        """ 
    177        Callback to handler. 
    178        """ 
    179        if isinstance(key, tuple): 
    180            # Received ANSI sequence that corresponds with multiple keys 
    181            # (probably alt+something). Handle keys individually, but only pass 
    182            # data payload to first KeyPress (so that we won't insert it 
    183            # multiple times). 
    184            for i, k in enumerate(key): 
    185                self._call_handler(k, insert_text if i == 0 else "") 
    186        else: 
    187            if key == Keys.BracketedPaste: 
    188                self._in_bracketed_paste = True 
    189                self._paste_buffer = "" 
    190            else: 
    191                self.feed_key_callback(KeyPress(key, insert_text)) 
    192 
    193    def feed(self, data: str) -> None: 
    194        """ 
    195        Feed the input stream. 
    196 
    197        :param data: Input string (unicode). 
    198        """ 
    199        # Handle bracketed paste. (We bypass the parser that matches all other 
    200        # key presses and keep reading input until we see the end mark.) 
    201        # This is much faster then parsing character by character. 
    202        if self._in_bracketed_paste: 
    203            self._paste_buffer += data 
    204            end_mark = "\x1b[201~" 
    205 
    206            if end_mark in self._paste_buffer: 
    207                end_index = self._paste_buffer.index(end_mark) 
    208 
    209                # Feed content to key bindings. 
    210                paste_content = self._paste_buffer[:end_index] 
    211                self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) 
    212 
    213                # Quit bracketed paste mode and handle remaining input. 
    214                self._in_bracketed_paste = False 
    215                remaining = self._paste_buffer[end_index + len(end_mark) :] 
    216                self._paste_buffer = "" 
    217 
    218                self.feed(remaining) 
    219 
    220        # Handle normal input character by character. 
    221        else: 
    222            for i, c in enumerate(data): 
    223                if self._in_bracketed_paste: 
    224                    # Quit loop and process from this position when the parser 
    225                    # entered bracketed paste. 
    226                    self.feed(data[i:]) 
    227                    break 
    228                else: 
    229                    self._input_parser.send(c) 
    230 
    231    def flush(self) -> None: 
    232        """ 
    233        Flush the buffer of the input stream. 
    234 
    235        This will allow us to handle the escape key (or maybe meta) sooner. 
    236        The input received by the escape key is actually the same as the first 
    237        characters of e.g. Arrow-Up, so without knowing what follows the escape 
    238        sequence, we don't know whether escape has been pressed, or whether 
    239        it's something else. This flush function should be called after a 
    240        timeout, and processes everything that's still in the buffer as-is, so 
    241        without assuming any characters will follow. 
    242        """ 
    243        self._input_parser.send(_Flush()) 
    244 
    245    def feed_and_flush(self, data: str) -> None: 
    246        """ 
    247        Wrapper around ``feed`` and ``flush``. 
    248        """ 
    249        self.feed(data) 
    250        self.flush()