1import re
2import asyncio
3import tokenize
4from io import StringIO
5from typing import List, Optional, Union, Tuple, ClassVar, Any
6from collections.abc import Callable, Generator
7import warnings
8
9import prompt_toolkit
10from prompt_toolkit.buffer import Buffer
11from prompt_toolkit.key_binding import KeyPressEvent
12from prompt_toolkit.key_binding.bindings import named_commands as nc
13from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
14from prompt_toolkit.document import Document
15from prompt_toolkit.history import History
16from prompt_toolkit.shortcuts import PromptSession
17from prompt_toolkit.layout.processors import (
18 Processor,
19 Transformation,
20 TransformationInput,
21)
22
23from IPython.core.getipython import get_ipython
24from IPython.utils.tokenutil import generate_tokens
25
26from .filters import pass_through
27
28
def _get_query(document: Document):
    """Return the text of the line the cursor is currently on."""
    row = document.cursor_position_row
    return document.lines[row]
31
32
class AppendAutoSuggestionInAnyLine(Processor):
    """
    Append the auto suggestion to lines other than the last (appending to the
    last line is natively supported by the prompt toolkit).

    This has a private `_debug` attribute that can be set to True to display
    debug information as virtual suggestion on the end of any line. You can do
    so with:

        >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
        >>> AppendAutoSuggestionInAnyLine._debug = True

    """

    _debug: ClassVar[bool] = False

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        # Style string applied to every fragment this processor adds.
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Apply transformation to the line that is currently being rendered.

        This is a variation of the original implementation in prompt toolkit
        that allows not only appending suggestions to any line, but also
        showing multi-line suggestions.

        As transformations are applied on a line-by-line basis, we need to
        trick a bit, and elide any line that is after the line we are currently
        editing, until we run out of completions. We cannot shift the existing
        lines.

        There are multiple cases to handle:

        The completions ends before the end of the buffer:
            We can resume showing the normal line, and say that some code may
            be hidden.

        The completions ends at the end of the buffer
            We can just say that some code may be hidden.

        And separately:

        The completions ends beyond the end of the buffer
            We need to both say that some code may be hidden, and that some
            lines are not shown.

        Parameters
        ----------
        ti : TransformationInput
            The line being rendered (``ti.lineno``, ``ti.fragments``) together
            with the document and buffer control it belongs to.

        Returns
        -------
        Transformation
            The fragments to display for line ``ti.lineno``.
        """
        last_line_number = ti.document.line_count - 1
        is_last_line = ti.lineno == last_line_number

        def noop(text: str) -> Transformation:
            # Leave the line as it is; the extra fragment is empty unless
            # `_debug` is set, in which case it names the branch taken.
            return Transformation(
                fragments=ti.fragments
                + [(self.style, " " + text if self._debug else "")]
            )

        if ti.document.line_count == 1:
            return noop("noop:oneline")
        if ti.document.cursor_position_row == last_line_number and is_last_line:
            # prompt toolkit already appends something; just leave it be
            return noop("noop:last line and cursor")

        # first everything before the current line is unchanged.
        if ti.lineno < ti.document.cursor_position_row:
            return noop("noop:before cursor")

        buffer = ti.buffer_control.buffer
        if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
            return noop("noop:not eol")

        # Number of lines between the cursor line and the line being rendered;
        # it is >= 0 here because earlier lines were handled above.
        delta = ti.lineno - ti.document.cursor_position_row
        suggestions = buffer.suggestion.text.splitlines()

        if len(suggestions) == 0:
            return noop("noop: no suggestions")

        if prompt_toolkit.VERSION < (3, 0, 49):
            # Older prompt_toolkit: only single-line suggestions are shown,
            # and only on the cursor line; every other line is left untouched.
            if len(suggestions) > 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments
                        + [
                            (
                                "red",
                                "(Cannot show multiline suggestion; requires prompt_toolkit >= 3.0.49)",
                            )
                        ]
                    )
                else:
                    return Transformation(fragments=ti.fragments)
            elif len(suggestions) == 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments + [(self.style, suggestions[0])]
                    )
                return Transformation(fragments=ti.fragments)

        if delta == 0:
            # Cursor line: keep the typed text and append the first
            # suggestion line after it.
            return Transformation(
                fragments=ti.fragments + [(self.style, suggestions[0])]
            )

        if is_last_line:
            if delta < len(suggestions):
                # The suggestion does not fit before the end of the buffer.
                remaining = len(suggestions) - delta
                suggestion = (
                    f"… rest of suggestion ({remaining} lines) and code hidden"
                )
                return Transformation([(self.style, suggestion)])

            # Walk up from the bottom of the buffer over at most
            # len(suggestions) lines, decrementing for each blank line and
            # stopping at the first line that has non-whitespace content.
            n_elided = len(suggestions)
            for i in range(len(suggestions)):
                line_fragments = ti.get_line(last_line_number - i)
                line_text = "".join(
                    fragment[1] for fragment in line_fragments
                ).strip()
                if line_text:
                    break
                n_elided -= 1
            if n_elided:
                return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
            return Transformation(
                ti.get_line(last_line_number - len(suggestions) + 1)
                + ([(self.style, "shift-last-line")] if self._debug else [])
            )

        if delta < len(suggestions):
            # This line is replaced by the matching line of the suggestion.
            return Transformation([(self.style, suggestions[delta])])

        # Past the suggestion: show the buffer line found
        # len(suggestions) - 1 rows above the one being rendered.
        shift = ti.lineno - len(suggestions) + 1
        return Transformation(ti.get_line(shift))
158
159
class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
    """
    A subclass of AutoSuggestFromHistory that allows navigation to the
    next/previous suggestion from history. To do so it remembers the current
    position, but its state needs to be carefully cleared on the right events.
    """

    # Number of history lines skipped when looking for a match; set by
    # `up`/`down` and reset to 0 by `reset_history_position`.
    skip_lines: int
    # Prompt sessions whose default buffer currently has our handlers attached.
    _connected_apps: list[PromptSession]

    # handle to the currently running llm task that appends suggestions to the
    # current buffer; we keep a handle to it in order to cancel it when there is a cursor movement, or
    # another request.
    _llm_task: asyncio.Task | None = None

    # This is the constructor of the LLM provider from jupyter-ai
    # to which we forward the request to generate inline completions.
    _init_llm_provider: Callable | None

    _llm_provider_instance: Any | None
    # Called with the history manager to build the text prepended to the LLM
    # request prefix; this default is a placeholder returning "wrong".
    _llm_prefixer: Callable = lambda self, x: "wrong"

    def __init__(self):
        super().__init__()
        self.skip_lines = 0
        self._connected_apps = []
        self._llm_provider_instance = None
        self._init_llm_provider = None
        # Incremented for every LLM request; lets a streaming request notice
        # that a newer one has been started.
        self._request_number = 0

    def reset_history_position(self, _: Buffer) -> None:
        """Go back to the most recent history match (handler for text insertion)."""
        self.skip_lines = 0

    def disconnect(self) -> None:
        """
        Cancel any running LLM task and detach from every connected session.

        Removes both handlers installed by `connect`, then forgets the
        sessions so that a later `disconnect` or `connect` does not act on
        stale entries.
        """
        self._cancel_running_llm_task()
        for pt_app in self._connected_apps:
            default_buffer = pt_app.default_buffer
            default_buffer.on_text_insert.remove_handler(self.reset_history_position)
            # `connect` also registers `_dismiss`; leaving it attached would
            # keep clearing suggestions on every cursor move after disconnect.
            default_buffer.on_cursor_position_changed.remove_handler(self._dismiss)
        self._connected_apps.clear()

    def connect(self, pt_app: PromptSession) -> None:
        """Attach the history-reset and dismiss handlers to ``pt_app``'s default buffer."""
        self._connected_apps.append(pt_app)
        # note: `on_text_changed` could be used for a bit different behaviour
        # on character deletion (i.e. resetting history position on backspace)
        pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
        pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)

    def get_suggestion(
        self, buffer: Buffer, document: Document
    ) -> Optional[Suggestion]:
        """
        Return the first history match for the cursor line, or None.

        The search starts ``self.skip_lines`` lines into the history; a line
        that is empty or only whitespace never gets a suggestion.
        """
        text = _get_query(document)

        if text.strip():
            for suggestion, _ in self._find_next_match(
                text, self.skip_lines, buffer.history
            ):
                return Suggestion(suggestion)

        return None

    def _dismiss(self, buffer, *args, **kwargs) -> None:
        """Cancel the running LLM task and clear the suggestion on ``buffer``."""
        self._cancel_running_llm_task()
        buffer.suggestion = None

    def _find_match(
        self, text: str, skip_lines: float, history: History, previous: bool
    ) -> Generator[Tuple[str, float], None, None]:
        """
        text : str
            Text content to find a match for, the user cursor is most of the
            time at the end of this text.
        skip_lines : float
            number of items to skip in the search, this is used to indicate how
            far in the list the user has navigated by pressing up or down.
            The float type is used as the base value is +inf
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search, whether we are looking previous match
            (True), or next match (False).

        Yields
        ------
        Tuple with:
        str:
            current suggestion.
        float:
            will actually yield only ints, which is passed back via skip_lines,
            which may be a +inf (float)


        """
        # Lines are numbered from 0 in the order they are visited: history
        # strings reversed, and the lines of each string reversed.
        line_number = -1
        for string in reversed(list(history.get_strings())):
            for line in reversed(string.splitlines()):
                line_number += 1
                if not previous and line_number < skip_lines:
                    continue
                # do not return empty suggestions as these
                # close the auto-suggestion overlay (and are useless)
                if line.startswith(text) and len(line) > len(text):
                    yield line[len(text) :], line_number
                if previous and line_number >= skip_lines:
                    return

    def _find_next_match(
        self, text: str, skip_lines: float, history: History
    ) -> Generator[Tuple[str, float], None, None]:
        """Yield matches located at or after line ``skip_lines``."""
        return self._find_match(text, skip_lines, history, previous=False)

    def _find_previous_match(self, text: str, skip_lines: float, history: History):
        """Return matches up to line ``skip_lines``, in reverse order of discovery."""
        return reversed(
            list(self._find_match(text, skip_lines, history, previous=True))
        )

    def up(self, query: str, other_than: str, history: History) -> None:
        """
        Move `skip_lines` to the next match that differs from ``other_than``.

        Resets `skip_lines` to 0 when no such match exists.
        """
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_next_match(
            query, self.skip_lines, history
        ):
            # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
            # we want to switch from 'very.b' to 'very.a' because a) if the
            # suggestion equals current text, prompt-toolkit aborts suggesting
            # b) user likely would not be interested in 'very' anyways (they
            # already typed it).
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle back to beginning
            self.skip_lines = 0

    def down(self, query: str, other_than: str, history: History) -> None:
        """
        Move `skip_lines` to the previous match that differs from ``other_than``.

        When there is none, the search restarts from the far end of the
        history; `skip_lines` is left untouched if that finds nothing either.
        """
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_previous_match(
            query, self.skip_lines, history
        ):
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle to end
            for suggestion, line_number in self._find_previous_match(
                query, float("Inf"), history
            ):
                if query + suggestion != other_than:
                    self.skip_lines = line_number
                    break

    def _cancel_running_llm_task(self) -> None:
        """
        Try to cancel the currently running llm_task if exists, and set it to None.

        The handle is kept (and a warning emitted) when the task reports that
        the cancellation request was not accepted.
        """
        task = self._llm_task
        if task is None:
            return
        if task.done():
            self._llm_task = None
            return
        if task.cancel():
            self._llm_task = None
        else:
            warnings.warn(
                "LLM task not cancelled, does your provider support cancellation?"
            )

    @property
    def _llm_provider(self):
        """Lazy-initialized instance of the LLM provider.

        Do not use in the constructor, as `_init_llm_provider` can trigger slow side-effects.
        """
        if self._llm_provider_instance is None and self._init_llm_provider:
            self._llm_provider_instance = self._init_llm_provider()
        return self._llm_provider_instance

    async def _trigger_llm(self, buffer) -> None:
        """
        This will ask the current llm provider a suggestion for the current buffer.

        If there is a currently running llm task, it will cancel it.
        """
        # we likely want to store the current cursor position, and cancel if the cursor has moved.
        try:
            import jupyter_ai_magics
        except ModuleNotFoundError:
            jupyter_ai_magics = None
        if not self._llm_provider:
            warnings.warn("No LLM provider found, cannot trigger LLM completions")
            return
        if jupyter_ai_magics is None:
            # NOTE(review): only warns and keeps going; confirm whether an
            # early return was intended here.
            warnings.warn("LLM Completion requires `jupyter_ai_magics` to be installed")

        self._cancel_running_llm_task()

        async def error_catcher(buffer):
            """
            This catches and log any errors, as otherwise this is just
            lost in the void of the future running task.
            """
            try:
                await self._trigger_llm_core(buffer)
            except Exception as e:
                get_ipython().log.error("error %s", e)
                raise

        # here we need a cancellable task so we can't just await the error caught
        self._llm_task = asyncio.create_task(error_catcher(buffer))
        await self._llm_task

    async def _trigger_llm_core(self, buffer: Buffer):
        """
        This is the core of the current llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream its response back to us iteratively setting it as
        the suggestion on the current buffer.

        Unlike with JupyterAi, as we do not have multiple cells, the cell id
        is always set to `None`.

        We set the prefix to the current cell content, but could also insert the
        rest of the history or even just the non-fail history.

        LLM provider may return multiple suggestion stream, but for the time
        being we only support one.

        Here we make the assumption that the provider will have
        stream_inline_completions, I'm not sure it is the case for all
        providers.

        Raises
        ------
        ValueError
            If jupyter-ai is not installed, if there is no LLM provider, or if
            a reply carries more than one suggestion.
        """
        try:
            import jupyter_ai.completions.models as jai_models
        except ModuleNotFoundError:
            jai_models = None

        if not jai_models:
            raise ValueError("jupyter-ai is not installed")

        if not self._llm_provider:
            raise ValueError("No LLM provider found, cannot trigger LLM completions")

        hm = buffer.history.shell.history_manager
        prefix = self._llm_prefixer(hm)
        get_ipython().log.debug("prefix: %s", prefix)

        self._request_number += 1
        request_number = self._request_number

        request = jai_models.InlineCompletionRequest(
            number=request_number,
            prefix=prefix + buffer.document.text_before_cursor,
            suffix=buffer.document.text_after_cursor,
            mime="text/x-python",
            stream=True,
            path=None,
            language="python",
            cell_id=None,
        )

        async for reply_and_chunks in self._llm_provider.stream_inline_completions(
            request
        ):
            if self._request_number != request_number:
                # If a new suggestion was requested, skip processing this one.
                return
            if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
                if len(reply_and_chunks.list.items) > 1:
                    raise ValueError(
                        "Terminal IPython cannot deal with multiple LLM suggestions at once"
                    )
                buffer.suggestion = Suggestion(
                    reply_and_chunks.list.items[0].insertText
                )
                buffer.on_suggestion_set.fire()
            elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
                buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
                buffer.on_suggestion_set.fire()
        return
