1""" """
2
3from __future__ import annotations
4
5from abc import ABCMeta, abstractmethod
6from typing import AsyncGenerator, Callable, Iterable, Sequence
7
8from prompt_toolkit.document import Document
9from prompt_toolkit.eventloop import aclosing, generator_to_async_generator
10from prompt_toolkit.filters import FilterOrBool, to_filter
11from prompt_toolkit.formatted_text import AnyFormattedText, StyleAndTextTuples
12
13__all__ = [
14 "Completion",
15 "Completer",
16 "ThreadedCompleter",
17 "DummyCompleter",
18 "DynamicCompleter",
19 "CompleteEvent",
20 "ConditionalCompleter",
21 "merge_completers",
22 "get_common_complete_suffix",
23]
24
25
26class Completion:
27 """
28 :param text: The new string that will be inserted into the document.
29 :param start_position: Position relative to the cursor_position where the
30 new text will start. The text will be inserted between the
31 start_position and the original cursor position.
32 :param display: (optional string or formatted text) If the completion has
33 to be displayed differently in the completion menu.
34 :param display_meta: (Optional string or formatted text) Meta information
35 about the completion, e.g. the path or source where it's coming from.
36 This can also be a callable that returns a string.
37 :param style: Style string.
38 :param selected_style: Style string, used for a selected completion.
39 This can override the `style` parameter.
40 """
41
42 def __init__(
43 self,
44 text: str,
45 start_position: int = 0,
46 display: AnyFormattedText | None = None,
47 display_meta: AnyFormattedText | None = None,
48 style: str = "",
49 selected_style: str = "",
50 ) -> None:
51 from prompt_toolkit.formatted_text import to_formatted_text
52
53 self.text = text
54 self.start_position = start_position
55 self._display_meta = display_meta
56
57 if display is None:
58 display = text
59
60 self.display = to_formatted_text(display)
61
62 self.style = style
63 self.selected_style = selected_style
64
65 assert self.start_position <= 0
66
67 def __repr__(self) -> str:
68 if isinstance(self.display, str) and self.display == self.text:
69 return f"{self.__class__.__name__}(text={self.text!r}, start_position={self.start_position!r})"
70 else:
71 return f"{self.__class__.__name__}(text={self.text!r}, start_position={self.start_position!r}, display={self.display!r})"
72
73 def __eq__(self, other: object) -> bool:
74 if not isinstance(other, Completion):
75 return False
76 return (
77 self.text == other.text
78 and self.start_position == other.start_position
79 and self.display == other.display
80 and self._display_meta == other._display_meta
81 )
82
83 def __hash__(self) -> int:
84 return hash((self.text, self.start_position, self.display, self._display_meta))
85
86 @property
87 def display_text(self) -> str:
88 "The 'display' field as plain text."
89 from prompt_toolkit.formatted_text import fragment_list_to_text
90
91 return fragment_list_to_text(self.display)
92
93 @property
94 def display_meta(self) -> StyleAndTextTuples:
95 "Return meta-text. (This is lazy when using a callable)."
96 from prompt_toolkit.formatted_text import to_formatted_text
97
98 return to_formatted_text(self._display_meta or "")
99
100 @property
101 def display_meta_text(self) -> str:
102 "The 'meta' field as plain text."
103 from prompt_toolkit.formatted_text import fragment_list_to_text
104
105 return fragment_list_to_text(self.display_meta)
106
107 def new_completion_from_position(self, position: int) -> Completion:
108 """
109 (Only for internal use!)
110 Get a new completion by splitting this one. Used by `Application` when
111 it needs to have a list of new completions after inserting the common
112 prefix.
113 """
114 assert position - self.start_position >= 0
115
116 return Completion(
117 text=self.text[position - self.start_position :],
118 display=self.display,
119 display_meta=self._display_meta,
120 )
121
122
123class CompleteEvent:
124 """
125 Event that called the completer.
126
127 :param text_inserted: When True, it means that completions are requested
128 because of a text insert. (`Buffer.complete_while_typing`.)
129 :param completion_requested: When True, it means that the user explicitly
130 pressed the `Tab` key in order to view the completions.
131
132 These two flags can be used for instance to implement a completer that
133 shows some completions when ``Tab`` has been pressed, but not
134 automatically when the user presses a space. (Because of
135 `complete_while_typing`.)
136 """
137
138 def __init__(
139 self, text_inserted: bool = False, completion_requested: bool = False
140 ) -> None:
141 assert not (text_inserted and completion_requested)
142
143 #: Automatic completion while typing.
144 self.text_inserted = text_inserted
145
146 #: Used explicitly requested completion by pressing 'tab'.
147 self.completion_requested = completion_requested
148
149 def __repr__(self) -> str:
150 return f"{self.__class__.__name__}(text_inserted={self.text_inserted!r}, completion_requested={self.completion_requested!r})"
151
152
153class Completer(metaclass=ABCMeta):
154 """
155 Base class for completer implementations.
156 """
157
158 @abstractmethod
159 def get_completions(
160 self, document: Document, complete_event: CompleteEvent
161 ) -> Iterable[Completion]:
162 """
163 This should be a generator that yields :class:`.Completion` instances.
164
165 If the generation of completions is something expensive (that takes a
166 lot of time), consider wrapping this `Completer` class in a
167 `ThreadedCompleter`. In that case, the completer algorithm runs in a
168 background thread and completions will be displayed as soon as they
169 arrive.
170
171 :param document: :class:`~prompt_toolkit.document.Document` instance.
172 :param complete_event: :class:`.CompleteEvent` instance.
173 """
174 while False:
175 yield
176
177 async def get_completions_async(
178 self, document: Document, complete_event: CompleteEvent
179 ) -> AsyncGenerator[Completion, None]:
180 """
181 Asynchronous generator for completions. (Probably, you won't have to
182 override this.)
183
184 Asynchronous generator of :class:`.Completion` objects.
185 """
186 for item in self.get_completions(document, complete_event):
187 yield item
188
189
190class ThreadedCompleter(Completer):
191 """
192 Wrapper that runs the `get_completions` generator in a thread.
193
194 (Use this to prevent the user interface from becoming unresponsive if the
195 generation of completions takes too much time.)
196
197 The completions will be displayed as soon as they are produced. The user
198 can already select a completion, even if not all completions are displayed.
199 """
200
201 def __init__(self, completer: Completer) -> None:
202 self.completer = completer
203
204 def get_completions(
205 self, document: Document, complete_event: CompleteEvent
206 ) -> Iterable[Completion]:
207 return self.completer.get_completions(document, complete_event)
208
209 async def get_completions_async(
210 self, document: Document, complete_event: CompleteEvent
211 ) -> AsyncGenerator[Completion, None]:
212 """
213 Asynchronous generator of completions.
214 """
215 # NOTE: Right now, we are consuming the `get_completions` generator in
216 # a synchronous background thread, then passing the results one
217 # at a time over a queue, and consuming this queue in the main
218 # thread (that's what `generator_to_async_generator` does). That
219 # means that if the completer is *very* slow, we'll be showing
220 # completions in the UI once they are computed.
221
222 # It's very tempting to replace this implementation with the
223 # commented code below for several reasons:
224
225 # - `generator_to_async_generator` is not perfect and hard to get
226 # right. It's a lot of complexity for little gain. The
227 # implementation needs a huge buffer for it to be efficient
228 # when there are many completions (like 50k+).
229 # - Normally, a completer is supposed to be fast, users can have
230 # "complete while typing" enabled, and want to see the
231 # completions within a second. Handling one completion at a
232 # time, and rendering once we get it here doesn't make any
233 # sense if this is quick anyway.
234 # - Completers like `FuzzyCompleter` prepare all completions
235 # anyway so that they can be sorted by accuracy before they are
236 # yielded. At the point that we start yielding completions
237 # here, we already have all completions.
238 # - The `Buffer` class has complex logic to invalidate the UI
239 # while it is consuming the completions. We don't want to
240 # invalidate the UI for every completion (if there are many),
241 # but we want to do it often enough so that completions are
242 # being displayed while they are produced.
243
244 # We keep the current behavior mainly for backward-compatibility.
245 # Similarly, it would be better for this function to not return
246 # an async generator, but simply be a coroutine that returns a
247 # list of `Completion` objects, containing all completions at
248 # once.
249
250 # Note that this argument doesn't mean we shouldn't use
251 # `ThreadedCompleter`. It still makes sense to produce
252 # completions in a background thread, because we don't want to
253 # freeze the UI while the user is typing. But sending the
254 # completions one at a time to the UI maybe isn't worth it.
255
256 # def get_all_in_thread() -> List[Completion]:
257 # return list(self.get_completions(document, complete_event))
258
259 # completions = await get_running_loop().run_in_executor(None, get_all_in_thread)
260 # for completion in completions:
261 # yield completion
262
263 async with aclosing(
264 generator_to_async_generator(
265 lambda: self.completer.get_completions(document, complete_event)
266 )
267 ) as async_generator:
268 async for completion in async_generator:
269 yield completion
270
271 def __repr__(self) -> str:
272 return f"ThreadedCompleter({self.completer!r})"
273
274
275class DummyCompleter(Completer):
276 """
277 A completer that doesn't return any completion.
278 """
279
280 def get_completions(
281 self, document: Document, complete_event: CompleteEvent
282 ) -> Iterable[Completion]:
283 return []
284
285 def __repr__(self) -> str:
286 return "DummyCompleter()"
287
288
289class DynamicCompleter(Completer):
290 """
291 Completer class that can dynamically returns any Completer.
292
293 :param get_completer: Callable that returns a :class:`.Completer` instance.
294 """
295
296 def __init__(self, get_completer: Callable[[], Completer | None]) -> None:
297 self.get_completer = get_completer
298
299 def get_completions(
300 self, document: Document, complete_event: CompleteEvent
301 ) -> Iterable[Completion]:
302 completer = self.get_completer() or DummyCompleter()
303 return completer.get_completions(document, complete_event)
304
305 async def get_completions_async(
306 self, document: Document, complete_event: CompleteEvent
307 ) -> AsyncGenerator[Completion, None]:
308 completer = self.get_completer() or DummyCompleter()
309
310 async for completion in completer.get_completions_async(
311 document, complete_event
312 ):
313 yield completion
314
315 def __repr__(self) -> str:
316 return f"DynamicCompleter({self.get_completer!r} -> {self.get_completer()!r})"
317
318
319class ConditionalCompleter(Completer):
320 """
321 Wrapper around any other completer that will enable/disable the completions
322 depending on whether the received condition is satisfied.
323
324 :param completer: :class:`.Completer` instance.
325 :param filter: :class:`.Filter` instance.
326 """
327
328 def __init__(self, completer: Completer, filter: FilterOrBool) -> None:
329 self.completer = completer
330 self.filter = to_filter(filter)
331
332 def __repr__(self) -> str:
333 return f"ConditionalCompleter({self.completer!r}, filter={self.filter!r})"
334
335 def get_completions(
336 self, document: Document, complete_event: CompleteEvent
337 ) -> Iterable[Completion]:
338 # Get all completions in a blocking way.
339 if self.filter():
340 yield from self.completer.get_completions(document, complete_event)
341
342 async def get_completions_async(
343 self, document: Document, complete_event: CompleteEvent
344 ) -> AsyncGenerator[Completion, None]:
345 # Get all completions in a non-blocking way.
346 if self.filter():
347 async with aclosing(
348 self.completer.get_completions_async(document, complete_event)
349 ) as async_generator:
350 async for item in async_generator:
351 yield item
352
353
354class _MergedCompleter(Completer):
355 """
356 Combine several completers into one.
357 """
358
359 def __init__(self, completers: Sequence[Completer]) -> None:
360 self.completers = completers
361
362 def get_completions(
363 self, document: Document, complete_event: CompleteEvent
364 ) -> Iterable[Completion]:
365 # Get all completions from the other completers in a blocking way.
366 for completer in self.completers:
367 yield from completer.get_completions(document, complete_event)
368
369 async def get_completions_async(
370 self, document: Document, complete_event: CompleteEvent
371 ) -> AsyncGenerator[Completion, None]:
372 # Get all completions from the other completers in a non-blocking way.
373 for completer in self.completers:
374 async with aclosing(
375 completer.get_completions_async(document, complete_event)
376 ) as async_generator:
377 async for item in async_generator:
378 yield item
379
380
381def merge_completers(
382 completers: Sequence[Completer], deduplicate: bool = False
383) -> Completer:
384 """
385 Combine several completers into one.
386
387 :param deduplicate: If `True`, wrap the result in a `DeduplicateCompleter`
388 so that completions that would result in the same text will be
389 deduplicated.
390 """
391 if deduplicate:
392 from .deduplicate import DeduplicateCompleter
393
394 return DeduplicateCompleter(_MergedCompleter(completers))
395
396 return _MergedCompleter(completers)
397
398
399def get_common_complete_suffix(
400 document: Document, completions: Sequence[Completion]
401) -> str:
402 """
403 Return the common prefix for all completions.
404 """
405
406 # Take only completions that don't change the text before the cursor.
407 def doesnt_change_before_cursor(completion: Completion) -> bool:
408 end = completion.text[: -completion.start_position]
409 return document.text_before_cursor.endswith(end)
410
411 completions2 = [c for c in completions if doesnt_change_before_cursor(c)]
412
413 # When there is at least one completion that changes the text before the
414 # cursor, don't return any common part.
415 if len(completions2) != len(completions):
416 return ""
417
418 # Return the common prefix.
419 def get_suffix(completion: Completion) -> str:
420 return completion.text[-completion.start_position :]
421
422 return _commonprefix([get_suffix(c) for c in completions2])
423
424
425def _commonprefix(strings: Iterable[str]) -> str:
426 # Similar to os.path.commonprefix
427 if not strings:
428 return ""
429
430 else:
431 s1 = min(strings)
432 s2 = max(strings)
433
434 for i, c in enumerate(s1):
435 if c != s2[i]:
436 return s1[:i]
437
438 return s1