Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/input/vt100_parser.py: 26%

99 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Parser for VT100 input stream. 

3""" 

4from __future__ import annotations 

5 

6import re 

7from typing import Callable, Dict, Generator 

8 

9from ..key_binding.key_processor import KeyPress 

10from ..keys import Keys 

11from .ansi_escape_sequences import ANSI_SEQUENCES 

12 

13__all__ = [ 

14 "Vt100Parser", 

15] 

16 

17 

18# Regex matching any CPR response 

19# (Note that we use '\Z' instead of '$', because '$' could include a trailing 

20# newline.) 

21_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") 

22 

23# Mouse events: 

24# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" 

25_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") 

26 

27# Regex matching any valid prefix of a CPR response. 

28# (Note that it doesn't contain the last character, the 'R'. The prefix has to 

29# be shorter.) 

30_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") 

31 

32_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") 

33 

34 

35class _Flush: 

36 """Helper object to indicate flush operation to the parser.""" 

37 

38 pass 

39 

40 

41class _IsPrefixOfLongerMatchCache(Dict[str, bool]): 

42 """ 

43 Dictionary that maps input sequences to a boolean indicating whether there is 

44 any key that start with this characters. 

45 """ 

46 

47 def __missing__(self, prefix: str) -> bool: 

48 # (hard coded) If this could be a prefix of a CPR response, return 

49 # True. 

50 if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( 

51 prefix 

52 ): 

53 result = True 

54 else: 

55 # If this could be a prefix of anything else, also return True. 

56 result = any( 

57 v 

58 for k, v in ANSI_SEQUENCES.items() 

59 if k.startswith(prefix) and k != prefix 

60 ) 

61 

62 self[prefix] = result 

63 return result 

64 

65 

66_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() 

67 

68 

69class Vt100Parser: 

70 """ 

71 Parser for VT100 input stream. 

72 Data can be fed through the `feed` method and the given callback will be 

73 called with KeyPress objects. 

74 

75 :: 

76 

77 def callback(key): 

78 pass 

79 i = Vt100Parser(callback) 

80 i.feed('data\x01...') 

81 

82 :attr feed_key_callback: Function that will be called when a key is parsed. 

83 """ 

84 

85 # Lookup table of ANSI escape sequences for a VT100 terminal 

86 # Hint: in order to know what sequences your terminal writes to stdin, run 

87 # "od -c" and start typing. 

88 def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: 

89 self.feed_key_callback = feed_key_callback 

90 self.reset() 

91 

92 def reset(self, request: bool = False) -> None: 

93 self._in_bracketed_paste = False 

94 self._start_parser() 

95 

96 def _start_parser(self) -> None: 

97 """ 

98 Start the parser coroutine. 

99 """ 

100 self._input_parser = self._input_parser_generator() 

101 self._input_parser.send(None) # type: ignore 

102 

103 def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]: 

104 """ 

105 Return the key (or keys) that maps to this prefix. 

106 """ 

107 # (hard coded) If we match a CPR response, return Keys.CPRResponse. 

108 # (This one doesn't fit in the ANSI_SEQUENCES, because it contains 

109 # integer variables.) 

110 if _cpr_response_re.match(prefix): 

111 return Keys.CPRResponse 

112 

113 elif _mouse_event_re.match(prefix): 

114 return Keys.Vt100MouseEvent 

115 

116 # Otherwise, use the mappings. 

117 try: 

118 return ANSI_SEQUENCES[prefix] 

119 except KeyError: 

120 return None 

121 

122 def _input_parser_generator(self) -> Generator[None, str | _Flush, None]: 

123 """ 

124 Coroutine (state machine) for the input parser. 

125 """ 

126 prefix = "" 

127 retry = False 

128 flush = False 

129 

130 while True: 

131 flush = False 

132 

133 if retry: 

134 retry = False 

135 else: 

136 # Get next character. 

137 c = yield 

138 

139 if isinstance(c, _Flush): 

140 flush = True 

141 else: 

142 prefix += c 

143 

144 # If we have some data, check for matches. 

145 if prefix: 

146 is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] 

147 match = self._get_match(prefix) 

148 

149 # Exact matches found, call handlers.. 

150 if (flush or not is_prefix_of_longer_match) and match: 

151 self._call_handler(match, prefix) 

152 prefix = "" 

153 

154 # No exact match found. 

155 elif (flush or not is_prefix_of_longer_match) and not match: 

156 found = False 

157 retry = True 

158 

159 # Loop over the input, try the longest match first and 

160 # shift. 

161 for i in range(len(prefix), 0, -1): 

162 match = self._get_match(prefix[:i]) 

163 if match: 

164 self._call_handler(match, prefix[:i]) 

165 prefix = prefix[i:] 

166 found = True 

167 

168 if not found: 

169 self._call_handler(prefix[0], prefix[0]) 

170 prefix = prefix[1:] 

171 

172 def _call_handler( 

173 self, key: str | Keys | tuple[Keys, ...], insert_text: str 

174 ) -> None: 

175 """ 

176 Callback to handler. 

177 """ 

178 if isinstance(key, tuple): 

179 # Received ANSI sequence that corresponds with multiple keys 

180 # (probably alt+something). Handle keys individually, but only pass 

181 # data payload to first KeyPress (so that we won't insert it 

182 # multiple times). 

183 for i, k in enumerate(key): 

184 self._call_handler(k, insert_text if i == 0 else "") 

185 else: 

186 if key == Keys.BracketedPaste: 

187 self._in_bracketed_paste = True 

188 self._paste_buffer = "" 

189 else: 

190 self.feed_key_callback(KeyPress(key, insert_text)) 

191 

192 def feed(self, data: str) -> None: 

193 """ 

194 Feed the input stream. 

195 

196 :param data: Input string (unicode). 

197 """ 

198 # Handle bracketed paste. (We bypass the parser that matches all other 

199 # key presses and keep reading input until we see the end mark.) 

200 # This is much faster then parsing character by character. 

201 if self._in_bracketed_paste: 

202 self._paste_buffer += data 

203 end_mark = "\x1b[201~" 

204 

205 if end_mark in self._paste_buffer: 

206 end_index = self._paste_buffer.index(end_mark) 

207 

208 # Feed content to key bindings. 

209 paste_content = self._paste_buffer[:end_index] 

210 self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) 

211 

212 # Quit bracketed paste mode and handle remaining input. 

213 self._in_bracketed_paste = False 

214 remaining = self._paste_buffer[end_index + len(end_mark) :] 

215 self._paste_buffer = "" 

216 

217 self.feed(remaining) 

218 

219 # Handle normal input character by character. 

220 else: 

221 for i, c in enumerate(data): 

222 if self._in_bracketed_paste: 

223 # Quit loop and process from this position when the parser 

224 # entered bracketed paste. 

225 self.feed(data[i:]) 

226 break 

227 else: 

228 self._input_parser.send(c) 

229 

230 def flush(self) -> None: 

231 """ 

232 Flush the buffer of the input stream. 

233 

234 This will allow us to handle the escape key (or maybe meta) sooner. 

235 The input received by the escape key is actually the same as the first 

236 characters of e.g. Arrow-Up, so without knowing what follows the escape 

237 sequence, we don't know whether escape has been pressed, or whether 

238 it's something else. This flush function should be called after a 

239 timeout, and processes everything that's still in the buffer as-is, so 

240 without assuming any characters will follow. 

241 """ 

242 self._input_parser.send(_Flush()) 

243 

244 def feed_and_flush(self, data: str) -> None: 

245 """ 

246 Wrapper around ``feed`` and ``flush``. 

247 """ 

248 self.feed(data) 

249 self.flush()