Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/key_binding/key_processor.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

240 statements  

1""" 

2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from 

3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance. 

4 

5The `KeyProcessor` calls, according to the implemented key bindings, the 

6correct callbacks when new key presses are fed through `feed`. 

7""" 

8 

9from __future__ import annotations 

10 

11import weakref 

12from asyncio import Task, sleep 

13from collections import deque 

14from collections.abc import Generator 

15from typing import TYPE_CHECKING, Any 

16 

17from prompt_toolkit.application.current import get_app 

18from prompt_toolkit.enums import EditingMode 

19from prompt_toolkit.filters.app import vi_navigation_mode 

20from prompt_toolkit.keys import Keys 

21from prompt_toolkit.utils import Event 

22 

23from .key_bindings import Binding, KeyBindingsBase 

24 

25if TYPE_CHECKING: 

26 from prompt_toolkit.application import Application 

27 from prompt_toolkit.buffer import Buffer 

28 

29 

# Names exported by `from ... import *` for this module.
__all__ = [
    "KeyProcessor",
    "KeyPress",
    "KeyPressEvent",
]

35 

36 

class KeyPress:
    """
    A single key press: the logical key plus the raw data that produced it.

    Instances compare equal when both `key` and `data` are equal. Because
    `__eq__` is defined without `__hash__`, instances are not hashable.

    :param key: A `Keys` instance or text (one character).
    :param data: The received string on stdin. (Often vt100 escape codes.)
        When omitted, it defaults to ``key.value`` for a `Keys` instance and
        to `key` itself for a one character string.
    """

    def __init__(self, key: Keys | str, data: str | None = None) -> None:
        assert isinstance(key, Keys) or len(key) == 1

        if data is None:
            # A `Keys` member carries its textual form in `.value`; anything
            # else is already the one character string itself.
            data = key.value if isinstance(key, Keys) else key

        self.key = key
        self.data = data

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, KeyPress):
            # Let Python try the reflected comparison of the other operand.
            # When that is not available either, `==` still evaluates to
            # False and `!=` to True.
            return NotImplemented
        return self.key == other.key and self.data == other.data

62 

63 

# Helper object to indicate flush operation in the KeyProcessor.
# It is a sentinel: the processor recognises it by identity (`is _Flush`),
# never by its key or data.
# NOTE: the implementation is very similar to the VT100 parser.
_Flush = KeyPress("?", data="_Flush")

69 

70 

