1"""
2Parser for VT100 input stream.
3"""
4
5from __future__ import annotations
6
7import re
8from typing import Callable, Dict, Generator
9
10from ..key_binding.key_processor import KeyPress
11from ..keys import Keys
12from .ansi_escape_sequences import ANSI_SEQUENCES
13
# Public names of this module (what ``from ... import *`` exports).
__all__ = [
    "Vt100Parser",
]
17
18
# Regex matching a complete CPR response: "Esc[<digits>;<digits>R".
# (Note that we use '\Z' instead of '$', because '$' would also match just
# before a trailing newline.)
_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")

# Regex matching a complete mouse event. Accepted shapes:
#   Typical:   "Esc[MaB*"        ("Esc[M" followed by exactly three characters)
#   Urxvt:     "Esc[96;14;13M"
#   Xterm SGR: "Esc[<64;85;12M"
# (The numeric forms may end in either 'M' or 'm'.)
_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")

# Regex matching any valid prefix of a CPR response.
# (Note that it doesn't contain the last character, the 'R'. The prefix has to
# be shorter.)
_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")

# Regex matching any valid prefix of a mouse event: either the numeric form
# without its final 'M'/'m', or "Esc[M" followed by at most two characters.
_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
34
35
class _Flush:
    """
    Marker type: an instance sent into the parser coroutine (instead of a
    character) asks it to flush its buffer.
    """
40
41
class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
    """
    Self-filling dictionary that maps an input sequence to a boolean telling
    whether any known sequence starts with these characters and is longer.
    """

    def __missing__(self, prefix: str) -> bool:
        """
        Compute the answer for `prefix` on first lookup and remember it.
        """
        if _cpr_response_prefix_re.match(prefix):
            # (hard coded) Could still grow into a CPR response.
            is_prefix = True
        elif _mouse_event_prefix_re.match(prefix):
            # (hard coded) Could still grow into a mouse event.
            is_prefix = True
        else:
            # Otherwise, look for a strictly longer entry in the sequence
            # table that starts with `prefix`. (As before, the entry only
            # counts when the value it maps to is truthy.)
            is_prefix = False
            for sequence, keys in ANSI_SEQUENCES.items():
                if sequence != prefix and sequence.startswith(prefix) and keys:
                    is_prefix = True
                    break

        # Store the result so `__missing__` runs only once per prefix.
        self[prefix] = is_prefix
        return is_prefix
