Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/history.py: 29%
128 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Implementations for the history of a `Buffer`.
4NOTE: There is no `DynamicHistory`:
5 This doesn't work well, because the `Buffer` needs to be able to attach
6 an event handler to the event when a history entry is loaded. This
7 loading can be done asynchronously and making the history swappable would
8 probably break this.
9"""
10from __future__ import annotations
12import datetime
13import os
14import threading
15from abc import ABCMeta, abstractmethod
16from asyncio import get_running_loop
17from typing import AsyncGenerator, Iterable, Sequence
19__all__ = [
20 "History",
21 "ThreadedHistory",
22 "DummyHistory",
23 "FileHistory",
24 "InMemoryHistory",
25]
28class History(metaclass=ABCMeta):
29 """
30 Base ``History`` class.
32 This also includes abstract methods for loading/storing history.
33 """
35 def __init__(self) -> None:
36 # In memory storage for strings.
37 self._loaded = False
39 # History that's loaded already, in reverse order. Latest, most recent
40 # item first.
41 self._loaded_strings: list[str] = []
43 #
44 # Methods expected by `Buffer`.
45 #
47 async def load(self) -> AsyncGenerator[str, None]:
48 """
49 Load the history and yield all the entries in reverse order (latest,
50 most recent history entry first).
52 This method can be called multiple times from the `Buffer` to
53 repopulate the history when prompting for a new input. So we are
54 responsible here for both caching, and making sure that strings that
55 were were appended to the history will be incorporated next time this
56 method is called.
57 """
58 if not self._loaded:
59 self._loaded_strings = list(self.load_history_strings())
60 self._loaded = True
62 for item in self._loaded_strings:
63 yield item
65 def get_strings(self) -> list[str]:
66 """
67 Get the strings from the history that are loaded so far.
68 (In order. Oldest item first.)
69 """
70 return self._loaded_strings[::-1]
72 def append_string(self, string: str) -> None:
73 "Add string to the history."
74 self._loaded_strings.insert(0, string)
75 self.store_string(string)
77 #
78 # Implementation for specific backends.
79 #
81 @abstractmethod
82 def load_history_strings(self) -> Iterable[str]:
83 """
84 This should be a generator that yields `str` instances.
86 It should yield the most recent items first, because they are the most
87 important. (The history can already be used, even when it's only
88 partially loaded.)
89 """
90 while False:
91 yield
93 @abstractmethod
94 def store_string(self, string: str) -> None:
95 """
96 Store the string in persistent storage.
97 """
100class ThreadedHistory(History):
101 """
102 Wrapper around `History` implementations that run the `load()` generator in
103 a thread.
105 Use this to increase the start-up time of prompt_toolkit applications.
106 History entries are available as soon as they are loaded. We don't have to
107 wait for everything to be loaded.
108 """
110 def __init__(self, history: History) -> None:
111 super().__init__()
113 self.history = history
115 self._load_thread: threading.Thread | None = None
117 # Lock for accessing/manipulating `_loaded_strings` and `_loaded`
118 # together in a consistent state.
119 self._lock = threading.Lock()
121 # Events created by each `load()` call. Used to wait for new history
122 # entries from the loader thread.
123 self._string_load_events: list[threading.Event] = []
125 async def load(self) -> AsyncGenerator[str, None]:
126 """
127 Like `History.load(), but call `self.load_history_strings()` in a
128 background thread.
129 """
130 # Start the load thread, if this is called for the first time.
131 if not self._load_thread:
132 self._load_thread = threading.Thread(
133 target=self._in_load_thread,
134 daemon=True,
135 )
136 self._load_thread.start()
138 # Consume the `_loaded_strings` list, using asyncio.
139 loop = get_running_loop()
141 # Create threading Event so that we can wait for new items.
142 event = threading.Event()
143 event.set()
144 self._string_load_events.append(event)
146 items_yielded = 0
148 try:
149 while True:
150 # Wait for new items to be available.
151 # (Use a timeout, because the executor thread is not a daemon
152 # thread. The "slow-history.py" example would otherwise hang if
153 # Control-C is pressed before the history is fully loaded,
154 # because there's still this non-daemon executor thread waiting
155 # for this event.)
156 got_timeout = await loop.run_in_executor(
157 None, lambda: event.wait(timeout=0.5)
158 )
159 if not got_timeout:
160 continue
162 # Read new items (in lock).
163 def in_executor() -> tuple[list[str], bool]:
164 with self._lock:
165 new_items = self._loaded_strings[items_yielded:]
166 done = self._loaded
167 event.clear()
168 return new_items, done
170 new_items, done = await loop.run_in_executor(None, in_executor)
172 items_yielded += len(new_items)
174 for item in new_items:
175 yield item
177 if done:
178 break
179 finally:
180 self._string_load_events.remove(event)
182 def _in_load_thread(self) -> None:
183 try:
184 # Start with an empty list. In case `append_string()` was called
185 # before `load()` happened. Then `.store_string()` will have
186 # written these entries back to disk and we will reload it.
187 self._loaded_strings = []
189 for item in self.history.load_history_strings():
190 with self._lock:
191 self._loaded_strings.append(item)
193 for event in self._string_load_events:
194 event.set()
195 finally:
196 with self._lock:
197 self._loaded = True
198 for event in self._string_load_events:
199 event.set()
201 def append_string(self, string: str) -> None:
202 with self._lock:
203 self._loaded_strings.insert(0, string)
204 self.store_string(string)
206 # All of the following are proxied to `self.history`.
208 def load_history_strings(self) -> Iterable[str]:
209 return self.history.load_history_strings()
211 def store_string(self, string: str) -> None:
212 self.history.store_string(string)
214 def __repr__(self) -> str:
215 return f"ThreadedHistory({self.history!r})"
218class InMemoryHistory(History):
219 """
220 :class:`.History` class that keeps a list of all strings in memory.
222 In order to prepopulate the history, it's possible to call either
223 `append_string` for all items or pass a list of strings to `__init__` here.
224 """
226 def __init__(self, history_strings: Sequence[str] | None = None) -> None:
227 super().__init__()
228 # Emulating disk storage.
229 if history_strings is None:
230 self._storage = []
231 else:
232 self._storage = list(history_strings)
234 def load_history_strings(self) -> Iterable[str]:
235 yield from self._storage[::-1]
237 def store_string(self, string: str) -> None:
238 self._storage.append(string)
241class DummyHistory(History):
242 """
243 :class:`.History` object that doesn't remember anything.
244 """
246 def load_history_strings(self) -> Iterable[str]:
247 return []
249 def store_string(self, string: str) -> None:
250 pass
252 def append_string(self, string: str) -> None:
253 # Don't remember this.
254 pass
257class FileHistory(History):
258 """
259 :class:`.History` class that stores all strings in a file.
260 """
262 def __init__(self, filename: str) -> None:
263 self.filename = filename
264 super().__init__()
266 def load_history_strings(self) -> Iterable[str]:
267 strings: list[str] = []
268 lines: list[str] = []
270 def add() -> None:
271 if lines:
272 # Join and drop trailing newline.
273 string = "".join(lines)[:-1]
275 strings.append(string)
277 if os.path.exists(self.filename):
278 with open(self.filename, "rb") as f:
279 for line_bytes in f:
280 line = line_bytes.decode("utf-8", errors="replace")
282 if line.startswith("+"):
283 lines.append(line[1:])
284 else:
285 add()
286 lines = []
288 add()
290 # Reverse the order, because newest items have to go first.
291 return reversed(strings)
293 def store_string(self, string: str) -> None:
294 # Save to file.
295 with open(self.filename, "ab") as f:
297 def write(t: str) -> None:
298 f.write(t.encode("utf-8"))
300 write("\n# %s\n" % datetime.datetime.now())
301 for line in string.split("\n"):
302 write("+%s\n" % line)