Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/key_binding/key_processor.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

239 statements  

1""" 

2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from 

3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance. 

4 

5The `KeyProcessor` will according to the implemented keybindings call the 

6correct callbacks when new key presses are feed through `feed`. 

7""" 

8 

9from __future__ import annotations 

10 

11import weakref 

12from asyncio import Task, sleep 

13from collections import deque 

14from typing import TYPE_CHECKING, Any, Generator 

15 

16from prompt_toolkit.application.current import get_app 

17from prompt_toolkit.enums import EditingMode 

18from prompt_toolkit.filters.app import vi_navigation_mode 

19from prompt_toolkit.keys import Keys 

20from prompt_toolkit.utils import Event 

21 

22from .key_bindings import Binding, KeyBindingsBase 

23 

24if TYPE_CHECKING: 

25 from prompt_toolkit.application import Application 

26 from prompt_toolkit.buffer import Buffer 

27 

28 

29__all__ = [ 

30 "KeyProcessor", 

31 "KeyPress", 

32 "KeyPressEvent", 

33] 

34 

35 

36class KeyPress: 

37 """ 

38 :param key: A `Keys` instance or text (one character). 

39 :param data: The received string on stdin. (Often vt100 escape codes.) 

40 """ 

41 

42 def __init__(self, key: Keys | str, data: str | None = None) -> None: 

43 assert isinstance(key, Keys) or len(key) == 1 

44 

45 if data is None: 

46 if isinstance(key, Keys): 

47 data = key.value 

48 else: 

49 data = key # 'key' is a one character string. 

50 

51 self.key = key 

52 self.data = data 

53 

54 def __repr__(self) -> str: 

55 return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})" 

56 

57 def __eq__(self, other: object) -> bool: 

58 if not isinstance(other, KeyPress): 

59 return False 

60 return self.key == other.key and self.data == other.data 

61 

62 

63""" 

64Helper object to indicate flush operation in the KeyProcessor. 

65NOTE: the implementation is very similar to the VT100 parser. 

66""" 

67_Flush = KeyPress("?", data="_Flush") 

68 

69 

70class KeyProcessor: 

71 """ 

72 Statemachine that receives :class:`KeyPress` instances and according to the 

73 key bindings in the given :class:`KeyBindings`, calls the matching handlers. 

74 

75 :: 

76 

77 p = KeyProcessor(key_bindings) 

78 

79 # Send keys into the processor. 

80 p.feed(KeyPress(Keys.ControlX, '\x18')) 

81 p.feed(KeyPress(Keys.ControlC, '\x03') 

82 

83 # Process all the keys in the queue. 

84 p.process_keys() 

85 

86 # Now the ControlX-ControlC callback will be called if this sequence is 

87 # registered in the key bindings. 

88 

89 :param key_bindings: `KeyBindingsBase` instance. 

90 """ 

91 

92 def __init__(self, key_bindings: KeyBindingsBase) -> None: 

93 self._bindings = key_bindings 

94 

95 self.before_key_press = Event(self) 

96 self.after_key_press = Event(self) 

97 

98 self._flush_wait_task: Task[None] | None = None 

99 

100 self.reset() 

101 

102 def reset(self) -> None: 

103 self._previous_key_sequence: list[KeyPress] = [] 

104 self._previous_handler: Binding | None = None 

105 

106 # The queue of keys not yet send to our _process generator/state machine. 

107 self.input_queue: deque[KeyPress] = deque() 

108 

109 # The key buffer that is matched in the generator state machine. 

110 # (This is at at most the amount of keys that make up for one key binding.) 

111 self.key_buffer: list[KeyPress] = [] 

112 

113 #: Readline argument (for repetition of commands.) 

114 #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html 

115 self.arg: str | None = None 

116 

117 # Start the processor coroutine. 

118 self._process_coroutine = self._process() 

119 self._process_coroutine.send(None) # type: ignore 

120 

121 def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]: 

122 """ 

123 For a list of :class:`KeyPress` instances. Give the matching handlers 

124 that would handle this. 

125 """ 

126 keys = tuple(k.key for k in key_presses) 

127 

128 # Try match, with mode flag 

129 return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()] 

130 

131 def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool: 

