Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/key_binding/key_processor.py: 23%

236 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from 

3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance. 

4 

5The `KeyProcessor` will according to the implemented keybindings call the 

6correct callbacks when new key presses are feed through `feed`. 

7""" 

8from __future__ import annotations 

9 

10import weakref 

11from asyncio import Task, sleep 

12from collections import deque 

13from typing import TYPE_CHECKING, Any, Generator 

14 

15from prompt_toolkit.application.current import get_app 

16from prompt_toolkit.enums import EditingMode 

17from prompt_toolkit.filters.app import vi_navigation_mode 

18from prompt_toolkit.keys import Keys 

19from prompt_toolkit.utils import Event 

20 

21from .key_bindings import Binding, KeyBindingsBase 

22 

23if TYPE_CHECKING: 

24 from prompt_toolkit.application import Application 

25 from prompt_toolkit.buffer import Buffer 

26 

27 

28__all__ = [ 

29 "KeyProcessor", 

30 "KeyPress", 

31 "KeyPressEvent", 

32] 

33 

34 

35class KeyPress: 

36 """ 

37 :param key: A `Keys` instance or text (one character). 

38 :param data: The received string on stdin. (Often vt100 escape codes.) 

39 """ 

40 

41 def __init__(self, key: Keys | str, data: str | None = None) -> None: 

42 assert isinstance(key, Keys) or len(key) == 1 

43 

44 if data is None: 

45 if isinstance(key, Keys): 

46 data = key.value 

47 else: 

48 data = key # 'key' is a one character string. 

49 

50 self.key = key 

51 self.data = data 

52 

53 def __repr__(self) -> str: 

54 return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})" 

55 

56 def __eq__(self, other: object) -> bool: 

57 if not isinstance(other, KeyPress): 

58 return False 

59 return self.key == other.key and self.data == other.data 

60 

61 

62""" 

63Helper object to indicate flush operation in the KeyProcessor. 

64NOTE: the implementation is very similar to the VT100 parser. 

65""" 

66_Flush = KeyPress("?", data="_Flush") 

67 

68 

69class KeyProcessor: 

70 """ 

71 Statemachine that receives :class:`KeyPress` instances and according to the 

72 key bindings in the given :class:`KeyBindings`, calls the matching handlers. 

73 

74 :: 

75 

76 p = KeyProcessor(key_bindings) 

77 

78 # Send keys into the processor. 

79 p.feed(KeyPress(Keys.ControlX, '\x18')) 

