1"""
2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from
3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.
4
According to the registered key bindings, the `KeyProcessor` calls the
correct callbacks when new key presses are fed through `feed`.
7"""
8
9from __future__ import annotations
10
11import weakref
12from asyncio import Task, sleep
13from collections import deque
14from collections.abc import Generator
15from typing import TYPE_CHECKING, Any
16
17from prompt_toolkit.application.current import get_app
18from prompt_toolkit.enums import EditingMode
19from prompt_toolkit.filters.app import vi_navigation_mode
20from prompt_toolkit.keys import Keys
21from prompt_toolkit.utils import Event
22
23from .key_bindings import Binding, KeyBindingsBase
24
25if TYPE_CHECKING:
26 from prompt_toolkit.application import Application
27 from prompt_toolkit.buffer import Buffer
28
29
# Public names of this module (what `from <module> import *` exposes).
__all__ = [
    "KeyProcessor",
    "KeyPress",
    "KeyPressEvent",
]
35
36
class KeyPress:
    """
    A single key press: the logical key plus the raw input that produced it.

    :param key: A `Keys` instance or text (one character).
    :param data: The received string on stdin. (Often vt100 escape codes.)
        When omitted, it defaults to ``key.value`` for a `Keys` instance and
        to the character itself for a one character string.
    """

    def __init__(self, key: Keys | str, data: str | None = None) -> None:
        assert isinstance(key, Keys) or len(key) == 1

        if data is None:
            # No raw input was given: derive it from the key itself. For plain
            # text, 'key' is a one character string and is used as-is.
            data = key.value if isinstance(key, Keys) else key

        self.key = key
        self.data = data

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})"

    def __eq__(self, other: object) -> bool:
        """
        Two key presses are equal when both their `key` and `data` are equal.
        """
        if not isinstance(other, KeyPress):
            # Let Python try the reflected comparison of the other operand.
            # When that doesn't exist either, `==` still evaluates to False
            # and `!=` to True, exactly like before.
            return NotImplemented
        return self.key == other.key and self.data == other.data
