Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/input/vt100_parser.py: 26%
99 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Parser for VT100 input stream.
3"""
4from __future__ import annotations
6import re
7from typing import Callable, Dict, Generator
9from ..key_binding.key_processor import KeyPress
10from ..keys import Keys
11from .ansi_escape_sequences import ANSI_SEQUENCES
# Public API of this module: only the parser class is exported.
__all__ = ["Vt100Parser"]
# Complete CPR response: "Esc[" followed by two ';'-separated numbers and a
# closing 'R'.
# ('\Z' is used as the end anchor instead of '$', because '$' would also
# accept a trailing newline.)
_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")

# Complete mouse event.
# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")

# Any valid, still incomplete, start of a CPR response.
# (The terminating 'R' is deliberately not part of this pattern: a prefix is
# always shorter than the full response.)
_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")

# Any valid, still incomplete, start of a mouse event.
_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
class _Flush:
    """Marker type; an instance tells the parser to perform a flush."""
class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
    """
    Dictionary that maps an input sequence to a boolean telling whether any
    known key sequence starts with these characters (and is longer).

    Entries are computed on first lookup and stored in the dictionary.
    """

    def __missing__(self, prefix: str) -> bool:
        """
        Compute, store and return the answer for a not yet cached `prefix`.
        """
        # (hard coded) CPR responses and mouse events contain variable
        # numbers, so their prefixes are recognised by regex.
        looks_like_cpr = _cpr_response_prefix_re.match(prefix)

        if looks_like_cpr or _mouse_event_prefix_re.match(prefix):
            is_prefix = True
        else:
            # Otherwise, look for a strictly longer entry in the table of
            # sequences that begins with `prefix`. (As in the lookup itself,
            # it is the truthiness of the mapped key value that is tested.)
            is_prefix = any(
                keys
                for sequence, keys in ANSI_SEQUENCES.items()
                if sequence != prefix and sequence.startswith(prefix)
            )

        self[prefix] = is_prefix
        return is_prefix


# Single module-wide cache instance.
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
class Vt100Parser:
    """
    Parser for VT100 input stream.
    Data can be fed through the `feed` method and the given callback will be
    called with KeyPress objects.

    ::

        def callback(key):
            pass
        i = Vt100Parser(callback)
        i.feed('data\x01...')

    :attr feed_key_callback: Function that will be called when a key is parsed.
    """

    # Hint: in order to know what sequences your terminal writes to stdin, run
    # "od -c" and start typing.
    def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
        """
        :param feed_key_callback: Called with a `KeyPress` for every key that
            the parser recognises.
        """
        self.feed_key_callback = feed_key_callback
        self.reset()

    def reset(self, request: bool = False) -> None:
        """
        Leave bracketed paste mode, drop buffered paste data and restart the
        parser coroutine.

        :param request: Not used by this method.
        """
        self._in_bracketed_paste = False
        # Always define the paste buffer here, so that the attribute exists on
        # every parser and no stale paste data survives a reset.
        self._paste_buffer = ""
        self._start_parser()

    def _start_parser(self) -> None:
        """
        Start the parser coroutine.
        """
        self._input_parser = self._input_parser_generator()
        # Advance the generator to its first `yield`, so that it accepts
        # `send(...)` calls with real data.
        self._input_parser.send(None)  # type: ignore

    def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
        """
        Return the key (or keys) that maps to this prefix.

        :returns: `None` when `prefix` is not a complete, known sequence.
        """
        # (hard coded) If we match a CPR response, return Keys.CPRResponse.
        # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
        # integer variables.)
        if _cpr_response_re.match(prefix):
            return Keys.CPRResponse

        elif _mouse_event_re.match(prefix):
            return Keys.Vt100MouseEvent

        # Otherwise, use the mappings.
        try:
            return ANSI_SEQUENCES[prefix]
        except KeyError:
            return None

    def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
        """
        Coroutine (state machine) for the input parser.

        It receives either a string that is appended to the buffer, or a
        `_Flush` instance. The buffer is handed to `_call_handler` as soon as
        it cannot be the start of a longer sequence anymore, or when a flush
        is requested.
        """
        prefix = ""
        retry = False

        while True:
            flush = False

            if retry:
                # Re-examine what is left in the buffer without reading new
                # input.
                # NOTE(review): `flush` is false during such a retry pass, even
                # when the previous pass was a flush -- confirm this is
                # intended.
                retry = False
            else:
                # Get next character.
                c = yield

                if isinstance(c, _Flush):
                    flush = True
                else:
                    prefix += c

            # If we have some data, check for matches.
            if prefix:
                is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
                match = self._get_match(prefix)

                # Only decide when no longer sequence can follow anymore, or
                # when we were asked to flush. Otherwise, wait for more input.
                if flush or not is_prefix_of_longer_match:
                    if match:
                        # Exact match found, call handlers..
                        self._call_handler(match, prefix)
                        prefix = ""
                    else:
                        # No exact match found.
                        found = False
                        retry = True

                        # Loop over the input, try the longest match first and
                        # shift.
                        # NOTE(review): there is no `break` after a match, so
                        # the remaining (shorter) lengths are tried against
                        # the already shifted buffer -- confirm this is
                        # intended.
                        for i in range(len(prefix), 0, -1):
                            match = self._get_match(prefix[:i])
                            if match:
                                self._call_handler(match, prefix[:i])
                                prefix = prefix[i:]
                                found = True

                        if not found:
                            # Nothing is known: pass the first character on
                            # as-is and continue with the rest.
                            self._call_handler(prefix[0], prefix[0])
                            prefix = prefix[1:]

    def _call_handler(
        self, key: str | Keys | tuple[Keys, ...], insert_text: str
    ) -> None:
        """
        Callback to handler.

        :param key: The recognised key, a tuple of keys, or a plain character.
        :param insert_text: The input data that produced `key`.
        """
        if isinstance(key, tuple):
            # Received ANSI sequence that corresponds with multiple keys
            # (probably alt+something). Handle keys individually, but only pass
            # data payload to first KeyPress (so that we won't insert it
            # multiple times).
            for i, k in enumerate(key):
                self._call_handler(k, insert_text if i == 0 else "")
        else:
            if key == Keys.BracketedPaste:
                # Start of a bracketed paste: no key press is emitted yet,
                # `feed` collects the pasted data from here on.
                self._in_bracketed_paste = True
                self._paste_buffer = ""
            else:
                self.feed_key_callback(KeyPress(key, insert_text))

    def feed(self, data: str) -> None:
        """
        Feed the input stream.

        :param data: Input string (unicode).
        """
        # Handle bracketed paste. (We bypass the parser that matches all other
        # key presses and keep reading input until we see the end mark.)
        # This is much faster than parsing character by character.
        if self._in_bracketed_paste:
            self._paste_buffer += data

            # Split at the first end mark. `end_mark` is empty as long as the
            # mark has not been received.
            paste_content, end_mark, remaining = self._paste_buffer.partition(
                "\x1b[201~"
            )

            if end_mark:
                # Feed content to key bindings.
                self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))

                # Quit bracketed paste mode and handle remaining input.
                self._in_bracketed_paste = False
                self._paste_buffer = ""

                self.feed(remaining)

        # Handle normal input character by character.
        else:
            for i, c in enumerate(data):
                if self._in_bracketed_paste:
                    # Quit loop and process from this position when the parser
                    # entered bracketed paste.
                    self.feed(data[i:])
                    break
                else:
                    self._input_parser.send(c)

    def flush(self) -> None:
        """
        Flush the buffer of the input stream.

        This will allow us to handle the escape key (or maybe meta) sooner.
        The input received by the escape key is actually the same as the first
        characters of e.g. Arrow-Up, so without knowing what follows the escape
        sequence, we don't know whether escape has been pressed, or whether
        it's something else. This flush function should be called after a
        timeout, and processes everything that's still in the buffer as-is, so
        without assuming any characters will follow.
        """
        self._input_parser.send(_Flush())

    def feed_and_flush(self, data: str) -> None:
        """
        Wrapper around ``feed`` and ``flush``.
        """
        self.feed(data)
        self.flush()