Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/completion/base.py: 38%

136 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2""" 

3from __future__ import annotations 

4 

5from abc import ABCMeta, abstractmethod 

6from typing import AsyncGenerator, Callable, Iterable, Sequence 

7 

8from prompt_toolkit.document import Document 

9from prompt_toolkit.eventloop import aclosing, generator_to_async_generator 

10from prompt_toolkit.filters import FilterOrBool, to_filter 

11from prompt_toolkit.formatted_text import AnyFormattedText, StyleAndTextTuples 

12 

13__all__ = [ 

14 "Completion", 

15 "Completer", 

16 "ThreadedCompleter", 

17 "DummyCompleter", 

18 "DynamicCompleter", 

19 "CompleteEvent", 

20 "ConditionalCompleter", 

21 "merge_completers", 

22 "get_common_complete_suffix", 

23] 

24 

25 

class Completion:
    """
    One completion candidate produced by a completer.

    :param text: The new string that will be inserted into the document.
    :param start_position: Position relative to the cursor_position where the
        new text will start. The text will be inserted between the
        start_position and the original cursor position. Must be ``<= 0``.
    :param display: (optional string or formatted text) If the completion has
        to be displayed differently in the completion menu. Defaults to
        `text`.
    :param display_meta: (Optional string or formatted text) Meta information
        about the completion, e.g. the path or source where it's coming from.
        This can also be a callable that returns a string.
    :param style: Style string.
    :param selected_style: Style string, used for a selected completion.
        This can override the `style` parameter.
    """

    def __init__(
        self,
        text: str,
        start_position: int = 0,
        display: AnyFormattedText | None = None,
        display_meta: AnyFormattedText | None = None,
        style: str = "",
        selected_style: str = "",
    ) -> None:
        from prompt_toolkit.formatted_text import to_formatted_text

        self.text = text
        self.start_position = start_position
        # Kept in its raw form; it is only converted when `display_meta` is
        # read.
        self._display_meta = display_meta

        # Without an explicit display value, show the inserted text itself.
        if display is None:
            display = text

        self.display = to_formatted_text(display)

        self.style = style
        self.selected_style = selected_style

        # A completion can only replace text at or before the cursor.
        assert self.start_position <= 0

    def __repr__(self) -> str:
        # NOTE(review): `display` has already gone through
        # `to_formatted_text` in `__init__`, so this `str` check presumably
        # never matches and the long form is always used -- confirm before
        # simplifying, since the repr output may be relied upon.
        if isinstance(self.display, str) and self.display == self.text:
            return "{}(text={!r}, start_position={!r})".format(
                self.__class__.__name__,
                self.text,
                self.start_position,
            )
        else:
            return "{}(text={!r}, start_position={!r}, display={!r})".format(
                self.__class__.__name__,
                self.text,
                self.start_position,
                self.display,
            )

    def __eq__(self, other: object) -> bool:
        "Completions are equal when text, position, display and meta match."
        if not isinstance(other, Completion):
            return False
        return (
            self.text == other.text
            and self.start_position == other.start_position
            and self.display == other.display
            and self._display_meta == other._display_meta
        )

    def __hash__(self) -> int:
        # Hash only the fields that are always hashable. `display` (and
        # possibly `_display_meta`) can hold list-like formatted text, and
        # hashing a list raises `TypeError`. Equal completions always share
        # `text` and `start_position`, so this stays consistent with
        # `__eq__`.
        return hash((self.text, self.start_position))

    @property
    def display_text(self) -> str:
        "The 'display' field as plain text."
        from prompt_toolkit.formatted_text import fragment_list_to_text

        return fragment_list_to_text(self.display)

    @property
    def display_meta(self) -> StyleAndTextTuples:
        "Return meta-text. (This is lazy when using a callable)."
        from prompt_toolkit.formatted_text import to_formatted_text

        # A missing (or otherwise falsy) meta value becomes empty text.
        return to_formatted_text(self._display_meta or "")

    @property
    def display_meta_text(self) -> str:
        "The 'meta' field as plain text."
        from prompt_toolkit.formatted_text import fragment_list_to_text

        return fragment_list_to_text(self.display_meta)

    def new_completion_from_position(self, position: int) -> Completion:
        """
        (Only for internal use!)
        Get a new completion by splitting this one. Used by `Application` when
        it needs to have a list of new completions after inserting the common
        prefix.

        :param position: Offset relative to the cursor; must not lie before
            `start_position`.
        :return: A completion holding the remaining text, with the same
            display, meta and style values as this one.
        """
        assert position - self.start_position >= 0

        return Completion(
            text=self.text[position - self.start_position :],
            display=self.display,
            display_meta=self._display_meta,
            # Carry the styling over, so that the remaining part is rendered
            # the same way as the completion it was split from.
            style=self.style,
            selected_style=self.selected_style,
        )

130 

131 

class CompleteEvent:
    """
    Describes why the completer is being invoked.

    :param text_inserted: When True, it means that completions are requested
        because of a text insert. (`Buffer.complete_while_typing`.)
    :param completion_requested: When True, it means that the user explicitly
        pressed the `Tab` key in order to view the completions.

    A completer can inspect these two flags to, for example, offer certain
    completions only after ``Tab`` was pressed and not automatically while
    the user is typing. The flags are mutually exclusive.
    """

    def __init__(
        self, text_inserted: bool = False, completion_requested: bool = False
    ) -> None:
        # At most one of the two triggers may be set.
        both_set = text_inserted and completion_requested
        assert not both_set

        #: Completion was triggered automatically while typing.
        self.text_inserted = text_inserted

        #: The user explicitly requested completion by pressing 'tab'.
        self.completion_requested = completion_requested

    def __repr__(self) -> str:
        template = "{}(text_inserted={!r}, completion_requested={!r})"
        return template.format(
            type(self).__name__, self.text_inserted, self.completion_requested
        )

164 

165 

class Completer(metaclass=ABCMeta):
    """
    Abstract base class that every completer derives from.
    """

    @abstractmethod
    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """
        Produce the :class:`.Completion` instances for the given document.
        Implementations are expected to be generators.

        When producing completions is expensive (takes a lot of time),
        consider wrapping the completer in a `ThreadedCompleter`. The
        completion code then runs in a background thread and completions
        are displayed as soon as they arrive.

        :param document: :class:`~prompt_toolkit.document.Document` instance.
        :param complete_event: :class:`.CompleteEvent` instance.
        """
        # Keep this default body a generator that produces nothing.
        yield from ()

    async def get_completions_async(
        self, document: Document, complete_event: CompleteEvent
    ) -> AsyncGenerator[Completion, None]:
        """
        Asynchronous generator of :class:`.Completion` objects. (Probably,
        you won't have to override this.)

        The default implementation forwards whatever the synchronous
        `get_completions` produces.
        """
        for completion in self.get_completions(document, complete_event):
            yield completion

201 

202 

class ThreadedCompleter(Completer):
    """
    Wrapper that runs the `get_completions` generator of another completer
    in a thread.

    (Use this to keep the user interface responsive when producing
    completions takes a long time.)

    Completions are displayed as soon as they are produced, so the user can
    already select one before all of them have been displayed.
    """

    def __init__(self, completer: Completer) -> None:
        self.completer = completer

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        # The synchronous path simply delegates to the wrapped completer.
        wrapped = self.completer
        return wrapped.get_completions(document, complete_event)

    async def get_completions_async(
        self, document: Document, complete_event: CompleteEvent
    ) -> AsyncGenerator[Completion, None]:
        """
        Asynchronous generator of completions.
        """
        # NOTE: The `get_completions` generator is consumed in a synchronous
        #       background thread; results are handed over one at a time
        #       through a queue that is read in the main thread (that is the
        #       job of `generator_to_async_generator`). So with a *very* slow
        #       completer, completions show up in the UI as they are
        #       computed.
        #
        #       An alternative would be to collect all completions in the
        #       thread (e.g. `list(self.get_completions(...))` run through
        #       the event loop's `run_in_executor`) and only then yield
        #       them. That is tempting because:
        #
        #       - `generator_to_async_generator` is not perfect and hard to
        #         get right. It's a lot of complexity for little gain, and
        #         it needs a huge buffer to be efficient when there are many
        #         completions (like 50k+).
        #       - A completer is normally supposed to be fast: users can
        #         have "complete while typing" enabled and want to see the
        #         completions within a second. Handling and rendering one
        #         completion at a time makes little sense if it is quick
        #         anyway.
        #       - Completers like `FuzzyCompleter` prepare all completions
        #         up front so they can be sorted by accuracy; by the time
        #         anything is yielded here, everything is already known.
        #       - The `Buffer` class has complex logic for invalidating the
        #         UI while consuming completions: not for every completion
        #         (there may be many), but often enough that they appear
        #         while being produced.
        #
        #       The current behavior is kept mainly for
        #       backward-compatibility. Similarly, this function would
        #       better be a coroutine returning a list with all `Completion`
        #       objects at once, instead of an async generator.
        #
        #       None of this argues against `ThreadedCompleter` itself:
        #       producing completions in a background thread still makes
        #       sense, because the UI must not freeze while the user is
        #       typing. Only the one-at-a-time delivery to the UI is maybe
        #       not worth it.

        def produce() -> Iterable[Completion]:
            return self.completer.get_completions(document, complete_event)

        async with aclosing(generator_to_async_generator(produce)) as stream:
            async for candidate in stream:
                yield candidate

    def __repr__(self) -> str:
        return f"ThreadedCompleter({self.completer!r})"

286 

287 

class DummyCompleter(Completer):
    """
    Completer that never offers any completion.
    """

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        # Both arguments are ignored: the result is always an empty list.
        nothing = []
        return nothing

    def __repr__(self) -> str:
        return "DummyCompleter()"

300 

301 

class DynamicCompleter(Completer):
    """
    Completer that looks up the completer to use at completion time.

    :param get_completer: Callable that returns a :class:`.Completer` instance.
    """

    def __init__(self, get_completer: Callable[[], Completer | None]) -> None:
        self.get_completer = get_completer

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        # Fall back to a completer without results when the callable
        # returns nothing usable.
        active = self.get_completer()
        if not active:
            active = DummyCompleter()
        return active.get_completions(document, complete_event)

    async def get_completions_async(
        self, document: Document, complete_event: CompleteEvent
    ) -> AsyncGenerator[Completion, None]:
        # Same fallback as in the synchronous variant.
        active = self.get_completer()
        if not active:
            active = DummyCompleter()

        stream = active.get_completions_async(document, complete_event)
        async for candidate in stream:
            yield candidate

    def __repr__(self) -> str:
        # Note that producing the repr invokes the callable.
        return f"DynamicCompleter({self.get_completer!r} -> {self.get_completer()!r})"

330 

331 

class ConditionalCompleter(Completer):
    """
    Wrapper around another completer that only produces completions while
    the given condition is satisfied.

    :param completer: :class:`.Completer` instance.
    :param filter: :class:`.Filter` instance.
    """

    def __init__(self, completer: Completer, filter: FilterOrBool) -> None:
        self.completer = completer
        self.filter = to_filter(filter)

    def __repr__(self) -> str:
        return f"ConditionalCompleter({self.completer!r}, filter={self.filter!r})"

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        # Blocking variant: nothing is produced while the filter is off.
        if not self.filter():
            return

        yield from self.completer.get_completions(document, complete_event)

    async def get_completions_async(
        self, document: Document, complete_event: CompleteEvent
    ) -> AsyncGenerator[Completion, None]:
        # Non-blocking variant: nothing is produced while the filter is off.
        if not self.filter():
            return

        inner = self.completer.get_completions_async(document, complete_event)
        async with aclosing(inner) as stream:
            async for completion in stream:
                yield completion

365 

366 

class _MergedCompleter(Completer):
    """
    Completer that chains the results of several completers, in order.
    """

    def __init__(self, completers: Sequence[Completer]) -> None:
        self.completers = completers

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        # Blocking variant: exhaust each wrapped completer in turn.
        for source in self.completers:
            for completion in source.get_completions(document, complete_event):
                yield completion

    async def get_completions_async(
        self, document: Document, complete_event: CompleteEvent
    ) -> AsyncGenerator[Completion, None]:
        # Non-blocking variant: exhaust each wrapped completer in turn.
        for source in self.completers:
            inner = source.get_completions_async(document, complete_event)
            async with aclosing(inner) as stream:
                async for completion in stream:
                    yield completion

392 

393 

def merge_completers(
    completers: Sequence[Completer], deduplicate: bool = False
) -> Completer:
    """
    Combine several completers into one.

    :param completers: The completers to combine.
    :param deduplicate: If `True`, wrap the result in a `DeduplicateCompleter`
        so that completions that would result in the same text will be
        deduplicated.
    """
    merged = _MergedCompleter(completers)
    if not deduplicate:
        return merged

    # Imported here rather than at module level.
    from .deduplicate import DeduplicateCompleter

    return DeduplicateCompleter(merged)

410 

411 

def get_common_complete_suffix(
    document: Document, completions: Sequence[Completion]
) -> str:
    """
    Return the common prefix of the text that all completions would insert
    after the cursor.

    :param document: Document whose text before the cursor is inspected.
    :param completions: The completions to compare.
    :return: The shared prefix, or an empty string when any completion
        would change the text in front of the cursor.
    """
    # For every completion, check that the part overlapping the text before
    # the cursor equals what is already there.
    leaves_prefix_intact = [
        document.text_before_cursor.endswith(
            candidate.text[: -candidate.start_position]
        )
        for candidate in completions
    ]

    # If at least one completion rewrites text before the cursor, there is
    # no common part to return.
    if not all(leaves_prefix_intact):
        return ""

    # What remains of each completion is the part inserted at the cursor.
    tails = [
        candidate.text[-candidate.start_position :] for candidate in completions
    ]
    return _commonprefix(tails)

436 

437 

def _commonprefix(strings: Iterable[str]) -> str:
    """
    Return the longest prefix shared by all given strings.

    Similar to os.path.commonprefix.

    :param strings: Any iterable of strings, including one-shot iterators.
    :return: The common prefix, or an empty string when there is none or
        when no strings were given.
    """
    # Materialize the input once. A generator is always truthy, and `min`
    # would exhaust it before `max` gets to see any element.
    candidates = list(strings)
    if not candidates:
        return ""

    # The smallest and largest strings in sort order differ the most, so
    # their shared prefix is shared by everything in between.
    s1 = min(candidates)
    s2 = max(candidates)

    for i, c in enumerate(s1):
        if c != s2[i]:
            return s1[:i]

    return s1