439
440
async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the history auto-suggester to delegate to an LLM for a completion.

    Before triggering the request, newlines are inserted at the cursor
    (without moving it and without firing the insert event) until the buffer
    has at least `_MIN_LINES` (5) lines counting from the cursor line, so
    that there is room to display the LLM completion.

    Does nothing when the active auto-suggest provider is not a
    `NavigableAutoSuggestFromHistory`.

    Provisional as of 8.32, may change without warnings

    """
    _MIN_LINES = 5
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    target = event.current_buffer
    doc = target.document
    # Lines from the cursor row to the end are `line_count - cursor_position_row`.
    missing = _MIN_LINES - doc.line_count + doc.cursor_position_row
    for _ in range(max(0, missing)):
        target.insert_text("\n", move_cursor=False, fire_event=False)

    await provider._trigger_llm(target)
461
462
def accept_or_jump_to_end(event: KeyPressEvent):
    """Apply autosuggestion or jump to end of line.

    The suggestion is inserted only when it has non-empty text and nothing
    but whitespace sits between the cursor and the end of its line; in every
    other case the cursor is moved to the end of the line.
    """
    buffer = event.current_buffer
    document = buffer.document
    # Text from the cursor up to (not including) the next newline.
    rest_of_line, _, _ = document.text[document.cursor_position :].partition("\n")
    suggestion = buffer.suggestion

    has_suggestion = suggestion is not None and bool(suggestion.text)
    if has_suggestion and not rest_of_line.strip():
        buffer.insert_text(suggestion.text)
        return
    nc.end_of_line(event)
475
476
def accept(event: KeyPressEvent):
    """Accept autosuggestion, or move one character forward when there is none."""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if not suggestion:
        nc.forward_char(event)
        return
    buffer.insert_text(suggestion.text)
485
486
def discard(event: KeyPressEvent):
    """Discard autosuggestion by clearing it on the current buffer."""
    event.current_buffer.suggestion = None
491
492
def accept_word(event: KeyPressEvent):
    """Fill partial autosuggestion by word

    Inserts the first non-empty piece of the suggestion once it is split on
    "word followed by whitespace" groups; without a suggestion, moves the
    cursor one word forward instead.
    """
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if not suggestion:
        nc.forward_word(event)
        return

    # The capturing group makes re.split keep the matched words in the result.
    pieces = re.split(r"(\S+\s+)", suggestion.text)
    first_piece = ""
    for piece in pieces:
        if piece:
            first_piece = piece
            break
    buffer.insert_text(first_piece)
502
503
def accept_character(event: KeyPressEvent):
    """Fill partial autosuggestion by character

    Inserts the first character of the suggestion; does nothing when there
    is no suggestion or its text is empty.
    """
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if not suggestion or not suggestion.text:
        return
    buffer.insert_text(suggestion.text[0])
510
511
def accept_and_keep_cursor(event: KeyPressEvent):
    """Accept autosuggestion and keep cursor in place

    The suggestion text is inserted and the cursor is then put back to the
    position it had before the insertion.
    """
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if not suggestion:
        return
    saved_position = buffer.cursor_position
    buffer.insert_text(suggestion.text)
    buffer.cursor_position = saved_position
520
521
def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move cursor left in place

    Runs ``accept_and_keep_cursor`` on the event, then ``nc.backward_char``.
    """
    # Accept first: the backward move is applied after the cursor has been
    # restored by `accept_and_keep_cursor`.
    accept_and_keep_cursor(event)
    nc.backward_char(event)
