import re
import asyncio
import tokenize
from io import StringIO
from typing import List, Optional, Union, Tuple, ClassVar, Any
from collections.abc import Callable, Generator
import warnings

import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
    Processor,
    Transformation,
    TransformationInput,
)

from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens

from .filters import pass_through


def _get_query(document: Document):
    # the "query" is the line the cursor is on; history suggestions complete
    # this line rather than the whole (possibly multi-line) buffer
    return document.lines[document.cursor_position_row]

class AppendAutoSuggestionInAnyLine(Processor):
    """
    Append the auto-suggestion to lines other than the last (appending to the
    last line is natively supported by prompt_toolkit).

    This has a private `_debug` class attribute that can be set to True to
    display debug information as a virtual suggestion at the end of any line.
    You can do so with:

    >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
    >>> AppendAutoSuggestionInAnyLine._debug = True

    """

    _debug: ClassVar[bool] = False

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Apply the transformation to the line that is currently being edited.

        This is a variation of the original implementation in prompt_toolkit
        that allows not only appending suggestions to any line, but also
        showing multi-line suggestions.

        As transformations are applied on a line-by-line basis, we need to
        trick a bit and elide any line that is after the line we are
        currently editing, until we run out of completions; we cannot shift
        the existing lines.

        There are multiple cases to handle:

        The completion ends before the end of the buffer:
            we can resume showing the normal lines, and say that some code
            may be hidden.

        The completion ends at the end of the buffer:
            we can just say that some code may be hidden.

        And separately:

        The completion ends beyond the end of the buffer:
            we need to both say that some code may be hidden, and that some
            lines are not shown.
        """
        last_line_number = ti.document.line_count - 1
        is_last_line = ti.lineno == last_line_number

        noop = lambda text: Transformation(
            fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
        )
        if ti.document.line_count == 1:
            return noop("noop:oneline")
        if ti.document.cursor_position_row == last_line_number and is_last_line:
            # prompt_toolkit already appends the suggestion on the last line;
            # just leave it be
            return noop("noop:last line and cursor")

        # first, everything before the current line is unchanged
        if ti.lineno < ti.document.cursor_position_row:
            return noop("noop:before cursor")

        buffer = ti.buffer_control.buffer
        if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
            return noop("noop:not eol")

        # `delta` counts how many lines below the cursor line this line is;
        # line number `delta` of the multi-line suggestion is rendered here
        delta = ti.lineno - ti.document.cursor_position_row
        suggestions = buffer.suggestion.text.splitlines()

        if len(suggestions) == 0:
            return noop("noop: no suggestions")

        if prompt_toolkit.VERSION < (3, 0, 49):
            if len(suggestions) > 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments
                        + [
                            (
                                "red",
                                "(Cannot show multiline suggestion; requires prompt_toolkit >= 3.0.49)",
                            )
                        ]
                    )
                else:
                    return Transformation(fragments=ti.fragments)
            elif len(suggestions) == 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments + [(self.style, suggestions[0])]
                    )
                return Transformation(fragments=ti.fragments)

        if delta == 0:
            suggestion = suggestions[0]
            return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
        if is_last_line:
            if delta < len(suggestions):
                suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
                return Transformation([(self.style, suggestion)])

            # do not count trailing empty buffer lines as hidden code
            n_elided = len(suggestions)
            for i in range(len(suggestions)):
                ll = ti.get_line(last_line_number - i)
                el = "".join(l[1] for l in ll).strip()
                if el:
                    break
                else:
                    n_elided -= 1
            if n_elided:
                return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
            else:
                return Transformation(
                    ti.get_line(last_line_number - len(suggestions) + 1)
                    + ([(self.style, "shift-last-line")] if self._debug else [])
                )

        elif delta < len(suggestions):
            suggestion = suggestions[delta]
            return Transformation([(self.style, suggestion)])
        else:
            # past the end of the suggestion: show the buffer line that was
            # elided above, shifted up by the length of the suggestion
            shift = ti.lineno - len(suggestions) + 1
            return Transformation(ti.get_line(shift))

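# A minimal usage sketch (not part of this module; `PromptSession` and
# `AutoSuggestFromHistory` are standard prompt_toolkit API): the processor
# only has an effect when the attached buffer has an auto-suggest source.
#
#     from prompt_toolkit.shortcuts import PromptSession
#     from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
#
#     session = PromptSession(
#         auto_suggest=AutoSuggestFromHistory(),
#         input_processors=[AppendAutoSuggestionInAnyLine()],
#         multiline=True,
#     )
#     session.prompt("> ")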

class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
    """
    A subclass of AutoSuggestFromHistory that allows navigating to the
    next/previous suggestion from history. To do so it remembers the current
    position, and its state needs to be carefully cleared on the right
    events.
    """

    skip_lines: int
    _connected_apps: list[PromptSession]

    # handle to the currently running llm task that appends suggestions to
    # the current buffer; we keep it so that we can cancel it on cursor
    # movement, or when another request comes in
    _llm_task: asyncio.Task | None = None

    # constructor of the LLM provider from jupyter-ai, to which we forward
    # requests to generate inline completions
    _init_llm_provider: Callable | None

    _llm_provider_instance: Any | None
    # placeholder prefixer returning a sentinel; presumably meant to be
    # replaced externally (e.g. by configuration) with a callable building a
    # prompt prefix from the history manager
    _llm_prefixer: Callable = lambda self, x: "wrong"

    def __init__(self):
        super().__init__()
        self.skip_lines = 0
        self._connected_apps = []
        self._llm_provider_instance = None
        self._init_llm_provider = None
        self._request_number = 0

    def reset_history_position(self, _: Buffer) -> None:
        self.skip_lines = 0

    def disconnect(self) -> None:
        self._cancel_running_llm_task()
        for pt_app in self._connected_apps:
            text_insert_event = pt_app.default_buffer.on_text_insert
            text_insert_event.remove_handler(self.reset_history_position)

    def connect(self, pt_app: PromptSession) -> None:
        self._connected_apps.append(pt_app)
        # note: `on_text_changed` could be used for a bit different behaviour
        # on character deletion (i.e. resetting history position on backspace)
        pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
        pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)

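    # A minimal wiring sketch (hypothetical; terminal IPython performs the
    # equivalent internally when building its prompt application):
    #
    #     provider = NavigableAutoSuggestFromHistory()
    #     session = PromptSession(auto_suggest=provider)
    #     provider.connect(session)
    #     ...
    #     provider.disconnect()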

    def get_suggestion(
        self, buffer: Buffer, document: Document
    ) -> Optional[Suggestion]:
        text = _get_query(document)

        if text.strip():
            for suggestion, _ in self._find_next_match(
                text, self.skip_lines, buffer.history
            ):
                return Suggestion(suggestion)

        return None

    def _dismiss(self, buffer, *args, **kwargs) -> None:
        self._cancel_running_llm_task()
        buffer.suggestion = None

    def _find_match(
        self, text: str, skip_lines: float, history: History, previous: bool
    ) -> Generator[Tuple[str, float], None, None]:
        """
        Parameters
        ----------
        text : str
            Text content to find a match for; the user's cursor is most of
            the time at the end of this text.
        skip_lines : float
            Number of items to skip in the search; this indicates how far in
            the list the user has navigated by pressing up or down. The
            float type is used because the base value is +inf.
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search: whether we are looking for the previous
            match (True) or the next match (False).

        Yields
        ------
        Tuple with:
        str:
            current suggestion.
        float:
            will in practice only be ints; the value is passed back via
            skip_lines, which may be +inf (float).
        """

        line_number = -1
        for string in reversed(list(history.get_strings())):
            for line in reversed(string.splitlines()):
                line_number += 1
                if not previous and line_number < skip_lines:
                    continue
                # do not return empty suggestions as these
                # close the auto-suggestion overlay (and are useless)
                if line.startswith(text) and len(line) > len(text):
                    yield line[len(text) :], line_number
                if previous and line_number >= skip_lines:
                    return

    def _find_next_match(
        self, text: str, skip_lines: float, history: History
    ) -> Generator[Tuple[str, float], None, None]:
        return self._find_match(text, skip_lines, history, previous=False)

    def _find_previous_match(self, text: str, skip_lines: float, history: History):
        return reversed(
            list(self._find_match(text, skip_lines, history, previous=True))
        )

    def up(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_next_match(
            query, self.skip_lines, history
        ):
            # if the user has history ['very.a', 'very', 'very.b'] and typed
            # 'very', we want to switch from 'very.b' to 'very.a' because
            # a) if the suggestion equals the current text, prompt_toolkit
            # aborts suggesting; and b) the user is likely not interested in
            # 'very' anyway (they already typed it)
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle back to beginning
            self.skip_lines = 0

    def down(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_previous_match(
            query, self.skip_lines, history
        ):
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle to end
            for suggestion, line_number in self._find_previous_match(
                query, float("Inf"), history
            ):
                if query + suggestion != other_than:
                    self.skip_lines = line_number
                    break
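    # Worked example: with history entries "very.a", "very", "very.b" (most
    # recent last) and the user having typed "very" (current suggestion
    # ".b"), up(query="very", other_than="very.b", ...) skips the ".b"
    # candidate (it equals `other_than`) and "very" itself (not longer than
    # the query), landing on "very.a".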

    def _cancel_running_llm_task(self) -> None:
        """
        Try to cancel the currently running llm_task, if any, and set it to
        None.
        """
        if self._llm_task is not None:
            if self._llm_task.done():
                self._llm_task = None
                return
            cancelled = self._llm_task.cancel()
            if cancelled:
                self._llm_task = None
            else:
                warnings.warn(
                    "LLM task not cancelled, does your provider support cancellation?"
                )

    @property
    def _llm_provider(self):
        """Lazily initialized instance of the LLM provider.

        Do not use in the constructor, as `_init_llm_provider` can trigger
        slow side effects.
        """
        if self._llm_provider_instance is None and self._init_llm_provider:
            self._llm_provider_instance = self._init_llm_provider()
        return self._llm_provider_instance
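    # A hedged sketch of how the provider hook might be wired (hypothetical
    # names; the real wiring is done by IPython / jupyter-ai configuration):
    #
    #     from functools import partial
    #     suggester = NavigableAutoSuggestFromHistory()
    #     suggester._init_llm_provider = partial(SomeJupyterAiProvider)
    #     suggester._llm_prefixer = lambda hm: ""  # history-based prompt prefix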

    async def _trigger_llm(self, buffer) -> None:
        """
        Ask the current LLM provider for a suggestion for the current buffer.

        If there is a currently running llm task, cancel it first.
        """
        # we likely want to store the current cursor position, and cancel if
        # the cursor has moved
        try:
            import jupyter_ai_magics
        except ModuleNotFoundError:
            jupyter_ai_magics = None
        if not self._llm_provider:
            warnings.warn("No LLM provider found, cannot trigger LLM completions")
            return
        if jupyter_ai_magics is None:
            warnings.warn("LLM Completion requires `jupyter_ai_magics` to be installed")

        self._cancel_running_llm_task()

        async def error_catcher(buffer):
            """
            Catch and log any errors, which would otherwise just be lost in
            the void of the running task.
            """
            try:
                await self._trigger_llm_core(buffer)
            except Exception as e:
                get_ipython().log.error("error %s", e)
                raise

        # we need a cancellable task here, so we cannot simply await
        # error_catcher directly
        self._llm_task = asyncio.create_task(error_catcher(buffer))
        await self._llm_task

    async def _trigger_llm_core(self, buffer: Buffer):
        """
        Core of the llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream its response back to us, iteratively setting it
        as the suggestion on the current buffer.

        Unlike with JupyterAI, we do not have multiple cells, so the cell id
        is always set to `None`.

        We set the prefix to the current cell content, but could also insert
        the rest of the history, or even just the non-fail history.

        The LLM provider may return multiple suggestion streams, but for the
        time being we only support one.

        We assume here that the provider has `stream_inline_completions`;
        this may not be the case for all providers.
        """
        try:
            import jupyter_ai.completions.models as jai_models
        except ModuleNotFoundError:
            jai_models = None

        if not jai_models:
            raise ValueError("jupyter-ai is not installed")

        if not self._llm_provider:
            raise ValueError("No LLM provider found, cannot trigger LLM completions")

        hm = buffer.history.shell.history_manager
        prefix = self._llm_prefixer(hm)
        get_ipython().log.debug("prefix: %s", prefix)

        self._request_number += 1
        request_number = self._request_number

        request = jai_models.InlineCompletionRequest(
            number=request_number,
            prefix=prefix + buffer.document.text_before_cursor,
            suffix=buffer.document.text_after_cursor,
            mime="text/x-python",
            stream=True,
            path=None,
            language="python",
            cell_id=None,
        )

        # note: the suggestion is replaced wholesale on every message, so the
        # provider is expected to send the accumulated text each time, not an
        # incremental delta
        async for reply_and_chunks in self._llm_provider.stream_inline_completions(
            request
        ):
            if self._request_number != request_number:
                # If a new suggestion was requested, skip processing this one.
                return
            if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
                if len(reply_and_chunks.list.items) > 1:
                    raise ValueError(
                        "Terminal IPython cannot deal with multiple LLM suggestions at once"
                    )
                buffer.suggestion = Suggestion(
                    reply_and_chunks.list.items[0].insertText
                )
                buffer.on_suggestion_set.fire()
            elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
                buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
                buffer.on_suggestion_set.fire()
        return


async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the history AutoSuggester to delegate to an LLM for a completion.

    This will first make sure that the current buffer has at least
    `_MIN_LINES` (5) lines available below the cursor to insert the LLM
    completion into.

    Provisional as of 8.32, may change without warning.
    """
    _MIN_LINES = 5
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return
    doc = event.current_buffer.document
    lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
    for _ in range(lines_to_insert):
        event.current_buffer.insert_text("\n", move_cursor=False, fire_event=False)

    await provider._trigger_llm(event.current_buffer)

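# A hedged configuration sketch (assuming the command-identifier format of
# IPython's configurable `shortcuts` traitlet; the key chosen here is
# arbitrary): binding this command in ipython_config.py might look like
#
#     c.TerminalInteractiveShell.shortcuts = [
#         {
#             "new_keys": ["c-q"],
#             "command": "IPython:auto_suggest.llm_autosuggestion",
#         },
#     ]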

def accept_or_jump_to_end(event: KeyPressEvent):
    """Apply autosuggestion or jump to end of line."""
    buffer = event.current_buffer
    d = buffer.document
    after_cursor = d.text[d.cursor_position :]
    lines = after_cursor.split("\n")
    end_of_current_line = lines[0].strip()
    suggestion = buffer.suggestion
    if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
        buffer.insert_text(suggestion.text)
    else:
        nc.end_of_line(event)


def accept(event: KeyPressEvent):
    """Accept autosuggestion"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
    else:
        nc.forward_char(event)


def discard(event: KeyPressEvent):
    """Discard autosuggestion"""
    buffer = event.current_buffer
    buffer.suggestion = None

def accept_word(event: KeyPressEvent):
    """Fill partial autosuggestion by word"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        # the capture group keeps the separators, e.g.
        # re.split(r"(\S+\s+)", "foo bar") == ['', 'foo ', 'bar'], so the
        # first non-empty piece is the first word plus trailing whitespace
        t = re.split(r"(\S+\s+)", suggestion.text)
        buffer.insert_text(next((x for x in t if x), ""))
    else:
        nc.forward_word(event)

def accept_character(event: KeyPressEvent):
    """Fill partial autosuggestion by character"""
    b = event.current_buffer
    suggestion = b.suggestion
    if suggestion and suggestion.text:
        b.insert_text(suggestion.text[0])

def accept_and_keep_cursor(event: KeyPressEvent):
    """Accept autosuggestion and keep cursor in place"""
    buffer = event.current_buffer
    old_position = buffer.cursor_position
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
        buffer.cursor_position = old_position


def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move the cursor one position to the left"""
    accept_and_keep_cursor(event)
    nc.backward_char(event)

def _update_hint(buffer: Buffer):
    if buffer.auto_suggest:
        suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
        buffer.suggestion = suggestion

def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting the last character"""
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)


def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions"""
    pass_through.reply(event)
    # Order matters: if the update happened first and the event reply second,
    # the suggestion would be auto-accepted if both actions are bound to the
    # same key.
    _update_hint(event.current_buffer)

def up_and_update_hint(event: KeyPressEvent):
    """Go up and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_up(count=event.arg)
    _update_hint(current_buffer)


def down_and_update_hint(event: KeyPressEvent):
    """Go down and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_down(count=event.arg)
    _update_hint(current_buffer)

def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token"""

    b = event.current_buffer
    suggestion = b.suggestion

    if suggestion:
        prefix = _get_query(b.document)
        text = prefix + suggestion.text

        tokens: List[Optional[str]] = [None, None, None]
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            if tokens[1] and len(tokens[1]) == 1:
                # if the token after the accepted one is a single character
                # (e.g. a dot or an opening bracket), take it along
                insert_text = substrings[-1]
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)

# type of the `shell.auto_suggest` attribute checked by the handlers below
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]

def _swap_autosuggestion(
    buffer: Buffer,
    provider: NavigableAutoSuggestFromHistory,
    direction_method: Callable,
):
    """
    Skip the most recent history entry (in either direction) if it equals
    the current autosuggestion, because if the user cycles while an
    auto-suggestion is shown, they most likely want something other than
    what was suggested (otherwise they would have accepted the suggestion).
    """
    suggestion = buffer.suggestion
    if not suggestion:
        return

    query = _get_query(buffer.document)
    current = query + suggestion.text

    direction_method(query=query, other_than=current, history=buffer.history)

    new_suggestion = provider.get_suggestion(buffer, buffer.document)
    buffer.suggestion = new_suggestion

def swap_autosuggestion_up(event: KeyPressEvent):
    """Get the next autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer, provider=provider, direction_method=provider.up
    )


def swap_autosuggestion_down(event: KeyPressEvent):
    """Get the previous autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer,
        provider=provider,
        direction_method=provider.down,
    )