Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/key_binding/key_processor.py: 23%
236 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2An :class:`~.KeyProcessor` receives callbacks for the keystrokes parsed from
3the input in the :class:`~prompt_toolkit.inputstream.InputStream` instance.
5The `KeyProcessor` will, according to the implemented key bindings, call the
6correct callbacks when new key presses are fed through `feed`.
7"""
8from __future__ import annotations
10import weakref
11from asyncio import Task, sleep
12from collections import deque
13from typing import TYPE_CHECKING, Any, Generator
15from prompt_toolkit.application.current import get_app
16from prompt_toolkit.enums import EditingMode
17from prompt_toolkit.filters.app import vi_navigation_mode
18from prompt_toolkit.keys import Keys
19from prompt_toolkit.utils import Event
21from .key_bindings import Binding, KeyBindingsBase
23if TYPE_CHECKING:
24 from prompt_toolkit.application import Application
25 from prompt_toolkit.buffer import Buffer
28__all__ = [
29 "KeyProcessor",
30 "KeyPress",
31 "KeyPressEvent",
32]
class KeyPress:
    """
    One key stroke: the logical key plus the raw text it was read from.

    :param key: A `Keys` instance or text (one character).
    :param data: The received string on stdin. (Often vt100 escape codes.)
        When omitted, it defaults to `key.value` for a `Keys` instance, or
        to the character itself.
    """

    def __init__(self, key: Keys | str, data: str | None = None) -> None:
        assert isinstance(key, Keys) or len(key) == 1

        if data is None:
            # A plain `str` key is a one character string and doubles as data.
            data = key.value if isinstance(key, Keys) else key

        self.key = key
        self.data = data

    def __repr__(self) -> str:
        return "{}(key={!r}, data={!r})".format(
            self.__class__.__name__, self.key, self.data
        )

    def __eq__(self, other: object) -> bool:
        # Only another `KeyPress` with the same key *and* data is equal.
        return (
            isinstance(other, KeyPress)
            and self.key == other.key
            and self.data == other.data
        )
"""
Helper object to indicate flush operation in the KeyProcessor.
NOTE: the implementation is very similar to the VT100 parser.
"""
# Sentinel key press. `KeyProcessor` recognises it by identity (`is _Flush`)
# and does not append it to its key buffer, so the placeholder key "?" only
# exists to satisfy the `KeyPress` constructor.
_Flush = KeyPress("?", data="_Flush")
class KeyProcessor:
    """
    State machine that receives :class:`KeyPress` instances and, according to
    the key bindings in the given :class:`KeyBindings`, calls the matching
    handlers.

    ::

        p = KeyProcessor(key_bindings)

        # Send keys into the processor.
        p.feed(KeyPress(Keys.ControlX, '\\x18'))
        p.feed(KeyPress(Keys.ControlC, '\\x03'))

        # Process all the keys in the queue.
        p.process_keys()

        # Now the ControlX-ControlC callback will be called if this sequence is
        # registered in the key bindings.

    :param key_bindings: `KeyBindingsBase` instance.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        self._bindings = key_bindings

        # Fired around every regular key press (not for the flush sentinel or
        # CPR responses, see `process_keys`).
        self.before_key_press = Event(self)
        self.after_key_press = Event(self)

        # Background task that flushes the key buffer after `app.timeoutlen`.
        self._flush_wait_task: Task[None] | None = None

        self.reset()

    def reset(self) -> None:
        """
        Drop all pending state (queue, key buffer, repetition argument,
        previous handler) and restart the matching coroutine.
        """
        self._previous_key_sequence: list[KeyPress] = []
        self._previous_handler: Binding | None = None

        # The queue of keys not yet sent to our _process generator/state machine.
        self.input_queue: deque[KeyPress] = deque()

        # The key buffer that is matched in the generator state machine.
        # (This is at most the amount of keys that make up for one key binding.)
        self.key_buffer: list[KeyPress] = []

        #: Readline argument (for repetition of commands.)
        #: https://www.gnu.org/software/bash/manual/html_node/Readline-Arguments.html
        self.arg: str | None = None

        # Start the processor coroutine. Sending `None` advances it to its
        # first `yield`, so that it is ready to receive key presses.
        self._process_coroutine = self._process()
        self._process_coroutine.send(None)  # type: ignore

    def _get_matches(self, key_presses: list[KeyPress]) -> list[Binding]:
        """
        For a list of :class:`KeyPress` instances, give the bindings that are
        bound to exactly this key sequence and whose filter is active.
        """
        keys = tuple(k.key for k in key_presses)

        # Try match, with mode flag
        return [b for b in self._bindings.get_bindings_for_keys(keys) if b.filter()]

    def _is_prefix_of_longer_match(self, key_presses: list[KeyPress]) -> bool:
        """
        For a list of :class:`KeyPress` instances, return True if there is any
        active handler bound to a longer key sequence that starts with these
        keys.
        """
        keys = tuple(k.key for k in key_presses)

        # Get the filters for all the key bindings that have a longer match.
        # Note that we transform it into a `set`, because we don't care about
        # the actual bindings and executing it more than once doesn't make
        # sense. (Many key bindings share the same filter.)
        filters = {
            b.filter for b in self._bindings.get_bindings_starting_with_keys(keys)
        }

        # When any key binding is active, return True.
        return any(f() for f in filters)

    def _process(self) -> Generator[None, KeyPress, None]:
        """
        Coroutine implementing the key match algorithm. Key strokes are sent
        into this generator, and it calls the appropriate handlers.

        Sending the `_Flush` sentinel forces the current buffer to be
        resolved without waiting for a possible longer match.
        """
        # Alias the list object itself: it is always mutated in place below,
        # so `self.key_buffer` keeps reflecting the pending keys.
        buffer = self.key_buffer
        retry = False

        while True:
            flush = False

            if retry:
                # Re-examine what is left in the buffer before reading a new
                # key (set after the "no match" branch shifted the buffer).
                retry = False
            else:
                key = yield
                if key is _Flush:
                    flush = True
                else:
                    buffer.append(key)

            # If we have some key presses, check for matches.
            if buffer:
                matches = self._get_matches(buffer)

                if flush:
                    is_prefix_of_longer_match = False
                else:
                    is_prefix_of_longer_match = self._is_prefix_of_longer_match(
                        buffer
                    )

                # When eager matches were found, give priority to them and also
                # ignore all the longer matches.
                eager_matches = [m for m in matches if m.eager()]

                if eager_matches:
                    matches = eager_matches
                    is_prefix_of_longer_match = False

                # Exact matches found, call handler.
                if not is_prefix_of_longer_match and matches:
                    self._call_handler(matches[-1], key_sequence=buffer[:])
                    del buffer[:]  # Keep reference.

                # No match found.
                elif not is_prefix_of_longer_match and not matches:
                    retry = True
                    found = False

                    # Loop over the input, try longest match first and shift.
                    for i in range(len(buffer), 0, -1):
                        matches = self._get_matches(buffer[:i])
                        if matches:
                            self._call_handler(matches[-1], key_sequence=buffer[:i])
                            del buffer[:i]
                            found = True
                            break

                    if not found:
                        # Nothing matches any prefix: drop the first key.
                        del buffer[:1]

    def feed(self, key_press: KeyPress, first: bool = False) -> None:
        """
        Add a new :class:`KeyPress` to the input queue.
        (Don't forget to call `process_keys` in order to process the queue.)

        :param first: If true, insert before everything else.
        """
        if first:
            self.input_queue.appendleft(key_press)
        else:
            self.input_queue.append(key_press)

    def feed_multiple(self, key_presses: list[KeyPress], first: bool = False) -> None:
        """
        Add several :class:`KeyPress` instances to the input queue, keeping
        their relative order.

        :param first: If true, insert before everything else.
        """
        if first:
            # `extendleft` inserts one by one at the left, which would reverse
            # the order; reversing first keeps the given order.
            self.input_queue.extendleft(reversed(key_presses))
        else:
            self.input_queue.extend(key_presses)

    def process_keys(self) -> None:
        """
        Process all the keys in the `input_queue`.
        (To be called after `feed`.)

        Note: because of the `feed`/`process_keys` separation, it is
              possible to call `feed` from inside a key binding.
              This function keeps looping until the queue is empty.

        Any exception raised while processing a key resets the processor,
        empties the queue and is re-raised.
        """
        app = get_app()

        def not_empty() -> bool:
            # When the application result is set, stop processing keys.  (E.g.
            # if ENTER was received, followed by a few additional key strokes,
            # leave the other keys in the queue.)
            if app.is_done:
                # But if there are still CPRResponse keys in the queue, these
                # need to be processed.
                # Test the predicate itself, not the truthiness of the
                # `KeyPress` objects that happen to satisfy it.
                return any(k.key == Keys.CPRResponse for k in self.input_queue)
            else:
                return bool(self.input_queue)

        def get_next() -> KeyPress:
            if app.is_done:
                # Only process CPR responses. Everything else is typeahead.
                cpr = [k for k in self.input_queue if k.key == Keys.CPRResponse][0]
                self.input_queue.remove(cpr)
                return cpr
            else:
                return self.input_queue.popleft()

        is_flush = False

        while not_empty():
            # Process next key.
            key_press = get_next()

            is_flush = key_press is _Flush
            is_cpr = key_press.key == Keys.CPRResponse

            if not is_flush and not is_cpr:
                self.before_key_press.fire()

            try:
                self._process_coroutine.send(key_press)
            except Exception:
                # If for some reason something goes wrong in the parser, (maybe
                # an exception was raised) restart the processor for next time.
                self.reset()
                self.empty_queue()
                raise

            if not is_flush and not is_cpr:
                self.after_key_press.fire()

        # Skip timeout if the last key was flush.
        if not is_flush:
            self._start_timeout()

    def empty_queue(self) -> list[KeyPress]:
        """
        Empty the input queue. Return the unprocessed input.
        """
        key_presses = list(self.input_queue)
        self.input_queue.clear()

        # Filter out CPRs. We don't want to return these.
        key_presses = [k for k in key_presses if k.key != Keys.CPRResponse]
        return key_presses

    def _call_handler(self, handler: Binding, key_sequence: list[KeyPress]) -> None:
        """
        Invoke `handler` for `key_sequence` and do the surrounding
        bookkeeping: consume the repetition argument, save undo state when the
        binding asks for it, remember the handler for `is_repeat`, and record
        the keys into an active Emacs/Vi macro.
        """
        app = get_app()
        # Snapshot the state *before* the handler runs; the handler itself may
        # start or stop a recording or change the navigation mode.
        was_recording_emacs = app.emacs_state.is_recording
        was_recording_vi = bool(app.vi_state.recording_register)
        was_temporary_navigation_mode = app.vi_state.temporary_navigation_mode
        arg = self.arg
        self.arg = None

        event = KeyPressEvent(
            weakref.ref(self),
            arg=arg,
            key_sequence=key_sequence,
            previous_key_sequence=self._previous_key_sequence,
            is_repeat=(handler == self._previous_handler),
        )

        # Save the state of the current buffer.
        if handler.save_before(event):
            event.app.current_buffer.save_to_undo_stack()

        # Call handler.
        from prompt_toolkit.buffer import EditReadOnlyBuffer

        try:
            handler.call(event)
            self._fix_vi_cursor_position(event)

        except EditReadOnlyBuffer:
            # When a key binding does an attempt to change a buffer which is
            # read-only, we can ignore that. We sound a bell and go on.
            app.output.bell()

        if was_temporary_navigation_mode:
            self._leave_vi_temp_navigation_mode(event)

        self._previous_key_sequence = key_sequence
        self._previous_handler = handler

        # Record the key sequence in our macro. (Only if we're in macro mode
        # before and after executing the key.)
        if handler.record_in_macro():
            if app.emacs_state.is_recording and was_recording_emacs:
                recording = app.emacs_state.current_recording
                if recording is not None:  # Should always be true, given that
                    # `was_recording_emacs` is set.
                    recording.extend(key_sequence)

            if app.vi_state.recording_register and was_recording_vi:
                # Append the raw data of all keys in one step.
                app.vi_state.current_recording += "".join(
                    k.data for k in key_sequence
                )

    def _fix_vi_cursor_position(self, event: KeyPressEvent) -> None:
        """
        After every command, make sure that if we are in Vi navigation mode, we
        never put the cursor after the last character of a line. (Unless it's
        an empty line.)
        """
        app = event.app
        buff = app.current_buffer
        preferred_column = buff.preferred_column

        if (
            vi_navigation_mode()
            and buff.document.is_cursor_at_the_end_of_line
            and len(buff.document.current_line) > 0
        ):
            buff.cursor_position -= 1

            # Set the preferred_column for arrow up/down again.
            # (This was cleared after changing the cursor position.)
            buff.preferred_column = preferred_column

    def _leave_vi_temp_navigation_mode(self, event: KeyPressEvent) -> None:
        """
        If we're in Vi temporary navigation (normal) mode, return to
        insert/replace mode after executing one action.
        """
        app = event.app

        if app.editing_mode == EditingMode.VI:
            # Not waiting for a text object and no argument has been given.
            if app.vi_state.operator_func is None and self.arg is None:
                app.vi_state.temporary_navigation_mode = False

    def _start_timeout(self) -> None:
        """
        Start auto flush timeout. Similar to Vim's `timeoutlen` option.

        Start a background coroutine with a timer. When this timeout expires
        and no key was pressed in the meantime, we flush all data in the queue
        and call the appropriate key binding handlers.
        """
        app = get_app()
        timeout = app.timeoutlen

        if timeout is None:
            return

        async def wait() -> None:
            "Wait for timeout."
            # This sleep can be cancelled. In that case we don't flush.
            await sleep(timeout)

            if len(self.key_buffer) > 0:
                # (No keys pressed in the meantime.)
                flush_keys()

        def flush_keys() -> None:
            "Flush keys."
            self.feed(_Flush)
            self.process_keys()

        # Automatically flush keys.
        # A newer key press supersedes the previous timer.
        if self._flush_wait_task:
            self._flush_wait_task.cancel()
        self._flush_wait_task = app.create_background_task(wait())

    def send_sigint(self) -> None:
        """
        Send SIGINT. Immediately call the SIGINT key handler.
        """
        # Put it in front of the queue so it is handled before pending input.
        self.feed(KeyPress(key=Keys.SIGINT), first=True)
        self.process_keys()