526
527
def _update_hint(buffer: Buffer):
    """Recompute the suggestion of ``buffer`` from its auto-suggest provider.

    Leaves the buffer untouched when it has no (truthy) ``auto_suggest``.
    """
    provider = buffer.auto_suggest
    if not provider:
        return
    buffer.suggestion = provider.get_suggestion(buffer, buffer.document)
532
533
def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting last character

    Runs ``nc.backward_delete_char`` and then refreshes the suggestion of the
    current buffer through ``_update_hint``.
    """
    # Delete first so the refreshed hint is computed on the shortened text.
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)
538
539
def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions

    Replies the event through ``pass_through`` and then refreshes the
    suggestion of the current buffer.
    """
    pass_through.reply(event)
    # Order matters: if update happened first and event reply second, the
    # suggestion would be auto-accepted if both actions are bound to same key.
    _update_hint(event.current_buffer)
546
547
def up_and_update_hint(event: KeyPressEvent):
    """Go up and update hint

    Calls ``auto_up`` on the current buffer with the event's numeric
    argument as count, then recomputes the suggestion.
    """
    buffer = event.current_buffer
    buffer.auto_up(count=event.arg)
    _update_hint(buffer)
554
555
def down_and_update_hint(event: KeyPressEvent):
    """Go down and update hint

    Calls ``auto_down`` on the current buffer with the event's numeric
    argument as count, then recomputes the suggestion.
    """
    buffer = event.current_buffer
    buffer.auto_down(count=event.arg)
    _update_hint(buffer)
