1import re
2import asyncio
3import tokenize
4from io import StringIO
5from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
6import warnings
7
8import prompt_toolkit
9from prompt_toolkit.buffer import Buffer
10from prompt_toolkit.key_binding import KeyPressEvent
11from prompt_toolkit.key_binding.bindings import named_commands as nc
12from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
13from prompt_toolkit.document import Document
14from prompt_toolkit.history import History
15from prompt_toolkit.shortcuts import PromptSession
16from prompt_toolkit.layout.processors import (
17 Processor,
18 Transformation,
19 TransformationInput,
20)
21
22from IPython.core.getipython import get_ipython
23from IPython.utils.tokenutil import generate_tokens
24
25from .filters import pass_through
26
27
28def _get_query(document: Document):
29 return document.lines[document.cursor_position_row]
30
31
32class AppendAutoSuggestionInAnyLine(Processor):
33 """
34 Append the auto suggestion to lines other than the last (appending to the
35 last line is natively supported by the prompt toolkit).
36
37 This has a private `_debug` attribute that can be set to True to display
38 debug information as virtual suggestion on the end of any line. You can do
39 so with:
40
41 >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
42 >>> AppendAutoSuggestionInAnyLine._debug = True
43
44 """
45
46 _debug: ClassVar[bool] = False
47
48 def __init__(self, style: str = "class:auto-suggestion") -> None:
49 self.style = style
50
51 def apply_transformation(self, ti: TransformationInput) -> Transformation:
52 """
53 Apply transformation to the line that is currently being edited.
54
55 This is a variation of the original implementation in prompt toolkit
56 that allows to not only append suggestions to any line, but also to show
57 multi-line suggestions.
58
59 As transformation are applied on a line-by-line basis; we need to trick
60 a bit, and elide any line that is after the line we are currently
61 editing, until we run out of completions. We cannot shift the existing
62 lines
63
64 There are multiple cases to handle:
65
66 The completions ends before the end of the buffer:
67 We can resume showing the normal line, and say that some code may
68 be hidden.
69
70 The completions ends at the end of the buffer
71 We can just say that some code may be hidden.
72
73 And separately:
74
75 The completions ends beyond the end of the buffer
76 We need to both say that some code may be hidden, and that some
77 lines are not shown.
78
79 """
80 last_line_number = ti.document.line_count - 1
81 is_last_line = ti.lineno == last_line_number
82
83 noop = lambda text: Transformation(
84 fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
85 )
86 if ti.document.line_count == 1:
87 return noop("noop:oneline")
88 if ti.document.cursor_position_row == last_line_number and is_last_line:
89 # prompt toolkit already appends something; just leave it be
90 return noop("noop:last line and cursor")
91
92 # first everything before the current line is unchanged.
93 if ti.lineno < ti.document.cursor_position_row:
94 return noop("noop:before cursor")
95
96 buffer = ti.buffer_control.buffer
97 if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
98 return noop("noop:not eol")
99
100 delta = ti.lineno - ti.document.cursor_position_row
101 suggestions = buffer.suggestion.text.splitlines()
102
103 if len(suggestions) == 0:
104 return noop("noop: no suggestions")
105
106 if prompt_toolkit.VERSION < (3, 0, 49):
107 if len(suggestions) > 1 and prompt_toolkit.VERSION < (3, 0, 49):
108 if ti.lineno == ti.document.cursor_position_row:
109 return Transformation(
110 fragments=ti.fragments
111 + [
112 (
113 "red",
114 "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
115 )
116 ]
117 )
118 else:
119 return Transformation(fragments=ti.fragments)
120 elif len(suggestions) == 1:
121 if ti.lineno == ti.document.cursor_position_row:
122 return Transformation(
123 fragments=ti.fragments + [(self.style, suggestions[0])]
124 )
125 return Transformation(fragments=ti.fragments)
126
127 if delta == 0:
128 suggestion = suggestions[0]
129 return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
130 if is_last_line:
131 if delta < len(suggestions):
132 suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
133 return Transformation([(self.style, suggestion)])
134
135 n_elided = len(suggestions)
136 for i in range(len(suggestions)):
137 ll = ti.get_line(last_line_number - i)
138 el = "".join(l[1] for l in ll).strip()
139 if el:
140 break
141 else:
142 n_elided -= 1
143 if n_elided:
144 return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
145 else:
146 return Transformation(
147 ti.get_line(last_line_number - len(suggestions) + 1)
148 + ([(self.style, "shift-last-line")] if self._debug else [])
149 )
150
151 elif delta < len(suggestions):
152 suggestion = suggestions[delta]
153 return Transformation([(self.style, suggestion)])
154 else:
155 shift = ti.lineno - len(suggestions) + 1
156 return Transformation(ti.get_line(shift))
157
158
159class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
160 """
161 A subclass of AutoSuggestFromHistory that allow navigation to next/previous
162 suggestion from history. To do so it remembers the current position, but it
163 state need to carefully be cleared on the right events.
164 """
165
166 skip_lines: int
167 _connected_apps: list[PromptSession]
168
169 # handle to the currently running llm task that appends suggestions to the
170 # current buffer; we keep a handle to it in order to cancel it when there is a cursor movement, or
171 # another request.
172 _llm_task: asyncio.Task | None = None
173
174 # This is the constructor of the LLM provider from jupyter-ai
175 # to which we forward the request to generate inline completions.
176 _init_llm_provider: Callable | None
177
178 _llm_provider_instance: Any | None
179 _llm_prefixer: Callable = lambda self, x: "wrong"
180
181 def __init__(self):
182 super().__init__()
183 self.skip_lines = 0
184 self._connected_apps = []
185 self._llm_provider_instance = None
186 self._init_llm_provider = None
187 self._request_number = 0
188
189 def reset_history_position(self, _: Buffer) -> None:
190 self.skip_lines = 0
191
192 def disconnect(self) -> None:
193 self._cancel_running_llm_task()
194 for pt_app in self._connected_apps:
195 text_insert_event = pt_app.default_buffer.on_text_insert
196 text_insert_event.remove_handler(self.reset_history_position)
197
198 def connect(self, pt_app: PromptSession) -> None:
199 self._connected_apps.append(pt_app)
200 # note: `on_text_changed` could be used for a bit different behaviour
201 # on character deletion (i.e. resetting history position on backspace)
202 pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
203 pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
204
205 def get_suggestion(
206 self, buffer: Buffer, document: Document
207 ) -> Optional[Suggestion]:
208 text = _get_query(document)
209
210 if text.strip():
211 for suggestion, _ in self._find_next_match(
212 text, self.skip_lines, buffer.history
213 ):
214 return Suggestion(suggestion)
215
216 return None
217
218 def _dismiss(self, buffer, *args, **kwargs) -> None:
219 self._cancel_running_llm_task()
220 buffer.suggestion = None
221
222 def _find_match(
223 self, text: str, skip_lines: float, history: History, previous: bool
224 ) -> Generator[Tuple[str, float], None, None]:
225 """
226 text : str
227 Text content to find a match for, the user cursor is most of the
228 time at the end of this text.
229 skip_lines : float
230 number of items to skip in the search, this is used to indicate how
231 far in the list the user has navigated by pressing up or down.
232 The float type is used as the base value is +inf
233 history : History
234 prompt_toolkit History instance to fetch previous entries from.
235 previous : bool
236 Direction of the search, whether we are looking previous match
237 (True), or next match (False).
238
239 Yields
240 ------
241 Tuple with:
242 str:
243 current suggestion.
244 float:
245 will actually yield only ints, which is passed back via skip_lines,
246 which may be a +inf (float)
247
248
249 """
250 line_number = -1
251 for string in reversed(list(history.get_strings())):
252 for line in reversed(string.splitlines()):
253 line_number += 1
254 if not previous and line_number < skip_lines:
255 continue
256 # do not return empty suggestions as these
257 # close the auto-suggestion overlay (and are useless)
258 if line.startswith(text) and len(line) > len(text):
259 yield line[len(text) :], line_number
260 if previous and line_number >= skip_lines:
261 return
262
263 def _find_next_match(
264 self, text: str, skip_lines: float, history: History
265 ) -> Generator[Tuple[str, float], None, None]:
266 return self._find_match(text, skip_lines, history, previous=False)
267
268 def _find_previous_match(self, text: str, skip_lines: float, history: History):
269 return reversed(
270 list(self._find_match(text, skip_lines, history, previous=True))
271 )
272
273 def up(self, query: str, other_than: str, history: History) -> None:
274 self._cancel_running_llm_task()
275 for suggestion, line_number in self._find_next_match(
276 query, self.skip_lines, history
277 ):
278 # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
279 # we want to switch from 'very.b' to 'very.a' because a) if the
280 # suggestion equals current text, prompt-toolkit aborts suggesting
281 # b) user likely would not be interested in 'very' anyways (they
282 # already typed it).
283 if query + suggestion != other_than:
284 self.skip_lines = line_number
285 break
286 else:
287 # no matches found, cycle back to beginning
288 self.skip_lines = 0
289
290 def down(self, query: str, other_than: str, history: History) -> None:
291 self._cancel_running_llm_task()
292 for suggestion, line_number in self._find_previous_match(
293 query, self.skip_lines, history
294 ):
295 if query + suggestion != other_than:
296 self.skip_lines = line_number
297 break
298 else:
299 # no matches found, cycle to end
300 for suggestion, line_number in self._find_previous_match(
301 query, float("Inf"), history
302 ):
303 if query + suggestion != other_than:
304 self.skip_lines = line_number
305 break
306
307 def _cancel_running_llm_task(self) -> None:
308 """
309 Try to cancel the currently running llm_task if exists, and set it to None.
310 """
311 if self._llm_task is not None:
312 if self._llm_task.done():
313 self._llm_task = None
314 return
315 cancelled = self._llm_task.cancel()
316 if cancelled:
317 self._llm_task = None
318 if not cancelled:
319 warnings.warn(
320 "LLM task not cancelled, does your provider support cancellation?"
321 )
322
323 @property
324 def _llm_provider(self):
325 """Lazy-initialized instance of the LLM provider.
326
327 Do not use in the constructor, as `_init_llm_provider` can trigger slow side-effects.
328 """
329 if self._llm_provider_instance is None and self._init_llm_provider:
330 self._llm_provider_instance = self._init_llm_provider()
331 return self._llm_provider_instance
332
333 async def _trigger_llm(self, buffer) -> None:
334 """
335 This will ask the current llm provider a suggestion for the current buffer.
336
337 If there is a currently running llm task, it will cancel it.
338 """
339 # we likely want to store the current cursor position, and cancel if the cursor has moved.
340 try:
341 import jupyter_ai_magics
342 except ModuleNotFoundError:
343 jupyter_ai_magics = None
344 if not self._llm_provider:
345 warnings.warn("No LLM provider found, cannot trigger LLM completions")
346 return
347 if jupyter_ai_magics is None:
348 warnings.warn("LLM Completion requires `jupyter_ai_magics` to be installed")
349
350 self._cancel_running_llm_task()
351
352 async def error_catcher(buffer):
353 """
354 This catches and log any errors, as otherwise this is just
355 lost in the void of the future running task.
356 """
357 try:
358 await self._trigger_llm_core(buffer)
359 except Exception as e:
360 get_ipython().log.error("error %s", e)
361 raise
362
363 # here we need a cancellable task so we can't just await the error caught
364 self._llm_task = asyncio.create_task(error_catcher(buffer))
365 await self._llm_task
366
367 async def _trigger_llm_core(self, buffer: Buffer):
368 """
369 This is the core of the current llm request.
370
371 Here we build a compatible `InlineCompletionRequest` and ask the llm
372 provider to stream it's response back to us iteratively setting it as
373 the suggestion on the current buffer.
374
375 Unlike with JupyterAi, as we do not have multiple cells, the cell id
376 is always set to `None`.
377
378 We set the prefix to the current cell content, but could also insert the
379 rest of the history or even just the non-fail history.
380
381 In the same way, we do not have cell id.
382
383 LLM provider may return multiple suggestion stream, but for the time
384 being we only support one.
385
386 Here we make the assumption that the provider will have
387 stream_inline_completions, I'm not sure it is the case for all
388 providers.
389 """
390 try:
391 import jupyter_ai.completions.models as jai_models
392 except ModuleNotFoundError:
393 jai_models = None
394
395 if not jai_models:
396 raise ValueError("jupyter-ai is not installed")
397
398 if not self._llm_provider:
399 raise ValueError("No LLM provider found, cannot trigger LLM completions")
400
401 hm = buffer.history.shell.history_manager
402 prefix = self._llm_prefixer(hm)
403 get_ipython().log.debug("prefix: %s", prefix)
404
405 self._request_number += 1
406 request_number = self._request_number
407
408 request = jai_models.InlineCompletionRequest(
409 number=request_number,
410 prefix=prefix + buffer.document.text_before_cursor,
411 suffix=buffer.document.text_after_cursor,
412 mime="text/x-python",
413 stream=True,
414 path=None,
415 language="python",
416 cell_id=None,
417 )
418
419 async for reply_and_chunks in self._llm_provider.stream_inline_completions(
420 request
421 ):
422 if self._request_number != request_number:
423 # If a new suggestion was requested, skip processing this one.
424 return
425 if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
426 if len(reply_and_chunks.list.items) > 1:
427 raise ValueError(
428 "Terminal IPython cannot deal with multiple LLM suggestions at once"
429 )
430 buffer.suggestion = Suggestion(
431 reply_and_chunks.list.items[0].insertText
432 )
433 buffer.on_suggestion_set.fire()
434 elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
435 buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
436 buffer.on_suggestion_set.fire()
437 return
438
439
440async def llm_autosuggestion(event: KeyPressEvent):
441 """
442 Ask the AutoSuggester from history to delegate to ask an LLM for completion
443
444 This will first make sure that the current buffer have _MIN_LINES (7)
445 available lines to insert the LLM completion
446
447 Provisional as of 8.32, may change without warnings
448
449 """
450 _MIN_LINES = 5
451 provider = get_ipython().auto_suggest
452 if not isinstance(provider, NavigableAutoSuggestFromHistory):
453 return
454 doc = event.current_buffer.document
455 lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
456 for _ in range(lines_to_insert):
457 event.current_buffer.insert_text("\n", move_cursor=False, fire_event=False)
458
459 await provider._trigger_llm(event.current_buffer)
460
461
462def accept_or_jump_to_end(event: KeyPressEvent):
463 """Apply autosuggestion or jump to end of line."""
464 buffer = event.current_buffer
465 d = buffer.document
466 after_cursor = d.text[d.cursor_position :]
467 lines = after_cursor.split("\n")
468 end_of_current_line = lines[0].strip()
469 suggestion = buffer.suggestion
470 if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
471 buffer.insert_text(suggestion.text)
472 else:
473 nc.end_of_line(event)
474
475
476def accept(event: KeyPressEvent):
477 """Accept autosuggestion"""
478 buffer = event.current_buffer
479 suggestion = buffer.suggestion
480 if suggestion:
481 buffer.insert_text(suggestion.text)
482 else:
483 nc.forward_char(event)
484
485
486def discard(event: KeyPressEvent):
487 """Discard autosuggestion"""
488 buffer = event.current_buffer
489 buffer.suggestion = None
490
491
492def accept_word(event: KeyPressEvent):
493 """Fill partial autosuggestion by word"""
494 buffer = event.current_buffer
495 suggestion = buffer.suggestion
496 if suggestion:
497 t = re.split(r"(\S+\s+)", suggestion.text)
498 buffer.insert_text(next((x for x in t if x), ""))
499 else:
500 nc.forward_word(event)
501
502
503def accept_character(event: KeyPressEvent):
504 """Fill partial autosuggestion by character"""
505 b = event.current_buffer
506 suggestion = b.suggestion
507 if suggestion and suggestion.text:
508 b.insert_text(suggestion.text[0])
509
510
511def accept_and_keep_cursor(event: KeyPressEvent):
512 """Accept autosuggestion and keep cursor in place"""
513 buffer = event.current_buffer
514 old_position = buffer.cursor_position
515 suggestion = buffer.suggestion
516 if suggestion:
517 buffer.insert_text(suggestion.text)
518 buffer.cursor_position = old_position
519
520
521def accept_and_move_cursor_left(event: KeyPressEvent):
522 """Accept autosuggestion and move cursor left in place"""
523 accept_and_keep_cursor(event)
524 nc.backward_char(event)
525
526
527def _update_hint(buffer: Buffer):
528 if buffer.auto_suggest:
529 suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
530 buffer.suggestion = suggestion
531
532
533def backspace_and_resume_hint(event: KeyPressEvent):
534 """Resume autosuggestions after deleting last character"""
535 nc.backward_delete_char(event)
536 _update_hint(event.current_buffer)
537
538
539def resume_hinting(event: KeyPressEvent):
540 """Resume autosuggestions"""
541 pass_through.reply(event)
542 # Order matters: if update happened first and event reply second, the
543 # suggestion would be auto-accepted if both actions are bound to same key.
544 _update_hint(event.current_buffer)
545
546
547def up_and_update_hint(event: KeyPressEvent):
548 """Go up and update hint"""
549 current_buffer = event.current_buffer
550
551 current_buffer.auto_up(count=event.arg)
552 _update_hint(current_buffer)
553
554
555def down_and_update_hint(event: KeyPressEvent):
556 """Go down and update hint"""
557 current_buffer = event.current_buffer
558
559 current_buffer.auto_down(count=event.arg)
560 _update_hint(current_buffer)
561
562
563def accept_token(event: KeyPressEvent):
564 """Fill partial autosuggestion by token"""
565 b = event.current_buffer
566 suggestion = b.suggestion
567
568 if suggestion:
569 prefix = _get_query(b.document)
570 text = prefix + suggestion.text
571
572 tokens: List[Optional[str]] = [None, None, None]
573 substrings = [""]
574 i = 0
575
576 for token in generate_tokens(StringIO(text).readline):
577 if token.type == tokenize.NEWLINE:
578 index = len(text)
579 else:
580 index = text.index(token[1], len(substrings[-1]))
581 substrings.append(text[:index])
582 tokenized_so_far = substrings[-1]
583 if tokenized_so_far.startswith(prefix):
584 if i == 0 and len(tokenized_so_far) > len(prefix):
585 tokens[0] = tokenized_so_far[len(prefix) :]
586 substrings.append(tokenized_so_far)
587 i += 1
588 tokens[i] = token[1]
589 if i == 2:
590 break
591 i += 1
592
593 if tokens[0]:
594 to_insert: str
595 insert_text = substrings[-2]
596 if tokens[1] and len(tokens[1]) == 1:
597 insert_text = substrings[-1]
598 to_insert = insert_text[len(prefix) :]
599 b.insert_text(to_insert)
600 return
601
602 nc.forward_word(event)
603
604
605Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
606
607
608def _swap_autosuggestion(
609 buffer: Buffer,
610 provider: NavigableAutoSuggestFromHistory,
611 direction_method: Callable,
612):
613 """
614 We skip most recent history entry (in either direction) if it equals the
615 current autosuggestion because if user cycles when auto-suggestion is shown
616 they most likely want something else than what was suggested (otherwise
617 they would have accepted the suggestion).
618 """
619 suggestion = buffer.suggestion
620 if not suggestion:
621 return
622
623 query = _get_query(buffer.document)
624 current = query + suggestion.text
625
626 direction_method(query=query, other_than=current, history=buffer.history)
627
628 new_suggestion = provider.get_suggestion(buffer, buffer.document)
629 buffer.suggestion = new_suggestion
630
631
632def swap_autosuggestion_up(event: KeyPressEvent):
633 """Get next autosuggestion from history."""
634 shell = get_ipython()
635 provider = shell.auto_suggest
636
637 if not isinstance(provider, NavigableAutoSuggestFromHistory):
638 return
639
640 return _swap_autosuggestion(
641 buffer=event.current_buffer, provider=provider, direction_method=provider.up
642 )
643
644
645def swap_autosuggestion_down(event: KeyPressEvent):
646 """Get previous autosuggestion from history."""
647 shell = get_ipython()
648 provider = shell.auto_suggest
649
650 if not isinstance(provider, NavigableAutoSuggestFromHistory):
651 return
652
653 return _swap_autosuggestion(
654 buffer=event.current_buffer,
655 provider=provider,
656 direction_method=provider.down,
657 )