132 """ 

133 For a list of :class:`KeyPress` instances. Return True if there is any 

134 handler that is bound to a suffix of this keys. 

135 """ 

136 keys = tuple(k.key for k in key_presses) 

137 

138 # Get the filters for all the key bindings that have a longer match. 

139 # Note that we transform it into a `set`, because we don't care about 

140 # the actual bindings and executing it more than once doesn't make 

141 # sense. (Many key bindings share the same filter.) 

142 filters = { 

143 b.filter for b in self._bindings.get_bindings_starting_with_keys(keys) 

144 } 

145 

146 # When any key binding is active, return True. 

147 return any(f() for f in filters) 

148 

149 def _process(self) -> Generator[None, KeyPress, None]: 

150 """ 

151 Coroutine implementing the key match algorithm. Key strokes are sent 

152 into this generator, and it calls the appropriate handlers. 

153 """ 

154 buffer = self.key_buffer 

155 retry = False 

156 

157 while True: 

158 flush = False 

159 

160 if retry: 

161 retry = False 

162 else: 

163 key = yield 

164 if key is _Flush: 

165 flush = True 

166 else: 

167 buffer.append(key) 

168 

169 # If we have some key presses, check for matches. 

170 if buffer: 

171 matches = self._get_matches(buffer) 

172 

173 if flush: 

174 is_prefix_of_longer_match = False 

175 else: 

176 is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer) 

177 

178 # When eager matches were found, give priority to them and also 

179 # ignore all the longer matches. 

180 eager_matches = [m for m in matches if m.eager()] 

181 

182 if eager_matches: 

183 matches = eager_matches 

184 is_prefix_of_longer_match = False 

185 

186 # Exact matches found, call handler. 

187 if not is_prefix_of_longer_match and matches: 

188 self._call_handler(matches[-1], key_sequence=buffer[:]) 

189 del buffer[:] # Keep reference. 

190 

191 # No match found. 

192 elif not is_prefix_of_longer_match and not matches: 

193 retry = True 

194 found = False 

195 

196 # Loop over the input, try longest match first and shift. 

197 for i in range(len(buffer), 0, -1): 

198 matches = self._get_matches(buffer[:i]) 

199 if matches: 

200 self._call_handler(matches[-1], key_sequence=buffer[:i]) 

201 del buffer[:i] 

202 found = True 

203 break 

204 

205 if not found: 

206 del buffer[:1] 

207 

208 def feed(self, key_press: KeyPress, first: bool = False) -> None: 

209 """ 

210 Add a new :class:`KeyPress` to the input queue. 

211 (Don't forget to call `process_keys` in order to process the queue.) 

212 

213 :param first: If true, insert before everything else. 

214 """ 

215 if first: 

216 self.input_queue.appendleft(key_press) 

217 else: 

218 self.input_queue.append(key_press) 

219 

220 def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None: 

221 """ 

222 :param first: If true, insert before everything else. 

223 """ 

224 if first: 

225 self.input_queue.extendleft(reversed(key_presses)) 

226 else: 

227 self.input_queue.extend(key_presses) 

228 

229 def process_keys(self) -> None: 

230 """ 

231 Process all the keys in the `input_queue`. 

232 (To be called after `feed`.) 

233 

234 Note: because of the `feed`/`process_keys` separation, it is 

235 possible to call `feed` from inside a key binding. 

236 This function keeps looping until the queue is empty. 

237 """ 

238 app = get_app() 

239 

240 def not_empty() -> bool: 

241 # When the application result is set, stop processing keys. (E.g. 

242 # if ENTER was received, followed by a few additional key strokes, 

243 # leave the other keys in the queue.) 

244 if app.is_done: 

245 # But if there are still CPRResponse keys in the queue, these 

246 # need to be processed. 

247 return any(k for k in self.input_queue if k.key == Keys.CPRResponse) 

248 else: 

249 return bool(self.input_queue) 

250 

251 def get_next() -> KeyPress: 

252 if app.is_done: 

253 # Only process CPR responses. Everything else is typeahead. 

254 cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0] 

255 self.input_queue.remove(cpr) 

256 return cpr 

257 else: 

258 return self.input_queue.popleft() 

259 

260 is_flush = False 

261 

262 while not_empty(): 

263 # Process next key. 