class KeyProcessor:
    r"""
    Statemachine that receives :class:`KeyPress` instances and according to the
    key bindings in the given :class:`KeyBindings`, calls the matching handlers.

    ::

        p = KeyProcessor(key_bindings)

        # Send keys into the processor.
        p.feed(KeyPress(Keys.ControlX, '\x18'))
        p.feed(KeyPress(Keys.ControlC, '\x03'))

        # Process all the keys in the queue.
        p.process_keys()

        # Now the ControlX-ControlC callback will be called if this sequence is
        # registered in the key bindings.

    :param key_bindings: `KeyBindingsBase` instance.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        self._bindings = key_bindings

        # Events fired around every processed key press (not for flush
        # markers or CPR responses, see `process_keys`).
        self.before_key_press = Event(self)
        self.after_key_press = Event(self)

        # Background task of the running auto flush timer, if any.
        self._flush_wait_task: Task[None] | None = None

        self.reset()

    def reset(self) -> None:
        """
        Drop all pending state and restart the matching state machine.
        """
        self._previous_key_sequence: list[KeyPress] = []
        self._previous_handler: Binding | None = None

        # The queue of keys not yet sent to our _process generator/state machine.
        self.input_queue: deque[KeyPress] = deque()

        # The key buffer that is matched in the generator state machine.
        # (This is at most the amount of keys that make up for one key binding.)
        self.key_buffer: list[KeyPress] = []

        #: Readline argument (for repetition of commands.)
        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
        self.arg: str | None = None

        # Start the processor coroutine. The initial `send(None)` advances the
        # generator to its first `yield`, so that it can receive key presses.
        self._process_coroutine = self._process()
        self._process_coroutine.send(None)  # type: ignore

    def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]:
        """
        For a list of :class:`KeyPress` instances. Give the matching handlers
        that would handle this.

        Only bindings whose filter is currently active are returned.
        """
        keys = tuple(k.key for k in key_presses)

        # Try match, with mode flag
        return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()]

    def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool:
        """
        For a list of :class:`KeyPress` instances. Return True if there is any
        active handler bound to a key sequence that starts with these keys.
        """
        keys = tuple(k.key for k in key_presses)

        # Get the filters for all the key bindings that have a longer match.
        # Note that we transform it into a `set`, because we don't care about
        # the actual bindings and executing it more than once doesn't make
        # sense. (Many key bindings share the same filter.)
        filters = {
            b.filter for b in self._bindings.get_bindings_starting_with_keys(keys)
        }

        # When any key binding is active, return True.
        return any(f() for f in filters)

    def _process(self) -> Generator[None, KeyPress, None]:
        """
        Coroutine implementing the key match algorithm. Key strokes are sent
        into this generator, and it calls the appropriate handlers.

        Sending the `_Flush` marker forces a decision on the keys that are
        currently buffered, instead of waiting for a possible longer match.
        """
        buffer = self.key_buffer
        retry = False

        while True:
            flush = False

            if retry:
                # Part of the buffer was consumed without a new key press;
                # match again on what is left before waiting for input.
                retry = False
            else:
                key = yield
                if key is _Flush:
                    flush = True
                else:
                    buffer.append(key)

            # If we have some key presses, check for matches.
            if buffer:
                matches = self._get_matches(buffer)

                if flush:
                    is_prefix_of_longer_match = False
                else:
                    is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer)

                # When eager matches were found, give priority to them and also
                # ignore all the longer matches.
                eager_matches = [m for m in matches if m.eager()]

                if eager_matches:
                    matches = eager_matches
                    is_prefix_of_longer_match = False

                # Exact matches found, call handler.
                if not is_prefix_of_longer_match and matches:
                    self._call_handler(matches[-1], key_sequence=buffer[:])
                    del buffer[:]  # Keep reference.

                # No match found.
                elif not is_prefix_of_longer_match and not matches:
                    retry = True
                    found = False

                    # Loop over the input, try longest match first and shift.
                    for i in range(len(buffer), 0, -1):
                        matches = self._get_matches(buffer[:i])
                        if matches:
                            self._call_handler(matches[-1], key_sequence=buffer[:i])
                            del buffer[:i]
                            found = True
                            break

                    if not found:
                        # Nothing matches any prefix: drop the oldest key.
                        del buffer[:1]

    def feed(self, key_press: KeyPress, first: bool = False) -> None:
        """
        Add a new :class:`KeyPress` to the input queue.
        (Don't forget to call `process_keys` in order to process the queue.)

        :param first: If true, insert before everything else.
        """
        if first:
            self.input_queue.appendleft(key_press)
        else:
            self.input_queue.append(key_press)

    def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None:
        """
        Add several :class:`KeyPress` instances to the input queue, keeping
        their relative order.

        :param first: If true, insert before everything else.
        """
        if first:
            # `extendleft` inserts one by one at the left, which reverses the
            # order; reversing up front keeps the given order.
            self.input_queue.extendleft(reversed(key_presses))
        else:
            self.input_queue.extend(key_presses)

    def process_keys(self) -> None:
        """
        Process all the keys in the `input_queue`.
        (To be called after `feed`.)

        Note: because of the `feed`/`process_keys` separation, it is
              possible to call `feed` from inside a key binding.
              This function keeps looping until the queue is empty.
        """
        app = get_app()

        def not_empty() -> bool:
            # When the application result is set, stop processing keys.  (E.g.
            # if ENTER was received, followed by a few additional key strokes,
            # leave the other keys in the queue.)
            if app.is_done:
                # But if there are still CPRResponse keys in the queue, these
                # need to be processed.
                return any(k.key == Keys.CPRResponse for k in self.input_queue)
            else:
                return bool(self.input_queue)

        def get_next() -> KeyPress:
            if app.is_done:
                # Only process CPR responses. Everything else is typeahead.
                cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0]
                self.input_queue.remove(cpr)
                return cpr
            else:
                return self.input_queue.popleft()

        is_flush = False

        while not_empty():
            # Process next key.
            key_press = get_next()

            is_flush = key_press is _Flush
            is_cpr = key_press.key == Keys.CPRResponse

            if not is_flush and not is_cpr:
                self.before_key_press.fire()

            try:
                self._process_coroutine.send(key_press)
            except Exception:
                # If for some reason something goes wrong in the parser, (maybe
                # an exception was raised) restart the processor for next time.
                self.reset()
                self.empty_queue()
                raise

            if not is_flush and not is_cpr:
                self.after_key_press.fire()

        # Skip timeout if the last key was flush.
        if not is_flush:
            self._start_timeout()

    def empty_queue(self) -> list[KeyPress]:
        """
        Empty the input queue. Return the unprocessed input.
        """
        key_presses = list(self.input_queue)
        self.input_queue.clear()

        # Filter out CPRs. We don't want to return these.
        key_presses = [k for k in key_presses if k.key != Keys.CPRResponse]
        return key_presses

    def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None:
        """
        Call `handler` for the matched `key_sequence` and do the bookkeeping
        around it: consume the repetition argument, save undo state when the
        handler asks for it, and record the keys into a running macro.
        """
        app = get_app()
        # Snapshot the state before the handler runs; the handler itself may
        # change it.
        was_recording_emacs = app.emacs_state.is_recording
        was_recording_vi = bool(app.vi_state.recording_register)
        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode
        # The argument is consumed by this handler call.
        arg = self.arg
        self.arg = None

        event = KeyPressEvent(
            weakref.ref(self),
            arg=arg,
            key_sequence=key_sequence,
            previous_key_sequence=self._previous_key_sequence,
            is_repeat=(handler == self._previous_handler),
        )

        # Save the state of the current buffer.
        if handler.save_before(event):
            event.app.current_buffer.save_to_undo_stack()

        # Call handler.
        from prompt_toolkit.buffer import EditReadOnlyBuffer

        try:
            handler.call(event)
            self._fix_vi_cursor_position(event)

        except EditReadOnlyBuffer:
            # When a key binding does an attempt to change a buffer which is
            # read-only, we can ignore that. We sound a bell and go on.
            app.output.bell()

        if was_temporary_navigation_mode:
            self._leave_vi_temp_navigation_mode(event)

        self._previous_key_sequence = key_sequence
        self._previous_handler = handler

        # Record the key sequence in our macro. (Only if we're in macro mode
        # before and after executing the key.)
        if handler.record_in_macro():
            if app.emacs_state.is_recording and was_recording_emacs:
                recording = app.emacs_state.current_recording
                if recording is not None:  # Should always be true, given that
                    # `was_recording_emacs` is set.
                    recording.extend(key_sequence)

            if app.vi_state.recording_register and was_recording_vi:
                # Append the raw data of all keys in one go.
                app.vi_state.current_recording += "".join(
                    k.data for k in key_sequence
                )

    def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None:
        """
        After every command, make sure that if we are in Vi navigation mode, we
        never put the cursor after the last character of a line. (Unless it's
        an empty line.)
        """
        app = event.app
        buff = app.current_buffer
        preferred_column = buff.preferred_column

        if (
            vi_navigation_mode()
            and buff.document.is_cursor_at_the_end_of_line
            and len(buff.document.current_line) > 0
        ):
            buff.cursor_position -= 1

            # Set the preferred_column for arrow up/down again.
            # (This was cleared after changing the cursor position.)
            buff.preferred_column = preferred_column

    def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None:
        """
        If we're in Vi temporary navigation (normal) mode, return to
        insert/replace mode after executing one action.
        """
        app = event.app

        if app.editing_mode == EditingMode.VI:
            # Not waiting for a text object and no argument has been given.
            if app.vi_state.operator_func is None and self.arg is None:
                app.vi_state.temporary_navigation_mode = False

    def _start_timeout(self) -> None:
        """
        Start auto flush timeout. Similar to Vim's `timeoutlen` option.

        Start a background coroutine with a timer. When this timeout expires
        and no key was pressed in the meantime, we flush all data in the queue
        and call the appropriate key binding handlers.
        """
        app = get_app()
        timeout = app.timeoutlen

        # No timeout configured: never flush automatically.
        if timeout is None:
            return

        async def wait() -> None:
            "Wait for timeout."
            # This sleep can be cancelled. In that case we don't flush.
            await sleep(timeout)

            if len(self.key_buffer) > 0:
                # (No keys pressed in the meantime.)
                flush_keys()

        def flush_keys() -> None:
            "Flush keys."
            self.feed(_Flush)
            self.process_keys()

        # Automatically flush keys. A timer that is still pending is replaced
        # by the new one.
        if self._flush_wait_task:
            self._flush_wait_task.cancel()
        self._flush_wait_task = app.create_background_task(wait())

    def send_sigint(self) -> None:
        """
        Send SIGINT. Immediately call the SIGINT key handler.
        """
        self.feed(KeyPress(key=Keys.SIGINT), first=True)
        self.process_keys()

423 

424 

class KeyPressEvent:
    """
    Key press event, delivered to key bindings.

    :param key_processor_ref: Weak reference to the `KeyProcessor`.
    :param arg: Repetition argument.
    :param key_sequence: List of `KeyPress` instances.
    :param previous_key_sequence: Previous list of `KeyPress` instances.
    :param is_repeat: True when the previous event was delivered to the same handler.
    """

    def __init__(
        self,
        key_processor_ref: weakref.ReferenceType[KeyProcessor],
        arg: str | None,
        key_sequence: list[KeyPress],
        previous_key_sequence: list[KeyPress],
        is_repeat: bool,
    ) -> None:
        self._key_processor_ref = key_processor_ref
        self.key_sequence = key_sequence
        self.previous_key_sequence = previous_key_sequence

        #: True when the previous key sequence was handled by the same handler.
        self.is_repeat = is_repeat

        # The raw argument text; see the `arg` property for the parsed value.
        self._arg = arg
        # The application is looked up once, when the event is created.
        self._app = get_app()

    def __repr__(self) -> str:
        return f"KeyPressEvent(arg={self.arg!r}, key_sequence={self.key_sequence!r}, is_repeat={self.is_repeat!r})"

    @property
    def data(self) -> str:
        """
        The `data` of the last key press in the key sequence.
        """
        return self.key_sequence[-1].data

    @property
    def key_processor(self) -> KeyProcessor:
        """
        The `KeyProcessor` that created this event.

        :raises Exception: when the weakly referenced processor is gone.
        """
        processor = self._key_processor_ref()
        if processor is None:
            raise Exception("KeyProcessor was lost. This should not happen.")
        return processor

    @property
    def app(self) -> Application[Any]:
        """
        The current `Application` object.
        """
        return self._app

    @property
    def current_buffer(self) -> Buffer:
        """
        The current buffer.
        """
        return self.app.current_buffer

    @property
    def arg(self) -> int:
        """
        Repetition argument.

        A lone ``"-"`` means -1, a missing argument means 1, and a value of
        a million or more is replaced by 1.
        """
        if self._arg == "-":
            return -1

        result = int(self._arg or 1)

        # Don't exceed a million.
        if result >= 1000000:
            result = 1

        return result

    @property
    def arg_present(self) -> bool:
        """
        True if repetition argument was explicitly provided.
        """
        return self._arg is not None

    def append_to_arg_count(self, data: str) -> None:
        """
        Add digit to the input argument.

        The result is stored on the `KeyProcessor`, not on this event.

        :param data: the typed digit as string
        """
        assert data in "-0123456789"
        current = self._arg

        if data == "-":
            # A minus sign is only accepted at the start of the argument.
            assert current is None or current == "-"
            result = data
        elif current is None:
            result = data
        else:
            result = f"{current}{data}"

        self.key_processor.arg = result

    @property
    def cli(self) -> Application[Any]:
        "For backward-compatibility."
        return self.app