65
66
# Single module-level cache instance; entries are computed on first lookup.
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
68
69
class Vt100Parser:
    """
    Parser for VT100 input stream.

    Data can be fed through the `feed` method and the given callback will be
    called with KeyPress objects.

    ::

        def callback(key):
            pass
        i = Vt100Parser(callback)
        i.feed('data\x01...')

    :attr feed_key_callback: Function that will be called when a key is parsed.
    """

    # Hint: in order to know what sequences your terminal writes to stdin, run
    # "od -c" and start typing.
    def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
        self.feed_key_callback = feed_key_callback
        self.reset()

    def reset(self, request: bool = False) -> None:
        """
        Drop all parser state and start a fresh parser coroutine.

        :param request: Not used by this method; accepted for interface
            compatibility.
        """
        self._in_bracketed_paste = False
        # Always (re)create the paste buffer, so that the attribute exists
        # from construction on and a reset in the middle of a paste does not
        # keep stale data around.
        self._paste_buffer = ""
        self._start_parser()

    def _start_parser(self) -> None:
        """
        Start the parser coroutine.
        """
        self._input_parser = self._input_parser_generator()
        # Advance to the first `yield`, so that `send()` can deliver input.
        next(self._input_parser)

    def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
        """
        Return the key (or keys) that maps to this prefix.

        :param prefix: The complete candidate sequence.
        :returns: The matching key, a tuple of keys, or `None` when `prefix`
            is not a known sequence.
        """
        # (hard coded) If we match a CPR response, return Keys.CPRResponse.
        # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
        # integer variables.)
        if _cpr_response_re.match(prefix):
            return Keys.CPRResponse

        if _mouse_event_re.match(prefix):
            return Keys.Vt100MouseEvent

        # Otherwise, use the mappings.
        try:
            return ANSI_SEQUENCES[prefix]
        except KeyError:
            return None

    def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
        """
        Coroutine (state machine) for the input parser.

        Every `send()` delivers either one character or a `_Flush` marker.
        Characters are collected in a buffer; as soon as the buffer (or the
        start of it) can be resolved, `_call_handler` is called for it.
        """
        prefix = ""
        retry = False
        flush = False

        while True:
            if retry:
                # Look again at what is left after shifting, without reading
                # new input. `flush` is kept on purpose: a flush has to drain
                # the whole buffer, not only the first key in it.
                retry = False
            else:
                flush = False

                # Get next character.
                c = yield

                if isinstance(c, _Flush):
                    flush = True
                else:
                    prefix += c

            # Nothing buffered, nothing to do.
            if not prefix:
                continue

            # More input could still turn this into a longer sequence: wait
            # for it, unless we were asked to flush.
            if _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] and not flush:
                continue

            match = self._get_match(prefix)

            # Exact match found, call handler.
            if match:
                self._call_handler(match, prefix)
                prefix = ""
                continue

            # No exact match found. Take the longest leading part that does
            # match, shift it off and handle the remainder in the next round
            # (`retry`). Stop at the first hit: the remainder has to go
            # through the regular checks above.
            retry = True

            for i in range(len(prefix), 0, -1):
                match = self._get_match(prefix[:i])
                if match:
                    self._call_handler(match, prefix[:i])
                    prefix = prefix[i:]
                    break
            else:
                # Nothing matched: pass the first character on as-is.
                self._call_handler(prefix[0], prefix[0])
                prefix = prefix[1:]

    def _call_handler(
        self, key: str | Keys | tuple[Keys, ...], insert_text: str
    ) -> None:
        """
        Callback to handler.

        :param key: A single key, or a tuple of keys that one sequence maps to.
        :param insert_text: The raw input that produced `key`.
        """
        if isinstance(key, tuple):
            # Received ANSI sequence that corresponds with multiple keys
            # (probably alt+something). Handle keys individually, but only pass
            # data payload to first KeyPress (so that we won't insert it
            # multiple times).
            for i, k in enumerate(key):
                self._call_handler(k, insert_text if i == 0 else "")
        elif key == Keys.BracketedPaste:
            # Start of a paste: no KeyPress yet. `feed` collects the pasted
            # text and reports it once the end mark has been seen.
            self._in_bracketed_paste = True
            self._paste_buffer = ""
        else:
            self.feed_key_callback(KeyPress(key, insert_text))

    def feed(self, data: str) -> None:
        """
        Feed the input stream.

        :param data: Input string (unicode).
        """
        # Handle bracketed paste. (We bypass the parser that matches all other
        # key presses and keep reading input until we see the end mark.)
        # This is much faster than parsing character by character.
        if self._in_bracketed_paste:
            self._paste_buffer += data
            end_mark = "\x1b[201~"

            # Search the whole buffer, not only `data`: the end mark can be
            # split over two `feed` calls.
            end_index = self._paste_buffer.find(end_mark)

            if end_index != -1:
                # Feed content to key bindings.
                paste_content = self._paste_buffer[:end_index]
                self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))

                # Quit bracketed paste mode and handle remaining input.
                self._in_bracketed_paste = False
                remaining = self._paste_buffer[end_index + len(end_mark) :]
                self._paste_buffer = ""

                self.feed(remaining)

        # Handle normal input character by character.
        else:
            for i, c in enumerate(data):
                if self._in_bracketed_paste:
                    # Quit loop and process from this position when the parser
                    # entered bracketed paste.
                    self.feed(data[i:])
                    break
                else:
                    self._input_parser.send(c)

    def flush(self) -> None:
        """
        Flush the buffer of the input stream.

        This will allow us to handle the escape key (or maybe meta) sooner.
        The input received by the escape key is actually the same as the first
        characters of e.g. Arrow-Up, so without knowing what follows the escape
        sequence, we don't know whether escape has been pressed, or whether
        it's something else. This flush function should be called after a
        timeout, and processes everything that's still in the buffer as-is, so
        without assuming any characters will follow.
        """
        self._input_parser.send(_Flush())

    def feed_and_flush(self, data: str) -> None:
        """
        Wrapper around ``feed`` and ``flush``.
        """
        self.feed(data)
        self.flush()