264 key_press = get_next() 

265 

266 is_flush = key_press is _Flush 

267 is_cpr = key_press.key == Keys.CPRResponse 

268 

269 if not is_flush and not is_cpr: 

270 self.before_key_press.fire() 

271 

272 try: 

273 self._process_coroutine.send(key_press) 

274 except Exception: 

275 # If for some reason something goes wrong in the parser, (maybe 

276 # an exception was raised) restart the processor for next time. 

277 self.reset() 

278 self.empty_queue() 

279 raise 

280 

281 if not is_flush and not is_cpr: 

282 self.after_key_press.fire() 

283 

284 # Skip timeout if the last key was flush. 

285 if not is_flush: 

286 self._start_timeout() 

287 

288 def empty_queue(self) -> list[KeyPress]: 

289 """ 

290 Empty the input queue. Return the unprocessed input. 

291 """ 

292 key_presses = list(self.input_queue) 

293 self.input_queue.clear() 

294 

295 # Filter out CPRs. We don't want to return these. 

296 key_presses = [k for k in key_presses if k.key != Keys.CPRResponse] 

297 return key_presses 

298 

299 def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None: 

300 app = get_app() 

301 was_recording_emacs = app.emacs_state.is_recording 

302 was_recording_vi = bool(app.vi_state.recording_register) 

303 was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode 

304 arg = self.arg 

305 self.arg = None 

306 

307 event = KeyPressEvent( 

308 weakref.ref(self), 

309 arg=arg, 

310 key_sequence=key_sequence, 

311 previous_key_sequence=self._previous_key_sequence, 

312 is_repeat=(handler == self._previous_handler), 

313 ) 

314 

315 # Save the state of the current buffer. 

316 if handler.save_before(event): 

317 event.app.current_buffer.save_to_undo_stack() 

318 

319 # Call handler. 

320 from prompt_toolkit.buffer import EditReadOnlyBuffer 

321 

322 try: 

323 handler.call(event) 

324 self._fix_vi_cursor_position(event) 

325 

326 except EditReadOnlyBuffer: 

327 # When a key binding does an attempt to change a buffer which is 

328 # read-only, we can ignore that. We sound a bell and go on. 

329 app.output.bell() 

330 

331 if was_temporary_navigation_mode: 

332 self._leave_vi_temp_navigation_mode(event) 

333 

334 self._previous_key_sequence = key_sequence 

335 self._previous_handler = handler 

336 

337 # Record the key sequence in our macro. (Only if we're in macro mode 

338 # before and after executing the key.) 

339 if handler.record_in_macro(): 

340 if app.emacs_state.is_recording and was_recording_emacs: 

341 recording = app.emacs_state.current_recording 

342 if recording is not None: # Should always be true, given that 

343 # `was_recording_emacs` is set. 

344 recording.extend(key_sequence) 

345 

346 if app.vi_state.recording_register and was_recording_vi: 

347 for k in key_sequence: 

348 app.vi_state.current_recording += k.data 

349 

350 def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None: 

351 """ 

352 After every command, make sure that if we are in Vi navigation mode, we 

353 never put the cursor after the last character of a line. (Unless it's 

354 an empty line.) 

355 """ 

356 app = event.app 

357 buff = app.current_buffer 

358 preferred_column = buff.preferred_column 

359 

360 if ( 

361 vi_navigation_mode() 

362 and buff.document.is_cursor_at_the_end_of_line 

363 and len(buff.document.current_line) > 0 

364 ): 

365 buff.cursor_position -= 1 

366 

367 # Set the preferred_column for arrow up/down again. 

368 # (This was cleared after changing the cursor position.) 

369 buff.preferred_column = preferred_column 

370 

371 def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None: 

372 """ 

373 If we're in Vi temporary navigation (normal) mode, return to 

374 insert/replace mode after executing one action. 

375 """ 

376 app = event.app 

377 

378 if app.editing_mode == EditingMode.VI: 

379 # Not waiting for a text object and no argument has been given. 

380 if app.vi_state.operator_func is None and self.arg is None: 

381 app.vi_state.temporary_navigation_mode = False 

382 

383 def _start_timeout(self) -> None: 

