1"""
2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from
3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.
4
The `KeyProcessor` calls the correct callbacks, according to the implemented
key bindings, when new key presses are fed through `feed`.
7"""
8
9from __future__ import annotations
10
11import weakref
12from asyncio import Task, sleep
13from collections import deque
14from typing import TYPE_CHECKING, Any, Generator
15
16from prompt_toolkit.application.current import get_app
17from prompt_toolkit.enums import EditingMode
18from prompt_toolkit.filters.app import vi_navigation_mode
19from prompt_toolkit.keys import Keys
20from prompt_toolkit.utils import Event
21
22from .key_bindings import Binding, KeyBindingsBase
23
24if TYPE_CHECKING:
25 from prompt_toolkit.application import Application
26 from prompt_toolkit.buffer import Buffer
27
28
29__all__ = [
30 "KeyProcessor",
31 "KeyPress",
32 "KeyPressEvent",
33]
34
35
class KeyPress:
    """
    A single key press, as delivered to the :class:`KeyProcessor`.

    :param key: A `Keys` instance or text (one character).
    :param data: The received string on stdin. (Often vt100 escape codes.)
        When omitted, it defaults to the value of the `Keys` member, or to
        the character itself when `key` is a one character string.

    Two key presses compare equal when both their `key` and `data` are equal.
    Because `__eq__` is defined without `__hash__`, instances are not
    hashable.
    """

    def __init__(self, key: Keys | str, data: str | None = None) -> None:
        assert isinstance(key, Keys) or len(key) == 1

        if data is None:
            # A `Keys` member carries its own textual value; anything else is
            # a one character string that doubles as its data.
            data = key.value if isinstance(key, Keys) else key

        self.key = key
        self.data = data

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(key={self.key!r}, data={self.data!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, KeyPress):
            # Let Python try the reflected comparison on `other` (and fall
            # back to "not equal") instead of deciding for foreign types.
            return NotImplemented
        return self.key == other.key and self.data == other.data
61
62
# Helper object to indicate a flush operation in the KeyProcessor. It is
# recognised by identity (`is _Flush`), never by its key or data.
# NOTE: the implementation is very similar to the VT100 parser.
_Flush = KeyPress(key="?", data="_Flush")
68
69
class KeyProcessor:
    r"""
    State machine that receives :class:`KeyPress` instances and, according to
    the key bindings in the given :class:`KeyBindings`, calls the matching
    handlers.

    ::

        p = KeyProcessor(key_bindings)

        # Send keys into the processor.
        p.feed(KeyPress(Keys.ControlX, '\x18'))
        p.feed(KeyPress(Keys.ControlC, '\x03'))

        # Process all the keys in the queue.
        p.process_keys()

        # Now the ControlX-ControlC callback will be called if this sequence is
        # registered in the key bindings.

    :param key_bindings: `KeyBindingsBase` instance.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        self._bindings = key_bindings

        # Fired around every processed key press, except for flush markers
        # and CPR responses (see `process_keys`).
        self.before_key_press = Event(self)
        self.after_key_press = Event(self)

        # Background task of the auto flush timer (see `_start_timeout`).
        self._flush_wait_task: Task[None] | None = None

        self.reset()

    def reset(self) -> None:
        """
        Drop all pending state and restart the matching state machine.
        """
        self._previous_key_sequence: list[KeyPress] = []
        self._previous_handler: Binding | None = None

        # The queue of keys not yet sent to our _process generator/state machine.
        self.input_queue: deque[KeyPress] = deque()

        # The key buffer that is matched in the generator state machine.
        # (This is at most the amount of keys that make up for one key binding.)
        self.key_buffer: list[KeyPress] = []

        #: Readline argument (for repetition of commands.)
        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
        self.arg: str | None = None

        # Start the processor coroutine and advance it to its first `yield`,
        # so that it is ready to receive key presses through `send`.
        self._process_coroutine = self._process()
        self._process_coroutine.send(None)  # type: ignore

    def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]:
        """
        For a list of :class:`KeyPress` instances, give the bindings that are
        bound to exactly these keys and whose filter is currently active.
        """
        keys = tuple(k.key for k in key_presses)

        # Try match, with mode flag
        return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()]

    def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool:
        """
        For a list of :class:`KeyPress` instances, return True if there is any
        active handler bound to a longer key sequence that starts with these
        keys. (So these keys are a prefix of that binding.)
        """
        keys = tuple(k.key for k in key_presses)

        # Get the filters for all the key bindings that have a longer match.
        # Note that we transform it into a `set`, because we don't care about
        # the actual bindings and executing it more than once doesn't make
        # sense. (Many key bindings share the same filter.)
        filters = {
            b.filter for b in self._bindings.get_bindings_starting_with_keys(keys)
        }

        # When any key binding is active, return True.
        return any(f() for f in filters)

    def _process(self) -> Generator[None, KeyPress, None]:
        """
        Coroutine implementing the key match algorithm. Key strokes are sent
        into this generator, and it calls the appropriate handlers.

        Sending the `_Flush` marker forces the buffered keys to be handled
        now, without waiting any longer for a possible longer match.
        """
        buffer = self.key_buffer
        retry = False

        while True:
            flush = False

            if retry:
                # Keys are left in the buffer from the previous round: match
                # them again before asking for new input.
                retry = False
            else:
                key = yield
                if key is _Flush:
                    flush = True
                else:
                    buffer.append(key)

            # If we have some key presses, check for matches.
            if buffer:
                matches = self._get_matches(buffer)

                if flush:
                    is_prefix_of_longer_match = False
                else:
                    is_prefix_of_longer_match = self._is_prefix_of_longer_match(buffer)

                # When eager matches were found, give priority to them and also
                # ignore all the longer matches.
                eager_matches = [m for m in matches if m.eager()]

                if eager_matches:
                    matches = eager_matches
                    is_prefix_of_longer_match = False

                # Exact matches found, call handler.
                if not is_prefix_of_longer_match and matches:
                    self._call_handler(matches[-1], key_sequence=buffer[:])
                    del buffer[:]  # Keep reference.

                # No match found.
                elif not is_prefix_of_longer_match:
                    retry = True

                    # Loop over the input, try longest match first and shift.
                    for i in range(len(buffer), 0, -1):
                        matches = self._get_matches(buffer[:i])
                        if matches:
                            self._call_handler(matches[-1], key_sequence=buffer[:i])
                            del buffer[:i]
                            break
                    else:
                        # Nothing matched at all: drop the first key.
                        del buffer[:1]

    def feed(self, key_press: KeyPress, first: bool = False) -> None:
        """
        Add a new :class:`KeyPress` to the input queue.
        (Don't forget to call `process_keys` in order to process the queue.)

        :param key_press: The key press to add.
        :param first: If true, insert before everything else.
        """
        if first:
            self.input_queue.appendleft(key_press)
        else:
            self.input_queue.append(key_press)

    def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None:
        """
        Add several :class:`KeyPress` instances to the input queue, keeping
        their relative order.

        :param key_presses: The key presses to add.
        :param first: If true, insert before everything else.
        """
        if first:
            # `extendleft` pushes one by one, which reverses the order; feed
            # it the reversed list to keep the original order.
            self.input_queue.extendleft(reversed(key_presses))
        else:
            self.input_queue.extend(key_presses)

    def process_keys(self) -> None:
        """
        Process all the keys in the `input_queue`.
        (To be called after `feed`.)

        Note: because of the `feed`/`process_keys` separation, it is
              possible to call `feed` from inside a key binding.
              This function keeps looping until the queue is empty.

        If processing a key raises, the processor is reset, the queue is
        emptied, and the exception is re-raised.
        """
        app = get_app()

        def not_empty() -> bool:
            # When the application result is set, stop processing keys.  (E.g.
            # if ENTER was received, followed by a few additional key strokes,
            # leave the other keys in the queue.)
            if app.is_done:
                # But if there are still CPRResponse keys in the queue, these
                # need to be processed.
                return any(k.key == Keys.CPRResponse for k in self.input_queue)
            return bool(self.input_queue)

        def get_next() -> KeyPress:
            if app.is_done:
                # Only process CPR responses. Everything else is typeahead.
                # (`not_empty` guarantees that there is at least one.)
                cpr = next(k for k in self.input_queue if k.key == Keys.CPRResponse)
                self.input_queue.remove(cpr)
                return cpr
            return self.input_queue.popleft()

        is_flush = False

        while not_empty():
            # Process next key.
            key_press = get_next()

            is_flush = key_press is _Flush
            is_cpr = key_press.key == Keys.CPRResponse

            if not is_flush and not is_cpr:
                self.before_key_press.fire()

            try:
                self._process_coroutine.send(key_press)
            except Exception:
                # If for some reason something goes wrong in the parser, (maybe
                # an exception was raised) restart the processor for next time.
                self.reset()
                self.empty_queue()
                raise

            if not is_flush and not is_cpr:
                self.after_key_press.fire()

        # Skip timeout if the last key was flush.
        if not is_flush:
            self._start_timeout()

    def empty_queue(self) -> list[KeyPress]:
        """
        Empty the input queue. Return the unprocessed input.
        """
        # Filter out CPRs. We don't want to return these.
        key_presses = [k for k in self.input_queue if k.key != Keys.CPRResponse]
        self.input_queue.clear()
        return key_presses

    def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None:
        """
        Call the given binding for the given key sequence.

        This builds the :class:`KeyPressEvent`, consumes the pending
        repetition argument, saves the buffer to the undo stack when the
        binding asks for it, remembers the handler as the previous one, and
        records the keys in a running Emacs/Vi macro.

        :param handler: The binding to call.
        :param key_sequence: The key presses that triggered the binding.
        """
        app = get_app()
        was_recording_emacs = app.emacs_state.is_recording
        was_recording_vi = bool(app.vi_state.recording_register)
        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode

        # The argument is consumed by this handler call. (A handler can set a
        # new one through `KeyPressEvent.append_to_arg_count`.)
        arg = self.arg
        self.arg = None

        event = KeyPressEvent(
            weakref.ref(self),
            arg=arg,
            key_sequence=key_sequence,
            previous_key_sequence=self._previous_key_sequence,
            is_repeat=(handler == self._previous_handler),
        )

        # Save the state of the current buffer.
        if handler.save_before(event):
            event.app.current_buffer.save_to_undo_stack()

        # Call handler.
        from prompt_toolkit.buffer import EditReadOnlyBuffer

        try:
            handler.call(event)
            self._fix_vi_cursor_position(event)

        except EditReadOnlyBuffer:
            # When a key binding does an attempt to change a buffer which is
            # read-only, we can ignore that. We sound a bell and go on.
            app.output.bell()

        if was_temporary_navigation_mode:
            self._leave_vi_temp_navigation_mode(event)

        self._previous_key_sequence = key_sequence
        self._previous_handler = handler

        # Record the key sequence in our macro. (Only if we're in macro mode
        # before and after executing the key.)
        if handler.record_in_macro():
            if app.emacs_state.is_recording and was_recording_emacs:
                recording = app.emacs_state.current_recording
                if recording is not None:  # Should always be true, given that
                    # `was_recording_emacs` is set.
                    recording.extend(key_sequence)

            if app.vi_state.recording_register and was_recording_vi:
                for k in key_sequence:
                    app.vi_state.current_recording += k.data

    def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None:
        """
        After every command, make sure that if we are in Vi navigation mode, we
        never put the cursor after the last character of a line. (Unless it's
        an empty line.)
        """
        app = event.app
        buff = app.current_buffer
        preferred_column = buff.preferred_column

        if (
            vi_navigation_mode()
            and buff.document.is_cursor_at_the_end_of_line
            and len(buff.document.current_line) > 0
        ):
            buff.cursor_position -= 1

            # Set the preferred_column for arrow up/down again.
            # (This was cleared after changing the cursor position.)
            buff.preferred_column = preferred_column

    def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None:
        """
        If we're in Vi temporary navigation (normal) mode, return to
        insert/replace mode after executing one action.
        """
        app = event.app

        if app.editing_mode == EditingMode.VI:
            # Not waiting for a text object and no argument has been given.
            if app.vi_state.operator_func is None and self.arg is None:
                app.vi_state.temporary_navigation_mode = False

    def _start_timeout(self) -> None:
        """
        Start auto flush timeout. Similar to Vim's `timeoutlen` option.

        Start a background coroutine with a timer. When this timeout expires
        and no key was pressed in the meantime, we flush all data in the queue
        and call the appropriate key binding handlers.

        Nothing is started when the application's `timeoutlen` is None.
        """
        app = get_app()
        timeout = app.timeoutlen

        if timeout is None:
            return

        def flush_keys() -> None:
            "Flush keys."
            self.feed(_Flush)
            self.process_keys()

        async def wait() -> None:
            "Wait for timeout."
            # This sleep can be cancelled. In that case we don't flush.
            await sleep(timeout)

            if self.key_buffer:
                # (No keys pressed in the meantime.)
                flush_keys()

        # Automatically flush keys. A previous timer is replaced by this one.
        if self._flush_wait_task:
            self._flush_wait_task.cancel()
        self._flush_wait_task = app.create_background_task(wait())

    def send_sigint(self) -> None:
        """
        Send SIGINT. Immediately call the SIGINT key handler.
        """
        # Put it in front of the queue, so it is handled before any typeahead.
        self.feed(KeyPress(key=Keys.SIGINT), first=True)
        self.process_keys()
422
423
class KeyPressEvent:
    """
    Event object that is handed to a key binding when it is called.

    :param key_processor_ref: Weak reference to the `KeyProcessor`.
    :param arg: Repetition argument.
    :param key_sequence: List of `KeyPress` instances.
    :param previous_key_sequence: Previous list of `KeyPress` instances.
    :param is_repeat: True when the previous event was delivered to the same handler.
    """

    def __init__(
        self,
        key_processor_ref: weakref.ReferenceType[KeyProcessor],
        arg: str | None,
        key_sequence: list[KeyPress],
        previous_key_sequence: list[KeyPress],
        is_repeat: bool,
    ) -> None:
        # The application that is current at the time the event is created.
        self._app = get_app()
        self._key_processor_ref = key_processor_ref
        self._arg = arg

        self.key_sequence = key_sequence
        self.previous_key_sequence = previous_key_sequence

        #: True when the previous key sequence was handled by the same handler.
        self.is_repeat = is_repeat

    def __repr__(self) -> str:
        return (
            f"KeyPressEvent(arg={self.arg!r}, "
            f"key_sequence={self.key_sequence!r}, "
            f"is_repeat={self.is_repeat!r})"
        )

    @property
    def data(self) -> str:
        """
        The data of the last key press in the key sequence.
        """
        last_key_press = self.key_sequence[-1]
        return last_key_press.data

    @property
    def key_processor(self) -> KeyProcessor:
        """
        The `KeyProcessor` behind the weak reference. Raises when it is gone.
        """
        processor = self._key_processor_ref()
        if processor is not None:
            return processor
        raise Exception("KeyProcessor was lost. This should not happen.")

    @property
    def app(self) -> Application[Any]:
        """
        The current `Application` object.
        """
        return self._app

    @property
    def current_buffer(self) -> Buffer:
        """
        The current buffer.
        """
        application = self.app
        return application.current_buffer

    @property
    def arg(self) -> int:
        """
        Repetition argument.

        A lone "-" counts as -1, no argument counts as 1, and a value of a
        million or more is replaced by 1.
        """
        raw = self._arg
        if raw == "-":
            return -1

        count = int(raw or 1)

        # Don't exceed a million.
        return 1 if count >= 1000000 else count

    @property
    def arg_present(self) -> bool:
        """
        True if repetition argument was explicitly provided.
        """
        return not (self._arg is None)

    def append_to_arg_count(self, data: str) -> None:
        """
        Add digit to the input argument.

        The new argument is stored on the `KeyProcessor`, not on this event.

        :param data: the typed digit as string
        """
        assert data in "-0123456789"
        previous = self._arg

        if data == "-":
            # A minus sign is only accepted at the very start.
            assert previous in (None, "-")

        if data == "-" or previous is None:
            new_arg = data
        else:
            new_arg = previous + data

        self.key_processor.arg = new_arg

    @property
    def cli(self) -> Application[Any]:
        "For backward-compatibility."
        return self.app