class KeyPressEvent:
    """
    The event object that is handed to a key binding when it is called.

    :param key_processor_ref: Weak reference to the `KeyProcessor`.
    :param arg: Repetition argument.
    :param key_sequence: List of `KeyPress` instances.
    :param previous_key_sequence: Previous list of `KeyPress` instances.
    :param is_repeat: True when the previous event was delivered to the same handler.
    """

    def __init__(
        self,
        key_processor_ref: weakref.ReferenceType[KeyProcessor],
        arg: str | None,
        key_sequence: list[KeyPress],
        previous_key_sequence: list[KeyPress],
        is_repeat: bool,
    ) -> None:
        self._key_processor_ref = key_processor_ref
        self._arg = arg

        self.key_sequence = key_sequence
        self.previous_key_sequence = previous_key_sequence

        #: True when the previous key sequence was handled by the same handler.
        self.is_repeat = is_repeat

        # The application is looked up once, when the event is created.
        self._app = get_app()

    def __repr__(self) -> str:
        return (
            f"KeyPressEvent(arg={self.arg!r}, "
            f"key_sequence={self.key_sequence!r}, "
            f"is_repeat={self.is_repeat!r})"
        )

    @property
    def data(self) -> str:
        """
        The `data` of the last key press in the sequence.
        """
        last_key_press = self.key_sequence[-1]
        return last_key_press.data

    @property
    def key_processor(self) -> KeyProcessor:
        """
        The `KeyProcessor` behind the weak reference.

        Raises `Exception` when the processor has been garbage collected.
        """
        processor = self._key_processor_ref()
        if processor is not None:
            return processor
        raise Exception("KeyProcessor was lost. This should not happen.")

    @property
    def app(self) -> Application[Any]:
        """
        The current `Application` object.
        """
        return self._app

    @property
    def current_buffer(self) -> Buffer:
        """
        The current buffer.
        """
        application = self.app
        return application.current_buffer

    @property
    def arg(self) -> int:
        """
        Repetition argument.

        A lone "-" counts as -1, a missing argument as 1, and a value of one
        million or more falls back to 1.
        """
        raw = self._arg
        if raw == "-":
            return -1

        count = int(raw or 1)

        # Don't exceed a million.
        return 1 if count >= 1000000 else count

    @property
    def arg_present(self) -> bool:
        """
        True if repetition argument was explicitly provided.
        """
        return self._arg is not None

    def append_to_arg_count(self, data: str) -> None:
        """
        Add digit to the input argument.

        The combined argument is stored on the key processor, so that it is
        available for the next key press.

        :param data: the typed digit as string
        """
        assert data in "-0123456789"
        existing = self._arg

        if data == "-":
            # A minus sign is only accepted at the very start.
            assert existing is None or existing == "-"
            updated = data
        else:
            updated = data if existing is None else f"{existing}{data}"

        self.key_processor.arg = updated

    @property
    def cli(self) -> Application[Any]:
        "For backward-compatibility."
        return self.app