384 """ 

385 Start auto flush timeout. Similar to Vim's `timeoutlen` option. 

386 

387 Start a background coroutine with a timer. When this timeout expires 

388 and no key was pressed in the meantime, we flush all data in the queue 

389 and call the appropriate key binding handlers. 

390 """ 

391 app = get_app() 

392 timeout = app.timeoutlen 

393 

394 if timeout is None: 

395 return 

396 

397 async def wait() -> None: 

398 "Wait for timeout." 

399 # This sleep can be cancelled. In that case we don't flush. 

400 await sleep(timeout) 

401 

402 if len(self.key_buffer) > 0: 

403 # (No keys pressed in the meantime.) 

404 flush_keys() 

405 

406 def flush_keys() -> None: 

407 "Flush keys." 

408 self.feed(_Flush) 

409 self.process_keys() 

410 

411 # Automatically flush keys. 

412 if self._flush_wait_task: 

413 self._flush_wait_task.cancel() 

414 self._flush_wait_task = app.create_background_task(wait()) 

415 

416 def send_sigint(self) -> None: 

417 """ 

418 Send SIGINT. Immediately call the SIGINT key handler. 

419 """ 

420 self.feed(KeyPress(key=Keys.SIGINT), first=True) 

421 self.process_keys() 

422 

423 

424class KeyPressEvent: 

425 """ 

426 Key press event, delivered to key bindings. 

427 

428 :param key_processor_ref: Weak reference to the `KeyProcessor`. 

429 :param arg: Repetition argument. 

430 :param key_sequence: List of `KeyPress` instances. 

431 :param previouskey_sequence: Previous list of `KeyPress` instances. 

432 :param is_repeat: True when the previous event was delivered to the same handler. 

433 """ 

434 

435 def __init__( 

436 self, 

437 key_processor_ref: weakref.ReferenceType[KeyProcessor], 

438 arg: str | None, 

439 key_sequence: list[KeyPress], 

440 previous_key_sequence: list[KeyPress], 

441 is_repeat: bool, 

442 ) -> None: 

443 self._key_processor_ref = key_processor_ref 

444 self.key_sequence = key_sequence 

445 self.previous_key_sequence = previous_key_sequence 

446 

447 #: True when the previous key sequence was handled by the same handler. 

448 self.is_repeat = is_repeat 

449 

450 self._arg = arg 

451 self._app = get_app() 

452 

453 def __repr__(self) -> str: 

454 return f"KeyPressEvent(arg={self.arg!r}, key_sequence={self.key_sequence!r}, is_repeat={self.is_repeat!r})" 

455 

456 @property 

457 def data(self) -> str: 

458 return self.key_sequence[-1].data 

459 

460 @property 

461 def key_processor(self) -> KeyProcessor: 

462 processor = self._key_processor_ref() 

463 if processor is None: 

464 raise Exception("KeyProcessor was lost. This should not happen.") 

465 return processor 

466 

467 @property 

468 def app(self) -> Application[Any]: 

469 """ 

470 The current `Application` object. 

471 """ 

472 return self._app 

473 

474 @property 

475 def current_buffer(self) -> Buffer: 

476 """ 

477 The current buffer. 

478 """ 

479 return self.app.current_buffer 

480 

481 @property 

482 def arg(self) -> int: 

483 """ 

484 Repetition argument. 

485 """ 

486 if self._arg == "-": 

487 return -1 

488 

489 result = int(self._arg or 1) 

490 

491 # Don't exceed a million. 

492 if int(result) >= 1000000: 

493 result = 1 

494 

495 return result 

496 

497 @property 

498 def arg_present(self) -> bool: 

499 """ 

500 True if repetition argument was explicitly provided. 

501 """ 

502 return self._arg is not None 

503 

504 def append_to_arg_count(self, data: str) -> None: 

505 """ 

506 Add digit to the input argument. 

507 

508 :param data: the typed digit as string 

509 """ 

510 assert data in "-0123456789" 

511 current = self._arg 

512 

513 if data == "-": 

514 assert current is None or current == "-" 

515 result = data 

516 elif current is None: 

517 result = data 

518 else: 

519 result = f"{current}{data}" 

520 

521 self.key_processor.arg = result 

522 

523 @property 

524 def cli(self) -> Application[Any]: 

525 "For backward-compatibility." 

526 return self.app