62
63
# Sentinel key press that signals a flush operation to the `KeyProcessor`.
# It is recognised by identity (`is _Flush`), never by its key or data.
# NOTE: the implementation is very similar to the VT100 parser.
_Flush = KeyPress(key="?", data="_Flush")
69
70
class KeyProcessor:
    """
    State machine that receives :class:`KeyPress` instances and, according to
    the given key bindings, calls the matching handlers.

    ::

        p = KeyProcessor(key_bindings)

        # Send keys into the processor.
        p.feed(KeyPress(Keys.ControlX, '\\x18'))
        p.feed(KeyPress(Keys.ControlC, '\\x03'))

        # Process all the keys in the queue.
        p.process_keys()

        # Now the ControlX-ControlC callback will be called if this sequence is
        # registered in the key bindings.

    :param key_bindings: `KeyBindingsBase` instance.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        self._bindings = key_bindings

        # Fired by `process_keys` right before/after each key press that is
        # neither the flush marker nor a CPR response.
        self.before_key_press = Event(self)
        self.after_key_press = Event(self)

        # Background task of the pending auto flush timer, if there is one.
        self._flush_wait_task: Task[None] | None = None

        self.reset()

    def reset(self) -> None:
        """
        Forget all matching state and restart the key matching coroutine.
        """
        self._previous_key_sequence: list[KeyPress] = []
        self._previous_handler: Binding | None = None

        # Keys that were fed, but not yet sent into the `_process` state
        # machine.
        self.input_queue: deque[KeyPress] = deque()

        # Keys that the state machine is matching right now. (At most as many
        # keys as make up one key binding.)
        self.key_buffer: list[KeyPress] = []

        #: Readline argument (for repetition of commands.)
        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
        self.arg: str | None = None

        # Create the matching coroutine and advance it to its first `yield`,
        # so that it is ready to receive key presses.
        self._process_coroutine = self._process()
        next(self._process_coroutine)

    def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]:
        """
        Return the bindings registered for exactly this sequence of
        :class:`KeyPress` instances, keeping only those whose filter is
        currently active.
        """
        keys = tuple(press.key for press in key_presses)
        candidates = self._bindings.get_bindings_for_keys(keys)
        return [binding for binding in candidates if binding.filter()]

    def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool:
        """
        Return True if this sequence of :class:`KeyPress` instances is the
        start of a longer key binding whose filter is currently active.
        """
        keys = tuple(press.key for press in key_presses)

        # Only the filters matter here, not the bindings themselves. Many
        # bindings share the same filter, so collect them in a `set` in order
        # to evaluate each distinct filter at most once.
        distinct_filters = {
            binding.filter
            for binding in self._bindings.get_bindings_starting_with_keys(keys)
        }

        # One active filter is enough.
        for is_active in distinct_filters:
            if is_active():
                return True
        return False

    def _process(self) -> Generator[None, KeyPress, None]:
        """
        Coroutine implementing the key match algorithm. Key strokes are sent
        into this generator, and it calls the appropriate handlers.
        """
        pending = self.key_buffer
        retry = False

        while True:
            flush = False

            if retry:
                # Keys were left over after the previous round: match them
                # again without waiting for new input.
                retry = False
            else:
                key_press = yield
                if key_press is _Flush:
                    flush = True
                else:
                    pending.append(key_press)

            # Nothing buffered, nothing to match.
            if not pending:
                continue

            matches = self._get_matches(pending)

            # A flush forces a decision on what is buffered right now, so
            # longer bindings are not waited for in that case.
            is_prefix_of_longer_match = (
                not flush and self._is_prefix_of_longer_match(pending)
            )

            # Eager matches take priority and make us ignore all the longer
            # matches.
            eager_matches = [m for m in matches if m.eager()]
            if eager_matches:
                matches = eager_matches
                is_prefix_of_longer_match = False

            if is_prefix_of_longer_match:
                # More keys could still complete a longer binding: wait.
                continue

            if matches:
                # Exact match: the last matching binding handles it.
                self._call_handler(matches[-1], key_sequence=list(pending))
                # Clear in place: `self.key_buffer` refers to this same list.
                pending.clear()
                continue

            # No match for the whole buffer. Try the longest prefix first and
            # shift; whatever remains is matched again in the next round.
            retry = True
            for size in range(len(pending), 0, -1):
                matches = self._get_matches(pending[:size])
                if matches:
                    self._call_handler(matches[-1], key_sequence=pending[:size])
                    del pending[:size]
                    break
            else:
                # Not a single prefix matched: discard the first key.
                del pending[:1]

    def feed(self, key_press: KeyPress, first: bool = False) -> None:
        """
        Add a new :class:`KeyPress` to the input queue.
        (Don't forget to call `process_keys` in order to process the queue.)

        :param first: If true, insert before everything else.
        """
        enqueue = self.input_queue.appendleft if first else self.input_queue.append
        enqueue(key_press)

    def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None:
        """
        Add several :class:`KeyPress` instances to the input queue, keeping
        their relative order.

        :param first: If true, insert before everything else.
        """
        if not first:
            self.input_queue.extend(key_presses)
            return

        # `extendleft` inserts one by one at the front, which reverses the
        # order; reversing up front compensates for that.
        self.input_queue.extendleft(reversed(key_presses))

    def process_keys(self) -> None:
        """
        Process all the keys in the `input_queue`.
        (To be called after `feed`.)

        Note: because of the `feed`/`process_keys` separation, it is
              possible to call `feed` from inside a key binding.
              This function keeps looping until the queue is empty.
        """
        app = get_app()

        # NOTE: `self.input_queue` is looked up again on every access below,
        #       because `reset` replaces it with a new deque.

        def has_next() -> bool:
            if not app.is_done:
                return bool(self.input_queue)

            # The application result is set, so stop processing keys. (E.g.
            # if ENTER was received, followed by a few additional key strokes,
            # leave the other keys in the queue.) CPR responses that are still
            # queued do need to be processed, though.
            return any(k.key == Keys.CPRResponse for k in self.input_queue)

        def take_next() -> KeyPress:
            if not app.is_done:
                return self.input_queue.popleft()

            # Only process CPR responses. Everything else is typeahead.
            cpr_responses = [k for k in self.input_queue if k.key == Keys.CPRResponse]
            cpr = cpr_responses[0]
            self.input_queue.remove(cpr)
            return cpr

        is_flush = False

        while has_next():
            key_press = take_next()

            is_flush = key_press is _Flush
            is_cpr = key_press.key == Keys.CPRResponse
            fire_events = not (is_flush or is_cpr)

            if fire_events:
                self.before_key_press.fire()

            try:
                self._process_coroutine.send(key_press)
            except Exception:
                # If for some reason something goes wrong in the parser, (maybe
                # an exception was raised) restart the processor for next time.
                self.reset()
                self.empty_queue()
                raise

            if fire_events:
                self.after_key_press.fire()

        # Skip timeout if the last key was flush.
        if not is_flush:
            self._start_timeout()

    def empty_queue(self) -> list[KeyPress]:
        """
        Empty the input queue. Return the unprocessed input.
        """
        # CPR responses are dropped as well, but we don't want to return them.
        unprocessed = [k for k in self.input_queue if k.key != Keys.CPRResponse]
        self.input_queue.clear()
        return unprocessed

    def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None:
        """
        Call the given binding for `key_sequence` and do the bookkeeping
        around it: undo stack, Vi cursor fix, remembering the previous
        handler, and macro recording.
        """
        app = get_app()

        # Remember the state from before the call; it is compared with the
        # state after the call further down.
        was_recording_emacs = app.emacs_state.is_recording
        was_recording_vi = bool(app.vi_state.recording_register)
        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode

        # The repetition argument is handed to this event and cleared on the
        # processor.
        arg, self.arg = self.arg, None

        event = KeyPressEvent(
            weakref.ref(self),
            arg=arg,
            key_sequence=key_sequence,
            previous_key_sequence=self._previous_key_sequence,
            is_repeat=(handler == self._previous_handler),
        )

        # Save the state of the current buffer.
        if handler.save_before(event):
            event.app.current_buffer.save_to_undo_stack()

        # Call handler.
        from prompt_toolkit.buffer import EditReadOnlyBuffer

        try:
            handler.call(event)
            self._fix_vi_cursor_position(event)
        except EditReadOnlyBuffer:
            # When a key binding does an attempt to change a buffer which is
            # read-only, we can ignore that. We sound a bell and go on.
            app.output.bell()

        if was_temporary_navigation_mode:
            self._leave_vi_temp_navigation_mode(event)

        self._previous_key_sequence = key_sequence
        self._previous_handler = handler

        if not handler.record_in_macro():
            return

        # Record the key sequence in our macro. (Only if we're in macro mode
        # before and after executing the key.)
        if app.emacs_state.is_recording and was_recording_emacs:
            recording = app.emacs_state.current_recording
            # Should always be true, given that `was_recording_emacs` is set.
            if recording is not None:
                recording.extend(key_sequence)

        if app.vi_state.recording_register and was_recording_vi:
            for key_press in key_sequence:
                app.vi_state.current_recording += key_press.data

    def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None:
        """
        After every command, make sure that if we are in Vi navigation mode, we
        never put the cursor after the last character of a line. (Unless it's
        an empty line.)
        """
        buff = event.app.current_buffer
        preferred_column = buff.preferred_column

        cursor_after_last_character = (
            vi_navigation_mode()
            and buff.document.is_cursor_at_the_end_of_line
            and len(buff.document.current_line) > 0
        )
        if not cursor_after_last_character:
            return

        buff.cursor_position -= 1

        # Set the preferred_column for arrow up/down again.
        # (This was cleared after changing the cursor position.)
        buff.preferred_column = preferred_column

    def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None:
        """
        If we're in Vi temporary navigation (normal) mode, return to
        insert/replace mode after executing one action.
        """
        app = event.app

        # Stay in temporary navigation mode while waiting for a text object
        # or while an argument has been given.
        if (
            app.editing_mode == EditingMode.VI
            and app.vi_state.operator_func is None
            and self.arg is None
        ):
            app.vi_state.temporary_navigation_mode = False

    def _start_timeout(self) -> None:
        """
        Start auto flush timeout. Similar to Vim's `timeoutlen` option.

        Start a background coroutine with a timer. When this timeout expires
        and no key was pressed in the meantime, we flush all data in the queue
        and call the appropriate key binding handlers.
        """
        app = get_app()
        timeout = app.timeoutlen

        # No timeout configured: never flush automatically.
        if timeout is None:
            return

        async def wait() -> None:
            "Wait for timeout, then flush the keys that are still buffered."
            # This sleep can be cancelled. In that case we don't flush.
            await sleep(timeout)

            if self.key_buffer:
                # (No keys pressed in the meantime.)
                self.feed(_Flush)
                self.process_keys()

        # Automatically flush keys. A timer that is still pending is replaced.
        if self._flush_wait_task:
            self._flush_wait_task.cancel()
        self._flush_wait_task = app.create_background_task(wait())

    def send_sigint(self) -> None:
        """
        Send SIGINT. Immediately call the SIGINT key handler.
        """
        # Put it in front of everything that is still queued.
        self.feed(KeyPress(key=Keys.SIGINT), first=True)
        self.process_keys()
423
424
class KeyPressEvent:
    """
    Key press event, delivered to key bindings.

    :param key_processor_ref: Weak reference to the `KeyProcessor`.
    :param arg: Repetition argument.
    :param key_sequence: List of `KeyPress` instances.
    :param previous_key_sequence: Previous list of `KeyPress` instances.
    :param is_repeat: True when the previous event was delivered to the same handler.
    """

    def __init__(
        self,
        key_processor_ref: weakref.ReferenceType[KeyProcessor],
        arg: str | None,
        key_sequence: list[KeyPress],
        previous_key_sequence: list[KeyPress],
        is_repeat: bool,
    ) -> None:
        self._key_processor_ref = key_processor_ref
        self._arg = arg
        self._app = get_app()

        self.key_sequence = key_sequence
        self.previous_key_sequence = previous_key_sequence

        #: True when the previous key sequence was handled by the same handler.
        self.is_repeat = is_repeat

    def __repr__(self) -> str:
        return f"KeyPressEvent(arg={self.arg!r}, key_sequence={self.key_sequence!r}, is_repeat={self.is_repeat!r})"

    @property
    def data(self) -> str:
        """
        The `data` of the last key press in the key sequence.
        """
        last_key_press = self.key_sequence[-1]
        return last_key_press.data

    @property
    def key_processor(self) -> KeyProcessor:
        """
        The `KeyProcessor` behind the weak reference.

        :raises Exception: when the processor no longer exists.
        """
        processor = self._key_processor_ref()
        if processor is not None:
            return processor
        raise Exception("KeyProcessor was lost. This should not happen.")

    @property
    def app(self) -> Application[Any]:
        """
        The current `Application` object.
        """
        return self._app

    @property
    def current_buffer(self) -> Buffer:
        """
        The current buffer.
        """
        return self.app.current_buffer

    @property
    def arg(self) -> int:
        """
        Repetition argument.

        A lone "-" counts as -1, a missing argument as 1.
        """
        raw = self._arg
        if raw == "-":
            return -1

        count = int(raw or 1)

        # Don't exceed a million; fall back to 1 instead.
        return 1 if count >= 1000000 else count

    @property
    def arg_present(self) -> bool:
        """
        True if repetition argument was explicitly provided.
        """
        return self._arg is not None

    def append_to_arg_count(self, data: str) -> None:
        """
        Add digit to the input argument.

        The result is stored on the key processor.

        :param data: the typed digit as string
        """
        assert data in "-0123456789"
        current = self._arg

        if data == "-":
            # A minus sign is only accepted at the very start of the argument.
            assert current is None or current == "-"
            result = data
        else:
            result = data if current is None else f"{current}{data}"

        self.key_processor.arg = result

    @property
    def cli(self) -> Application[Any]:
        "For backward-compatibility."
        return self.app