1"""
2Implementations for the history of a `Buffer`.
3
4NOTE: There is no `DynamicHistory`:
5 This doesn't work well, because the `Buffer` needs to be able to attach
6 an event handler to the event when a history entry is loaded. This
7 loading can be done asynchronously and making the history swappable would
8 probably break this.
9"""
10
11from __future__ import annotations
12
13import datetime
14import os
15import threading
16from abc import ABCMeta, abstractmethod
17from asyncio import get_running_loop
18from typing import AsyncGenerator, Iterable, Sequence, Union
19
20__all__ = [
21 "History",
22 "ThreadedHistory",
23 "DummyHistory",
24 "FileHistory",
25 "InMemoryHistory",
26]
27
28
29class History(metaclass=ABCMeta):
30 """
31 Base ``History`` class.
32
33 This also includes abstract methods for loading/storing history.
34 """
35
36 def __init__(self) -> None:
37 # In memory storage for strings.
38 self._loaded = False
39
40 # History that's loaded already, in reverse order. Latest, most recent
41 # item first.
42 self._loaded_strings: list[str] = []
43
44 #
45 # Methods expected by `Buffer`.
46 #
47
48 async def load(self) -> AsyncGenerator[str, None]:
49 """
50 Load the history and yield all the entries in reverse order (latest,
51 most recent history entry first).
52
53 This method can be called multiple times from the `Buffer` to
54 repopulate the history when prompting for a new input. So we are
55 responsible here for both caching, and making sure that strings that
56 were were appended to the history will be incorporated next time this
57 method is called.
58 """
59 if not self._loaded:
60 self._loaded_strings = list(self.load_history_strings())
61 self._loaded = True
62
63 for item in self._loaded_strings:
64 yield item
65
66 def get_strings(self) -> list[str]:
67 """
68 Get the strings from the history that are loaded so far.
69 (In order. Oldest item first.)
70 """
71 return self._loaded_strings[::-1]
72
73 def append_string(self, string: str) -> None:
74 "Add string to the history."
75 self._loaded_strings.insert(0, string)
76 self.store_string(string)
77
78 #
79 # Implementation for specific backends.
80 #
81
82 @abstractmethod
83 def load_history_strings(self) -> Iterable[str]:
84 """
85 This should be a generator that yields `str` instances.
86
87 It should yield the most recent items first, because they are the most
88 important. (The history can already be used, even when it's only
89 partially loaded.)
90 """
91 while False:
92 yield
93
94 @abstractmethod
95 def store_string(self, string: str) -> None:
96 """
97 Store the string in persistent storage.
98 """
99
100
101class ThreadedHistory(History):
102 """
103 Wrapper around `History` implementations that run the `load()` generator in
104 a thread.
105
106 Use this to increase the start-up time of prompt_toolkit applications.
107 History entries are available as soon as they are loaded. We don't have to
108 wait for everything to be loaded.
109 """
110
111 def __init__(self, history: History) -> None:
112 super().__init__()
113
114 self.history = history
115
116 self._load_thread: threading.Thread | None = None
117
118 # Lock for accessing/manipulating `_loaded_strings` and `_loaded`
119 # together in a consistent state.
120 self._lock = threading.Lock()
121
122 # Events created by each `load()` call. Used to wait for new history
123 # entries from the loader thread.
124 self._string_load_events: list[threading.Event] = []
125
126 async def load(self) -> AsyncGenerator[str, None]:
127 """
128 Like `History.load(), but call `self.load_history_strings()` in a
129 background thread.
130 """
131 # Start the load thread, if this is called for the first time.
132 if not self._load_thread:
133 self._load_thread = threading.Thread(
134 target=self._in_load_thread,
135 daemon=True,
136 )
137 self._load_thread.start()
138
139 # Consume the `_loaded_strings` list, using asyncio.
140 loop = get_running_loop()
141
142 # Create threading Event so that we can wait for new items.
143 event = threading.Event()
144 event.set()
145 self._string_load_events.append(event)
146
147 items_yielded = 0
148
149 try:
150 while True:
151 # Wait for new items to be available.
152 # (Use a timeout, because the executor thread is not a daemon
153 # thread. The "slow-history.py" example would otherwise hang if
154 # Control-C is pressed before the history is fully loaded,
155 # because there's still this non-daemon executor thread waiting
156 # for this event.)
157 got_timeout = await loop.run_in_executor(
158 None, lambda: event.wait(timeout=0.5)
159 )
160 if not got_timeout:
161 continue
162
163 # Read new items (in lock).
164 def in_executor() -> tuple[list[str], bool]:
165 with self._lock:
166 new_items = self._loaded_strings[items_yielded:]
167 done = self._loaded
168 event.clear()
169 return new_items, done
170
171 new_items, done = await loop.run_in_executor(None, in_executor)
172
173 items_yielded += len(new_items)
174
175 for item in new_items:
176 yield item
177
178 if done:
179 break
180 finally:
181 self._string_load_events.remove(event)
182
183 def _in_load_thread(self) -> None:
184 try:
185 # Start with an empty list. In case `append_string()` was called
186 # before `load()` happened. Then `.store_string()` will have
187 # written these entries back to disk and we will reload it.
188 self._loaded_strings = []
189
190 for item in self.history.load_history_strings():
191 with self._lock:
192 self._loaded_strings.append(item)
193
194 for event in self._string_load_events:
195 event.set()
196 finally:
197 with self._lock:
198 self._loaded = True
199 for event in self._string_load_events:
200 event.set()
201
202 def append_string(self, string: str) -> None:
203 with self._lock:
204 self._loaded_strings.insert(0, string)
205 self.store_string(string)
206
207 # All of the following are proxied to `self.history`.
208
209 def load_history_strings(self) -> Iterable[str]:
210 return self.history.load_history_strings()
211
212 def store_string(self, string: str) -> None:
213 self.history.store_string(string)
214
215 def __repr__(self) -> str:
216 return f"ThreadedHistory({self.history!r})"
217
218
219class InMemoryHistory(History):
220 """
221 :class:`.History` class that keeps a list of all strings in memory.
222
223 In order to prepopulate the history, it's possible to call either
224 `append_string` for all items or pass a list of strings to `__init__` here.
225 """
226
227 def __init__(self, history_strings: Sequence[str] | None = None) -> None:
228 super().__init__()
229 # Emulating disk storage.
230 if history_strings is None:
231 self._storage = []
232 else:
233 self._storage = list(history_strings)
234
235 def load_history_strings(self) -> Iterable[str]:
236 yield from self._storage[::-1]
237
238 def store_string(self, string: str) -> None:
239 self._storage.append(string)
240
241
242class DummyHistory(History):
243 """
244 :class:`.History` object that doesn't remember anything.
245 """
246
247 def load_history_strings(self) -> Iterable[str]:
248 return []
249
250 def store_string(self, string: str) -> None:
251 pass
252
253 def append_string(self, string: str) -> None:
254 # Don't remember this.
255 pass
256
257
258_StrOrBytesPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
259
260
261class FileHistory(History):
262 """
263 :class:`.History` class that stores all strings in a file.
264 """
265
266 def __init__(self, filename: _StrOrBytesPath) -> None:
267 self.filename = filename
268 super().__init__()
269
270 def load_history_strings(self) -> Iterable[str]:
271 strings: list[str] = []
272 lines: list[str] = []
273
274 def add() -> None:
275 if lines:
276 # Join and drop trailing newline.
277 string = "".join(lines)[:-1]
278
279 strings.append(string)
280
281 if os.path.exists(self.filename):
282 with open(self.filename, "rb") as f:
283 for line_bytes in f:
284 line = line_bytes.decode("utf-8", errors="replace")
285
286 if line.startswith("+"):
287 lines.append(line[1:])
288 else:
289 add()
290 lines = []
291
292 add()
293
294 # Reverse the order, because newest items have to go first.
295 return reversed(strings)
296
297 def store_string(self, string: str) -> None:
298 # Save to file.
299 with open(self.filename, "ab") as f:
300
301 def write(t: str) -> None:
302 f.write(t.encode("utf-8"))
303
304 write(f"\n# {datetime.datetime.now()}\n")
305 for line in string.split("\n"):
306 write(f"+{line}\n")