Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/terminal/shortcuts/auto_suggest.py: 20%

322 statements  

import re
import asyncio
import tokenize
from io import StringIO
from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
import warnings

import prompt_toolkit
from prompt_toolkit.buffer import Buffer
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.key_binding.bindings import named_commands as nc
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
from prompt_toolkit.document import Document
from prompt_toolkit.history import History
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.layout.processors import (
    Processor,
    Transformation,
    TransformationInput,
)

from IPython.core.getipython import get_ipython
from IPython.utils.tokenutil import generate_tokens

from .filters import pass_through


def _get_query(document: Document):
    return document.lines[document.cursor_position_row]


class AppendAutoSuggestionInAnyLine(Processor):
    """
    Append the auto suggestion to lines other than the last (appending to the
    last line is natively supported by the prompt toolkit).

    This has a private `_debug` attribute that can be set to True to display
    debug information as a virtual suggestion at the end of any line. You can
    do so with:

    >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
    >>> AppendAutoSuggestionInAnyLine._debug = True

    """

    _debug: ClassVar[bool] = False

    def __init__(self, style: str = "class:auto-suggestion") -> None:
        self.style = style

    def apply_transformation(self, ti: TransformationInput) -> Transformation:
        """
        Apply the transformation to the line that is currently being edited.

        This is a variation of the original implementation in prompt toolkit
        that allows not only appending suggestions to any line, but also
        showing multi-line suggestions.

        As transformations are applied on a line-by-line basis, we need to
        cheat a bit and elide any line that comes after the line we are
        currently editing, until we run out of completions. We cannot shift
        the existing lines.

        There are multiple cases to handle:

        The completion ends before the end of the buffer:
            We can resume showing the normal line, and say that some code may
            be hidden.

        The completion ends at the end of the buffer:
            We can just say that some code may be hidden.

        And separately:

        The completion ends beyond the end of the buffer:
            We need to both say that some code may be hidden, and that some
            lines are not shown.
        """
        last_line_number = ti.document.line_count - 1
        is_last_line = ti.lineno == last_line_number

        noop = lambda text: Transformation(
            fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
        )
        if ti.document.line_count == 1:
            return noop("noop:oneline")
        if ti.document.cursor_position_row == last_line_number and is_last_line:
            # prompt toolkit already appends something; just leave it be
            return noop("noop:last line and cursor")

        # first, everything before the current line is unchanged.
        if ti.lineno < ti.document.cursor_position_row:
            return noop("noop:before cursor")

        buffer = ti.buffer_control.buffer
        if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
            return noop("noop:not eol")
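
        # `delta` is how many rows below the cursor row this rendered line is
        # (0 for the cursor row itself); suggestion line `delta` is shown there.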
        delta = ti.lineno - ti.document.cursor_position_row
        suggestions = buffer.suggestion.text.splitlines()

        if len(suggestions) == 0:
            return noop("noop: no suggestions")

        if prompt_toolkit.VERSION < (3, 0, 49):
            if len(suggestions) > 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments
                        + [
                            (
                                "red",
                                "(Cannot show multiline suggestion; requires prompt_toolkit >= 3.0.49)",
                            )
                        ]
                    )
                else:
                    return Transformation(fragments=ti.fragments)
            elif len(suggestions) == 1:
                if ti.lineno == ti.document.cursor_position_row:
                    return Transformation(
                        fragments=ti.fragments + [(self.style, suggestions[0])]
                    )
                return Transformation(fragments=ti.fragments)

        if delta == 0:
            suggestion = suggestions[0]
            return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
        if is_last_line:
            if delta < len(suggestions):
                suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
                return Transformation([(self.style, suggestion)])

            n_elided = len(suggestions)
            for i in range(len(suggestions)):
                ll = ti.get_line(last_line_number - i)
                el = "".join(l[1] for l in ll).strip()
                if el:
                    break
                else:
                    n_elided -= 1
            if n_elided:
                return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
            else:
                return Transformation(
                    ti.get_line(last_line_number - len(suggestions) + 1)
                    + ([(self.style, "shift-last-line")] if self._debug else [])
                )

        elif delta < len(suggestions):
            suggestion = suggestions[delta]
            return Transformation([(self.style, suggestion)])
        else:
            shift = ti.lineno - len(suggestions) + 1
            return Transformation(ti.get_line(shift))

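# Minimal usage sketch (illustrative; this wiring is not part of this module):
# a Processor like the one above is passed to a prompt_toolkit `BufferControl`
# via `input_processors`, e.g.:
#
#     from prompt_toolkit.layout.controls import BufferControl
#     control = BufferControl(input_processors=[AppendAutoSuggestionInAnyLine()])
#
# IPython hooks this into its own shell layout; the snippet only shows where
# such a processor plugs in.
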

class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
    """
    A subclass of AutoSuggestFromHistory that allows navigating to the
    next/previous suggestion from history. To do so it remembers the current
    position, but this state needs to be carefully cleared on the right
    events.
    """

    skip_lines: int
    _connected_apps: list[PromptSession]

    # handle to the currently running llm task that appends suggestions to
    # the current buffer; we keep a handle to it in order to cancel it when
    # there is a cursor movement, or another request.
    _llm_task: asyncio.Task | None = None

    # This is the constructor of the LLM provider from jupyter-ai to which we
    # forward the request to generate inline completions.
    _init_llm_provider: Callable | None

    _llm_provider_instance: Any | None
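    # The "wrong" default below appears to be a placeholder sentinel; it is
    # expected to be replaced with a callable that renders the history
    # manager into a prompt prefix (`_trigger_llm_core` calls
    # `self._llm_prefixer(hm)`).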
    _llm_prefixer: Callable = lambda self, x: "wrong"

    def __init__(self):
        super().__init__()
        self.skip_lines = 0
        self._connected_apps = []
        self._llm_provider_instance = None
        self._init_llm_provider = None
        self._request_number = 0

    def reset_history_position(self, _: Buffer) -> None:
        self.skip_lines = 0

    def disconnect(self) -> None:
        self._cancel_running_llm_task()
        for pt_app in self._connected_apps:
            text_insert_event = pt_app.default_buffer.on_text_insert
            text_insert_event.remove_handler(self.reset_history_position)

    def connect(self, pt_app: PromptSession) -> None:
        self._connected_apps.append(pt_app)
        # note: `on_text_changed` could be used for slightly different
        # behaviour on character deletion (i.e. resetting history position
        # on backspace)
        pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
        pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
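
    # Typical wiring (illustrative): IPython calls `connect(pt_app)` when the
    # prompt session is created, so that inserting text resets the history
    # position and any cursor movement dismisses the current suggestion.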

    def get_suggestion(
        self, buffer: Buffer, document: Document
    ) -> Optional[Suggestion]:
        text = _get_query(document)

        if text.strip():
            for suggestion, _ in self._find_next_match(
                text, self.skip_lines, buffer.history
            ):
                return Suggestion(suggestion)

        return None

    def _dismiss(self, buffer, *args, **kwargs) -> None:
        self._cancel_running_llm_task()
        buffer.suggestion = None

    def _find_match(
        self, text: str, skip_lines: float, history: History, previous: bool
    ) -> Generator[Tuple[str, float], None, None]:
        """
        text : str
            Text content to find a match for; the user's cursor is most of
            the time at the end of this text.
        skip_lines : float
            Number of items to skip in the search; this is used to indicate
            how far in the list the user has navigated by pressing up or
            down. The float type is used as the base value is +inf.
        history : History
            prompt_toolkit History instance to fetch previous entries from.
        previous : bool
            Direction of the search: whether we are looking for the previous
            match (True) or the next match (False).

        Yields
        ------
        Tuple with:
        str:
            current suggestion.
        float:
            will actually only yield ints, which are passed back via
            skip_lines; the incoming value may be +inf (a float).
        """
        line_number = -1
        for string in reversed(list(history.get_strings())):
            for line in reversed(string.splitlines()):
                line_number += 1
                if not previous and line_number < skip_lines:
                    continue
                # do not return empty suggestions as these
                # close the auto-suggestion overlay (and are useless)
                if line.startswith(text) and len(line) > len(text):
                    yield line[len(text) :], line_number
                if previous and line_number >= skip_lines:
                    return
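
    # Worked example (comment only): with history ["very.a", "very", "very.b"]
    # and text "very", the walk goes newest to oldest: "very.b" (line 0)
    # yields (".b", 0); "very" (line 1) is skipped since it adds nothing
    # beyond the typed text; "very.a" (line 2) yields (".a", 2). With
    # skip_lines=1, line 0 is skipped and (".a", 2) comes first.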

    def _find_next_match(
        self, text: str, skip_lines: float, history: History
    ) -> Generator[Tuple[str, float], None, None]:
        return self._find_match(text, skip_lines, history, previous=False)

    def _find_previous_match(self, text: str, skip_lines: float, history: History):
        return reversed(
            list(self._find_match(text, skip_lines, history, previous=True))
        )

    def up(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_next_match(
            query, self.skip_lines, history
        ):
            # if the user has history ['very.a', 'very', 'very.b'] and typed
            # 'very', we want to switch from 'very.b' to 'very.a' because:
            # a) if the suggestion equals the current text, prompt-toolkit
            # aborts suggesting; b) the user is likely not interested in
            # 'very' anyway (they already typed it).
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle back to beginning
            self.skip_lines = 0

    def down(self, query: str, other_than: str, history: History) -> None:
        self._cancel_running_llm_task()
        for suggestion, line_number in self._find_previous_match(
            query, self.skip_lines, history
        ):
            if query + suggestion != other_than:
                self.skip_lines = line_number
                break
        else:
            # no matches found, cycle to end
            for suggestion, line_number in self._find_previous_match(
                query, float("Inf"), history
            ):
                if query + suggestion != other_than:
                    self.skip_lines = line_number
                    break

    def _cancel_running_llm_task(self) -> None:
        """
        Try to cancel the currently running llm_task, if any, and set it to
        None.
        """
        if self._llm_task is not None:
            if self._llm_task.done():
                self._llm_task = None
                return
            cancelled = self._llm_task.cancel()
            if cancelled:
                self._llm_task = None
            else:
                warnings.warn(
                    "LLM task not cancelled, does your provider support cancellation?"
                )

    @property
    def _llm_provider(self):
        """Lazy-initialized instance of the LLM provider.

        Do not use in the constructor, as `_init_llm_provider` can trigger
        slow side-effects.
        """
        if self._llm_provider_instance is None and self._init_llm_provider:
            self._llm_provider_instance = self._init_llm_provider()
        return self._llm_provider_instance

    async def _trigger_llm(self, buffer) -> None:
        """
        Ask the current LLM provider for a suggestion for the current buffer.

        If there is a currently running llm task, cancel it first.
        """
        # we likely want to store the current cursor position, and cancel if
        # the cursor has moved.
        try:
            import jupyter_ai_magics
        except ModuleNotFoundError:
            jupyter_ai_magics = None
        if not self._llm_provider:
            warnings.warn("No LLM provider found, cannot trigger LLM completions")
            return
        if jupyter_ai_magics is None:
            warnings.warn("LLM Completion requires `jupyter_ai_magics` to be installed")

        self._cancel_running_llm_task()

        async def error_catcher(buffer):
            """
            This catches and logs any errors, as otherwise they are just
            lost in the void of the future running task.
            """
            try:
                await self._trigger_llm_core(buffer)
            except Exception as e:
                get_ipython().log.error("error %s", e)
                raise

        # here we need a cancellable task, so we can't just await
        # error_catcher directly
        self._llm_task = asyncio.create_task(error_catcher(buffer))
        await self._llm_task

    async def _trigger_llm_core(self, buffer: Buffer):
        """
        The core of the llm request.

        Here we build a compatible `InlineCompletionRequest` and ask the llm
        provider to stream its response back to us, iteratively setting it as
        the suggestion on the current buffer.

        Unlike with JupyterAi, as we do not have multiple cells, the cell id
        is always set to `None`.

        We set the prefix to the current cell content, but could also insert
        the rest of the history, or even just the non-failing history.

        In the same way, we do not have a path, so it is set to `None`.

        The LLM provider may return multiple suggestion streams, but for the
        time being we only support one.

        Here we make the assumption that the provider has
        `stream_inline_completions`; I'm not sure that is the case for all
        providers.
        """
        try:
            import jupyter_ai.completions.models as jai_models
        except ModuleNotFoundError:
            jai_models = None

        if not jai_models:
            raise ValueError("jupyter-ai is not installed")

        if not self._llm_provider:
            raise ValueError("No LLM provider found, cannot trigger LLM completions")

        hm = buffer.history.shell.history_manager
        prefix = self._llm_prefixer(hm)
        get_ipython().log.debug("prefix: %s", prefix)

        self._request_number += 1
        request_number = self._request_number

        request = jai_models.InlineCompletionRequest(
            number=request_number,
            prefix=prefix + buffer.document.text_before_cursor,
            suffix=buffer.document.text_after_cursor,
            mime="text/x-python",
            stream=True,
            path=None,
            language="python",
            cell_id=None,
        )

        async for reply_and_chunks in self._llm_provider.stream_inline_completions(
            request
        ):
            if self._request_number != request_number:
                # If a new suggestion was requested, skip processing this one.
                return
            if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
                if len(reply_and_chunks.list.items) > 1:
                    raise ValueError(
                        "Terminal IPython cannot deal with multiple LLM suggestions at once"
                    )
                buffer.suggestion = Suggestion(
                    reply_and_chunks.list.items[0].insertText
                )
                buffer.on_suggestion_set.fire()
            elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
                buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
                buffer.on_suggestion_set.fire()
        return

async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the history AutoSuggester to delegate to an LLM for completion.

    This will first make sure that the current buffer has `_MIN_LINES` (5)
    lines available to insert the LLM completion into.

    Provisional as of 8.32; may change without warning.
    """
    _MIN_LINES = 5
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return
    doc = event.current_buffer.document
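    # Pad the buffer so at least _MIN_LINES lines remain at or below the
    # cursor row; doc.line_count - doc.cursor_position_row is how many
    # there are now.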
    lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
    for _ in range(lines_to_insert):
        event.current_buffer.insert_text("\n", move_cursor=False, fire_event=False)

    await provider._trigger_llm(event.current_buffer)


def accept_or_jump_to_end(event: KeyPressEvent):
    """Apply autosuggestion or jump to end of line."""
    buffer = event.current_buffer
    d = buffer.document
    after_cursor = d.text[d.cursor_position :]
    lines = after_cursor.split("\n")
    end_of_current_line = lines[0].strip()
    suggestion = buffer.suggestion
    if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
        buffer.insert_text(suggestion.text)
    else:
        nc.end_of_line(event)


def accept(event: KeyPressEvent):
    """Accept autosuggestion"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
    else:
        nc.forward_char(event)


def discard(event: KeyPressEvent):
    """Discard autosuggestion"""
    buffer = event.current_buffer
    buffer.suggestion = None


def accept_word(event: KeyPressEvent):
    """Fill partial autosuggestion by word"""
    buffer = event.current_buffer
    suggestion = buffer.suggestion
    if suggestion:
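        # split the suggestion into alternating (word + trailing whitespace)
        # chunks and insert the first non-empty one,
        # e.g. "foo bar" -> insert "foo "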
        t = re.split(r"(\S+\s+)", suggestion.text)
        buffer.insert_text(next((x for x in t if x), ""))
    else:
        nc.forward_word(event)


def accept_character(event: KeyPressEvent):
    """Fill partial autosuggestion by character"""
    b = event.current_buffer
    suggestion = b.suggestion
    if suggestion and suggestion.text:
        b.insert_text(suggestion.text[0])


def accept_and_keep_cursor(event: KeyPressEvent):
    """Accept autosuggestion and keep cursor in place"""
    buffer = event.current_buffer
    old_position = buffer.cursor_position
    suggestion = buffer.suggestion
    if suggestion:
        buffer.insert_text(suggestion.text)
        buffer.cursor_position = old_position


def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move the cursor one position to the left"""
    accept_and_keep_cursor(event)
    nc.backward_char(event)


def _update_hint(buffer: Buffer):
    if buffer.auto_suggest:
        suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
        buffer.suggestion = suggestion


def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting last character"""
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)


def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions"""
    pass_through.reply(event)
    # Order matters: if update happened first and event reply second, the
    # suggestion would be auto-accepted if both actions are bound to same key.
    _update_hint(event.current_buffer)


def up_and_update_hint(event: KeyPressEvent):
    """Go up and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_up(count=event.arg)
    _update_hint(current_buffer)


def down_and_update_hint(event: KeyPressEvent):
    """Go down and update hint"""
    current_buffer = event.current_buffer

    current_buffer.auto_down(count=event.arg)
    _update_hint(current_buffer)


def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token"""
    b = event.current_buffer
    suggestion = b.suggestion

    if suggestion:
        prefix = _get_query(b.document)
        text = prefix + suggestion.text

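        # Tokenize prefix + suggestion, tracking how much of `text` has been
        # covered so far; once tokenization has advanced past the typed
        # prefix, remember up to three tokens to decide how much of the
        # suggestion makes up one "token" to insert.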
        tokens: List[Optional[str]] = [None, None, None]
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            if tokens[1] and len(tokens[1]) == 1:
                insert_text = substrings[-1]
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)

Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]

def _swap_autosuggestion(
    buffer: Buffer,
    provider: NavigableAutoSuggestFromHistory,
    direction_method: Callable,
):
    """
    We skip the most recent history entry (in either direction) if it equals
    the current autosuggestion, because if the user cycles while an
    auto-suggestion is shown, they most likely want something other than what
    was suggested (otherwise they would have accepted the suggestion).
    """
    suggestion = buffer.suggestion
    if not suggestion:
        return

    query = _get_query(buffer.document)
    current = query + suggestion.text

    direction_method(query=query, other_than=current, history=buffer.history)

    new_suggestion = provider.get_suggestion(buffer, buffer.document)
    buffer.suggestion = new_suggestion

def swap_autosuggestion_up(event: KeyPressEvent):
    """Get next autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer, provider=provider, direction_method=provider.up
    )


def swap_autosuggestion_down(event: KeyPressEvent):
    """Get previous autosuggestion from history."""
    shell = get_ipython()
    provider = shell.auto_suggest

    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return

    return _swap_autosuggestion(
        buffer=event.current_buffer,
        provider=provider,
        direction_method=provider.down,
    )
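

# Minimal sketch (illustrative; not the module's own wiring): handlers like
# these are bound to keys through prompt_toolkit key bindings, e.g.:
#
#     from prompt_toolkit.key_binding import KeyBindings
#     kb = KeyBindings()
#     kb.add("right")(accept)
#     kb.add("escape")(discard)
#
# IPython actually registers them via its configurable shortcuts system; the
# key names above are placeholders.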