80 p.feed(KeyPress(Keys.ControlC, '\x03') 

81 

82 # Process all the keys in the queue. 

83 p.process_keys() 

84 

85 # Now the ControlX-ControlC callback will be called if this sequence is 

86 # registered in the key bindings. 

87 

88 :param key_bindings: `KeyBindingsBase` instance. 

89 """ 

90 

91 def __init__(self, key_bindings: KeyBindingsBase) -> None: 

92 self._bindings = key_bindings 

93 

94 self.before_key_press = Event(self) 

95 self.after_key_press = Event(self) 

96 

97 self._flush_wait_task: Task[None] | None = None 

98 

99 self.reset() 

100 

101 def reset(self) -> None: 

102 self._previous_key_sequence: list[KeyPress] = [] 

103 self._previous_handler: Binding | None = None 

104 

105 # The queue of keys not yet send to our _process generator/state machine. 

106 self.input_queue: deque[KeyPress] = deque() 

107 

108 # The key buffer that is matched in the generator state machine. 

109 # (This is at at most the amount of keys that make up for one key binding.) 

110 self.key_buffer: list[KeyPress] = [] 

111 

112 #: Readline argument (for repetition of commands.) 

113 #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html 

114 self.arg: str | None = None 

115 

116 # Start the processor coroutine. 

117 self._process_coroutine = self._process() 

118 self._process_coroutine.send(None) # type: ignore 

119 

120 def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]: 

121 """ 

122 For a list of :class:`KeyPress` instances. Give the matching handlers 

123 that would handle this. 

124 """ 

125 keys = tuple(k.key for k in key_presses) 

126 

127 # Try match, with mode flag 

128 return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()] 

129 

130 def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool: 

131 """ 

132 For a list of :class:`KeyPress` instances. Return True if there is any 

133 handler that is bound to a suffix of this keys. 

134 """ 

135 keys = tuple(k.key for k in key_presses) 

136 

137 # Get the filters for all the key bindings that have a longer match. 

138 # Note that we transform it into a `set`, because we don't care about 

139 # the actual bindings and executing it more than once doesn't make 

140 # sense. (Many key bindings share the same filter.) 

141 filters = { 

142 b.filter for b in self._bindings.get_bindings_starting_with_keys(keys) 

143 } 

144 

145 # When any key binding is active, return True. 

146 return any(f() for f in filters) 

147 

148 def _process(self) -> Generator[None, KeyPress, None]: 

149 """ 

150 Coroutine implementing the key match algorithm. Key strokes are sent 

151 into this generator, and it calls the appropriate handlers. 

152 """ 

153 buffer = self.key_buffer 

154 retry = False 

155 

156 while True: 

157 flush = False 

158 

159 if retry: 

160 retry = False 

161 else: 

162 key = yield 

163 if key is _Flush: 

164 flush = True 

165 else: 

166 buffer.append(key) 

167 

168 # If we have some key presses, check for matches. 

169 if buffer: 

170 matches = self._get_matches(buffer) 

171 

172 if flush: 

173 is_prefix_of_longer_match = False 

174 else: 

175 is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer) 

176 

177 # When eager matches were found, give priority to them and also 

178 # ignore all the longer matches. 

179 eager_matches = [m for m in matches if m.eager()] 

180 

181 if eager_matches: 

182 matches = eager_matches 

183 is_prefix_of_longer_match = False 

184 

185 # Exact matches found, call handler. 

186 if not is_prefix_of_longer_match and matches: 

187 self._call_handler(matches[-1], key_sequence=buffer[:]) 

188 del buffer[:] # Keep reference. 

189 

190 # No match found. 

191 elif not is_prefix_of_longer_match and not matches: 

192 retry = True 

193 found = False 

194 

195 # Loop over the input, try longest match first and shift. 

196 for i in range(len(buffer), 0, -1): 

197 matches = self._get_matches(buffer[:i]) 

198 if matches: 

199 self._call_handler(matches[-1], key_sequence=buffer[:i]) 

200 del buffer[:i] 

201 found = True 

202 break 

203 

204 if not found: 

205 del buffer[:1] 

206 

207 def feed(self, key_press: KeyPress, first: bool = False) -> None: 

208 """ 

209 Add a new :class:`KeyPress` to the input queue. 

210 (Don't forget to call `process_keys` in order to process the queue.) 

211 

212 :param first: If true, insert before everything else. 

213 """ 

214 if first: 

215 self.input_queue.appendleft(key_press) 

216 else: 

217 self.input_queue.append(key_press) 

218 

219 def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None: 

220 """ 

221 :param first: If true, insert before everything else. 

222 """ 

223 if first: 

224 self.input_queue.extendleft(reversed(key_presses)) 

225 else: 

226 self.input_queue.extend(key_presses) 

227 

228 def process_keys(self) -> None: 

229 """ 

230 Process all the keys in the `input_queue`. 

231 (To be called after `feed`.) 

232 

233 Note: because of the `feed`/`process_keys` separation, it is 

234 possible to call `feed` from inside a key binding. 

235 This function keeps looping until the queue is empty. 

236 """ 

237 app = get_app() 

238 

239 def not_empty() -> bool: 

240 # When the application result is set, stop processing keys. (E.g. 

241 # if ENTER was received, followed by a few additional key strokes, 

242 # leave the other keys in the queue.) 

243 if app.is_done: 

244 # But if there are still CPRResponse keys in the queue, these 

245 # need to be processed. 

246 return any(k for k in self.input_queue if k.key == Keys.CPRResponse) 

247 else: 

248 return bool(self.input_queue) 

249 

250 def get_next() -> KeyPress: 

251 if app.is_done: 

252 # Only process CPR responses. Everything else is typeahead. 

253 cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0] 

254 self.input_queue.remove(cpr) 

255 return cpr 

256 else: 

257 return self.input_queue.popleft() 

258 

259 is_flush = False 

260 

261 while not_empty(): 

262 # Process next key. 

263 key_press = get_next() 

264 

265 is_flush = key_press is _Flush 

266 is_cpr = key_press.key == Keys.CPRResponse 

267 

268 if not is_flush and not is_cpr: 

269 self.before_key_press.fire() 

270 

271 try: 

272 self._process_coroutine.send(key_press) 

273 except Exception: 

274 # If for some reason something goes wrong in the parser, (maybe 

275 # an exception was raised) restart the processor for next time. 

276 self.reset() 

277 self.empty_queue() 

278 raise 

279 

280 if not is_flush and not is_cpr: 

281 self.after_key_press.fire() 

282 

283 # Skip timeout if the last key was flush. 

284 if not is_flush: 

285 self._start_timeout() 

286 

287 def empty_queue(self) -> list[KeyPress]: 

288 """ 

289 Empty the input queue. Return the unprocessed input. 

290 """ 

291 key_presses = list(self.input_queue) 

292 self.input_queue.clear() 

293 

294 # Filter out CPRs. We don't want to return these. 

295 key_presses = [k for k in key_presses if k.key != Keys.CPRResponse] 

296 return key_presses 

297 

298 def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None: 

299 app = get_app() 

300 was_recording_emacs = app.emacs_state.is_recording 

301 was_recording_vi = bool(app.vi_state.recording_register) 

302 was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode 

303 arg = self.arg 

304 self.arg = None 

305 

306 event = KeyPressEvent( 

307 weakref.ref(self), 

308 arg=arg, 

309 key_sequence=key_sequence, 

310 previous_key_sequence=self._previous_key_sequence, 

311 is_repeat=(handler == self._previous_handler), 

312 ) 

313 

314 # Save the state of the current buffer. 

315 if handler.save_before(event): 

316 event.app.current_buffer.save_to_undo_stack() 

317 

318 # Call handler. 

319 from prompt_toolkit.buffer import EditReadOnlyBuffer 

320 

321 try: 

322 handler.call(event) 

323 self._fix_vi_cursor_position(event) 

324 

325 except EditReadOnlyBuffer: 

326 # When a key binding does an attempt to change a buffer which is 

327 # read-only, we can ignore that. We sound a bell and go on. 

328 app.output.bell() 

329 

330 if was_temporary_navigation_mode: 

331 self._leave_vi_temp_navigation_mode(event) 

332 

333 self._previous_key_sequence = key_sequence 

334 self._previous_handler = handler 

335 

336 # Record the key sequence in our macro. (Only if we're in macro mode 

337 # before and after executing the key.) 

338 if handler.record_in_macro(): 

339 if app.emacs_state.is_recording and was_recording_emacs: 

340 recording = app.emacs_state.current_recording 

341 if recording is not None: # Should always be true, given that 

342 # `was_recording_emacs` is set. 

343 recording.extend(key_sequence) 

344 

345 if app.vi_state.recording_register and was_recording_vi: 

346 for k in key_sequence: 

347 app.vi_state.current_recording += k.data 

348 

349 def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None: 

350 """ 

351 After every command, make sure that if we are in Vi navigation mode, we 

352 never put the cursor after the last character of a line. (Unless it's 

353 an empty line.) 

354 """ 

355 app = event.app 

356 buff = app.current_buffer 

357 preferred_column = buff.preferred_column 

358 

359 if ( 

360 vi_navigation_mode() 

361 and buff.document.is_cursor_at_the_end_of_line 

362 and len(buff.document.current_line) > 0 

363 ): 

364 buff.cursor_position -= 1 

365 

366 # Set the preferred_column for arrow up/down again. 

367 # (This was cleared after changing the cursor position.) 

368 buff.preferred_column = preferred_column 

369 

370 def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None: 

371 """ 

372 If we're in Vi temporary navigation (normal) mode, return to 

373 insert/replace mode after executing one action. 

374 """ 

375 app = event.app 

376 

377 if app.editing_mode == EditingMode.VI: 

378 # Not waiting for a text object and no argument has been given. 

379 if app.vi_state.operator_func is None and self.arg is None: 

380 app.vi_state.temporary_navigation_mode = False 

381 

382 def _start_timeout(self) -> None: 

383 """ 

384 Start auto flush timeout. Similar to Vim's `timeoutlen` option. 

385 

386 Start a background coroutine with a timer. When this timeout expires 

387 and no key was pressed in the meantime, we flush all data in the queue 

388 and call the appropriate key binding handlers. 

389 """ 

390 app = get_app() 

391 timeout = app.timeoutlen 

392 

393 if timeout is None: 

394 return 

395 

396 async def wait() -> None: 

397 "Wait for timeout." 

398 # This sleep can be cancelled. In that case we don't flush. 

399 await sleep(timeout) 

400 

401 if len(self.key_buffer) > 0: 

402 # (No keys pressed in the meantime.) 

403 flush_keys() 

404 

405 def flush_keys() -> None: 

406 "Flush keys." 

407 self.feed(_Flush) 

408 self.process_keys() 

409 

410 # Automatically flush keys. 

411 if self._flush_wait_task: 

412 self._flush_wait_task.cancel() 

413 self._flush_wait_task = app.create_background_task(wait()) 

414 

415 def send_sigint(self) -> None: 

416 """ 

417 Send SIGINT. Immediately call the SIGINT key handler. 

418 """ 

419 self.feed(KeyPress(key=Keys.SIGINT), first=True) 

420 self.process_keys() 

421 

422 

423class KeyPressEvent: 

424 """ 

425 Key press event, delivered to key bindings. 

426 

427 :param key_processor_ref: Weak reference to the `KeyProcessor`. 

428 :param arg: Repetition argument. 

429 :param key_sequence: List of `KeyPress` instances. 

430 :param previouskey_sequence: Previous list of `KeyPress` instances. 

431 :param is_repeat: True when the previous event was delivered to the same handler. 

432 """ 

433 

434 def __init__( 

435 self, 

436 key_processor_ref: weakref.ReferenceType[KeyProcessor], 

437 arg: str | None, 

438 key_sequence: list[KeyPress], 

439 previous_key_sequence: list[KeyPress], 

440 is_repeat: bool, 

441 ) -> None: 

442 self._key_processor_ref = key_processor_ref 

443 self.key_sequence = key_sequence 

444 self.previous_key_sequence = previous_key_sequence 

445 

446 #: True when the previous key sequence was handled by the same handler. 

447 self.is_repeat = is_repeat 

448 

449 self._arg = arg 

450 self._app = get_app() 

451 

452 def __repr__(self) -> str: 

453 return "KeyPressEvent(arg={!r}, key_sequence={!r}, is_repeat={!r})".format( 

454 self.arg, 

455 self.key_sequence, 

456 self.is_repeat, 

457 ) 

458 

459 @property 

460 def data(self) -> str: 

461 return self.key_sequence[-1].data 

462 

463 @property 

464 def key_processor(self) -> KeyProcessor: 

465 processor = self._key_processor_ref() 

466 if processor is None: 

467 raise Exception("KeyProcessor was lost. This should not happen.") 

468 return processor 

469 

470 @property 

471 def app(self) -> Application[Any]: 

472 """ 

473 The current `Application` object. 

474 """ 

475 return self._app 

476 

477 @property 

478 def current_buffer(self) -> Buffer: 

479 """ 

480 The current buffer. 

481 """ 

482 return self.app.current_buffer 

483 

484 @property 

485 def arg(self) -> int: 

486 """ 

487 Repetition argument. 

488 """ 

489 if self._arg == "-": 

490 return -1 

491 

492 result = int(self._arg or 1) 

493 

494 # Don't exceed a million. 

495 if int(result) >= 1000000: 

496 result = 1 

497 

498 return result 

499 

500 @property 

501 def arg_present(self) -> bool: 

502 """ 

503 True if repetition argument was explicitly provided. 

504 """ 

505 return self._arg is not None 

506 

507 def append_to_arg_count(self, data: str) -> None: 

508 """ 

509 Add digit to the input argument. 

510 

511 :param data: the typed digit as string 

512 """ 

513 assert data in "-0123456789" 

514 current = self._arg 

515 

516 if data == "-": 

517 assert current is None or current == "-" 

518 result = data 

519 elif current is None: 

520 result = data 

521 else: 

522 result = f"{current}{data}" 

523 

524 self.key_processor.arg = result 

525 

526 @property 

527 def cli(self) -> Application[Any]: 

528 "For backward-compatibility." 

529 return self.app