Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/input/vt100_parser.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1""" 

2Parser for VT100 input stream. 

3""" 

4 

5from __future__ import annotations 

6 

7import re 

8from typing import Callable, Dict, Generator 

9 

10from ..key_binding.key_processor import KeyPress 

11from ..keys import Keys 

12from .ansi_escape_sequences import ANSI_SEQUENCES 

13 

14__all__ = [ 

15 "Vt100Parser", 

16] 

17 

18 

19# Regex matching any CPR response 

20# (Note that we use '\Z' instead of '$', because '$' could include a trailing 

21# newline.) 

22_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") 

23 

24# Mouse events: 

25# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" 

26_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") 

27 

28# Regex matching any valid prefix of a CPR response. 

29# (Note that it doesn't contain the last character, the 'R'. The prefix has to 

30# be shorter.) 

31_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") 

32 

33_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") 

34 

35 

36class _Flush: 

37 """Helper object to indicate flush operation to the parser.""" 

38 

39 pass 

40 

41 

42class _IsPrefixOfLongerMatchCache(Dict[str, bool]): 

43 """ 

44 Dictionary that maps input sequences to a boolean indicating whether there is 

45 any key that start with this characters. 

46 """ 

47 

48 def __missing__(self, prefix: str) -> bool: 

49 # (hard coded) If this could be a prefix of a CPR response, return 

50 # True. 

51 if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( 

52 prefix 

53 ): 

54 result = True 

55 else: 

56 # If this could be a prefix of anything else, also return True. 

57 result = any( 

58 v 

59 for k, v in ANSI_SEQUENCES.items() 

60 if k.startswith(prefix) and k != prefix 

61 ) 

62 

63 self[prefix] = result 

64 return result 

65 

66 

67_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() 

68 

69 

70class Vt100Parser: 

71 """ 

72 Parser for VT100 input stream. 

73 Data can be fed through the `feed` method and the given callback will be 

74 called with KeyPress objects. 

75 

76 :: 

77 

78 def callback(key): 

79 pass 

80 i = Vt100Parser(callback) 

81 i.feed('data\x01...') 

82 

83 :attr feed_key_callback: Function that will be called when a key is parsed. 

84 """ 

85 

86 # Lookup table of ANSI escape sequences for a VT100 terminal 

87 # Hint: in order to know what sequences your terminal writes to stdin, run 

88 # "od -c" and start typing. 

89 def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: 

90 self.feed_key_callback = feed_key_callback 

91 self.reset() 

92 

93 def reset(self, request: bool = False) -> None: 

94 self._in_bracketed_paste = False 

95 self._start_parser() 

96 

97 def _start_parser(self) -> None: 

98 """ 

99 Start the parser coroutine. 

100 """ 

101 self._input_parser = self._input_parser_generator() 

102 self._input_parser.send(None) # type: ignore 

103 

104 def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]: 

105 """ 

106 Return the key (or keys) that maps to this prefix. 

107 """ 

108 # (hard coded) If we match a CPR response, return Keys.CPRResponse. 

109 # (This one doesn't fit in the ANSI_SEQUENCES, because it contains 

110 # integer variables.) 

111 if _cpr_response_re.match(prefix): 

112 return Keys.CPRResponse 

113 

114 elif _mouse_event_re.match(prefix): 

115 return Keys.Vt100MouseEvent 

116 

117 # Otherwise, use the mappings. 

118 try: 

119 return ANSI_SEQUENCES[prefix] 

120 except KeyError: 

121 return None 

122 

123 def _input_parser_generator(self) -> Generator[None, str | _Flush, None]: 

124 """ 

125 Coroutine (state machine) for the input parser. 

126 """ 

127 prefix = "" 

128 retry = False 

129 flush = False 

130 

131 while True: 

132 flush = False 

133 

134 if retry: 

135 retry = False 

136 else: 

137 # Get next character. 

138 c = yield 

139 

140 if isinstance(c, _Flush): 

141 flush = True 

142 else: 

143 prefix += c 

144 

145 # If we have some data, check for matches. 

146 if prefix: 

147 is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] 

148 match = self._get_match(prefix) 

149 

150 # Exact matches found, call handlers.. 

151 if (flush or not is_prefix_of_longer_match) and match: 

152 self._call_handler(match, prefix) 

153 prefix = "" 

154 

155 # No exact match found. 

156 elif (flush or not is_prefix_of_longer_match) and not match: 

157 found = False 

158 retry = True 

159 

160 # Loop over the input, try the longest match first and 

161 # shift. 

162 for i in range(len(prefix), 0, -1): 

163 match = self._get_match(prefix[:i]) 

164 if match: 

165 self._call_handler(match, prefix[:i]) 

166 prefix = prefix[i:] 

167 found = True 

168 

169 if not found: 

170 self._call_handler(prefix[0], prefix[0]) 

171 prefix = prefix[1:] 

172 

173 def _call_handler( 

174 self, key: str | Keys | tuple[Keys, ...], insert_text: str 

175 ) -> None: 

176 """ 

177 Callback to handler. 

178 """ 

179 if isinstance(key, tuple): 

180 # Received ANSI sequence that corresponds with multiple keys 

181 # (probably alt+something). Handle keys individually, but only pass 

182 # data payload to first KeyPress (so that we won't insert it 

183 # multiple times). 

184 for i, k in enumerate(key): 

185 self._call_handler(k, insert_text if i == 0 else "") 

186 else: 

187 if key == Keys.BracketedPaste: 

188 self._in_bracketed_paste = True 

189 self._paste_buffer = "" 

190 else: 

191 self.feed_key_callback(KeyPress(key, insert_text)) 

192 

193 def feed(self, data: str) -> None: 

194 """ 

195 Feed the input stream. 

196 

197 :param data: Input string (unicode). 

198 """ 

199 # Handle bracketed paste. (We bypass the parser that matches all other 

200 # key presses and keep reading input until we see the end mark.) 

201 # This is much faster then parsing character by character. 

202 if self._in_bracketed_paste: 

203 self._paste_buffer += data 

204 end_mark = "\x1b[201~" 

205 

206 if end_mark in self._paste_buffer: 

207 end_index = self._paste_buffer.index(end_mark) 

208 

209 # Feed content to key bindings. 

210 paste_content = self._paste_buffer[:end_index] 

211 self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) 

212 

213 # Quit bracketed paste mode and handle remaining input. 

214 self._in_bracketed_paste = False 

215 remaining = self._paste_buffer[end_index + len(end_mark) :] 

216 self._paste_buffer = "" 

217 

218 self.feed(remaining) 

219 

220 # Handle normal input character by character. 

221 else: 

222 for i, c in enumerate(data): 

223 if self._in_bracketed_paste: 

224 # Quit loop and process from this position when the parser 

225 # entered bracketed paste. 

226 self.feed(data[i:]) 

227 break 

228 else: 

229 self._input_parser.send(c) 

230 

231 def flush(self) -> None: 

232 """ 

233 Flush the buffer of the input stream. 

234 

235 This will allow us to handle the escape key (or maybe meta) sooner. 

236 The input received by the escape key is actually the same as the first 

237 characters of e.g. Arrow-Up, so without knowing what follows the escape 

238 sequence, we don't know whether escape has been pressed, or whether 

239 it's something else. This flush function should be called after a 

240 timeout, and processes everything that's still in the buffer as-is, so 

241 without assuming any characters will follow. 

242 """ 

243 self._input_parser.send(_Flush()) 

244 

245 def feed_and_flush(self, data: str) -> None: 

246 """ 

247 Wrapper around ``feed`` and ``flush``. 

248 """ 

249 self.feed(data) 

250 self.flush()