562
563
def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token

    The cursor line and the suggestion are concatenated and tokenized with
    ``generate_tokens``; the leading part of the suggestion, cut at a token
    boundary, is inserted. When there is no suggestion, or no leading token
    was found, the cursor is moved one word forward instead.
    """
    b = event.current_buffer
    suggestion = b.suggestion

    if suggestion:
        # Tokenize the line as it would read with the suggestion accepted.
        prefix = _get_query(b.document)
        text = prefix + suggestion.text

        # Up to three pieces recorded once tokenization has reached the end
        # of `prefix`; only tokens[0] and tokens[1] are read afterwards.
        tokens: List[Optional[str]] = [None, None, None]
        # Growing list of leading slices of `text`, each one ending where a
        # token starts (or at the end of `text` for a NEWLINE token).
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                # Locate the token string, searching from the end of the
                # previously recorded slice.
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    # The current token starts past the end of `prefix`:
                    # whatever lies in between becomes the first piece.
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            # When the second piece is a single character, use the last
            # recorded slice instead of the one before it.
            if tokens[1] and len(tokens[1]) == 1:
                insert_text = substrings[-1]
            # Only the part after what is already typed gets inserted.
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)
604
605
# Type alias: either of the two history-based auto-suggest classes, or None.
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
607
608
def _swap_autosuggestion(
    buffer: Buffer,
    provider: NavigableAutoSuggestFromHistory,
    direction_method: Callable,
):
    """
    Replace the suggestion shown on ``buffer`` with another one from history.

    ``direction_method`` is called with the cursor line as ``query``, the
    currently displayed line (query plus suggestion) as ``other_than`` and
    the buffer history; ``provider`` is then asked for a fresh suggestion.

    We skip most recent history entry (in either direction) if it equals the
    current autosuggestion because if user cycles when auto-suggestion is shown
    they most likely want something else than what was suggested (otherwise
    they would have accepted the suggestion).

    Nothing happens when the buffer has no suggestion.
    """
    shown = buffer.suggestion
    if not shown:
        return

    query = _get_query(buffer.document)
    direction_method(
        query=query,
        other_than=query + shown.text,
        history=buffer.history,
    )

    buffer.suggestion = provider.get_suggestion(buffer, buffer.document)
631
632
def swap_autosuggestion_up(event: KeyPressEvent):
    """Get next autosuggestion from history.

    Does nothing unless the shell's auto-suggest provider is a
    `NavigableAutoSuggestFromHistory`.
    """
    provider = get_ipython().auto_suggest
    if isinstance(provider, NavigableAutoSuggestFromHistory):
        return _swap_autosuggestion(
            buffer=event.current_buffer,
            provider=provider,
            direction_method=provider.up,
        )
    return None
644
645
def swap_autosuggestion_down(event: KeyPressEvent):
    """Get previous autosuggestion from history.

    Does nothing unless the shell's auto-suggest provider is a
    `NavigableAutoSuggestFromHistory`.
    """
    provider = get_ipython().auto_suggest
    if isinstance(provider, NavigableAutoSuggestFromHistory):
        return _swap_autosuggestion(
            buffer=event.current_buffer,
            provider=provider,
            direction_method=provider.down,
        )
    return None