1"""
2Implementations for the history of a `Buffer`.
3
4NOTE: There is no `DynamicHistory`:
5 This doesn't work well, because the `Buffer` needs to be able to attach
6 an event handler to the event when a history entry is loaded. This
7 loading can be done asynchronously and making the history swappable would
8 probably break this.
9"""
10
11from __future__ import annotations
12
13import datetime
14import os
15import threading
16from abc import ABCMeta, abstractmethod
17from asyncio import get_running_loop
18from collections.abc import AsyncGenerator, Iterable, Sequence
19from typing import Union
20
# Public names of this module (what `from <module> import *` exports).
__all__ = [
    "History",
    "ThreadedHistory",
    "DummyHistory",
    "FileHistory",
    "InMemoryHistory",
]
28
29
class History(metaclass=ABCMeta):
    """
    Abstract base class for all ``History`` implementations.

    It owns the in-memory cache of history strings and declares the two
    abstract backend hooks, `load_history_strings` and `store_string`, that
    concrete subclasses implement for loading/storing history.
    """

    def __init__(self) -> None:
        # Cache of the history that is loaded already. Kept in reverse order:
        # the latest, most recent item comes first.
        self._loaded_strings: list[str] = []

        # Becomes True once `load_history_strings()` was consumed by `load()`.
        self._loaded = False

    #
    # Methods expected by `Buffer`.
    #

    async def load(self) -> AsyncGenerator[str, None]:
        """
        Load the history and yield every entry, most recent entry first.

        The `Buffer` can call this more than once, to repopulate the history
        each time a new input is prompted for. That is why the backend is
        only read on the first call and the result is cached; strings that
        were appended through `append_string` in the meantime are part of
        that cache and are therefore yielded on the next call as well.
        """
        if not self._loaded:
            # First call: read the whole backend once and remember that.
            self._loaded_strings = list(self.load_history_strings())
            self._loaded = True

        cached = self._loaded_strings
        for entry in cached:
            yield entry

    def get_strings(self) -> list[str]:
        """
        Return a new list with the strings that are loaded so far.
        (In chronological order: oldest item first.)
        """
        return list(reversed(self._loaded_strings))

    def append_string(self, string: str) -> None:
        """
        Add `string` to the history: put it in front of the in-memory cache
        and hand it to `store_string` for the backend.
        """
        self._loaded_strings.insert(0, string)
        self.store_string(string)

    #
    # Implementation for specific backends.
    #

    @abstractmethod
    def load_history_strings(self) -> Iterable[str]:
        """
        Produce the stored history as `str` instances; this should be a
        generator.

        The most recent items have to come first, because they are the most
        important. (The history can already be used, even when it's only
        partially loaded.)
        """
        # Keeps this default implementation a generator that yields nothing.
        yield from ()

    @abstractmethod
    def store_string(self, string: str) -> None:
        """
        Write `string` to persistent storage.
        """
100
101
class ThreadedHistory(History):
    """
    Wrapper around a `History` implementation that runs the wrapped history's
    `load_history_strings()` generator in a background thread.

    Use this to improve the start-up time of prompt_toolkit applications.
    History entries are available as soon as they are loaded. We don't have to
    wait for everything to be loaded.

    :param history: The `History` instance whose loading/storing is wrapped.
    """

    def __init__(self, history: History) -> None:
        super().__init__()

        self.history = history

        # Created and started by the first `load()` call.
        self._load_thread: threading.Thread | None = None

        # Lock for accessing/manipulating `_loaded_strings` and `_loaded`
        # together in a consistent state.
        self._lock = threading.Lock()

        # Events created by each `load()` call. Used to wait for new history
        # entries from the loader thread.
        self._string_load_events: list[threading.Event] = []

    async def load(self) -> AsyncGenerator[str, None]:
        """
        Like `History.load()`, but the wrapped history's
        `load_history_strings()` is consumed in a background thread, and
        entries are yielded here as soon as that thread has loaded them.
        """
        # Start the load thread, if this is called for the first time.
        if self._load_thread is None:
            self._load_thread = threading.Thread(
                target=self._in_load_thread,
                daemon=True,
            )
            self._load_thread.start()

        # Consume the `_loaded_strings` list, using asyncio.
        loop = get_running_loop()

        # Create threading Event so that we can wait for new items. It starts
        # out set, so that the first pass immediately picks up whatever is
        # loaded already.
        event = threading.Event()
        event.set()
        self._string_load_events.append(event)

        items_yielded = 0

        try:
            while True:
                # Wait for new items to be available.
                # (Use a timeout, because the executor thread is not a daemon
                # thread. The "slow-history.py" example would otherwise hang if
                # Control-C is pressed before the history is fully loaded,
                # because there's still this non-daemon executor thread waiting
                # for this event.)
                # `Event.wait` returns False when the timeout expired without
                # the event being set; in that case, just wait again.
                event_is_set = await loop.run_in_executor(
                    None, lambda: event.wait(timeout=0.5)
                )
                if not event_is_set:
                    continue

                # Read new items (in lock).
                def in_executor() -> tuple[list[str], bool]:
                    with self._lock:
                        new_items = self._loaded_strings[items_yielded:]
                        done = self._loaded
                        event.clear()
                    return new_items, done

                new_items, done = await loop.run_in_executor(None, in_executor)

                items_yielded += len(new_items)

                for item in new_items:
                    yield item

                if done:
                    break
        finally:
            self._string_load_events.remove(event)

    def _in_load_thread(self) -> None:
        """
        Body of the loader thread: fill `_loaded_strings` from the wrapped
        history and wake up the waiting `load()` calls after every item, and
        once more when loading has ended (also when it ended with an error).
        """
        try:
            # Start with an empty list. In case `append_string()` was called
            # before `load()` happened. Then `.store_string()` will have
            # written these entries back to disk and we will reload it.
            with self._lock:
                self._loaded_strings = []

            for item in self.history.load_history_strings():
                with self._lock:
                    self._loaded_strings.append(item)

                self._notify_load_events()
        finally:
            with self._lock:
                self._loaded = True
            self._notify_load_events()

    def _notify_load_events(self) -> None:
        """Set the event of every `load()` call that is currently waiting."""
        # Iterate over a snapshot: `load()` appends to and removes from
        # `_string_load_events` in another thread, and a removal during
        # iteration could make us skip the event of another waiter.
        for event in list(self._string_load_events):
            event.set()

    def append_string(self, string: str) -> None:
        """
        Add `string` in front of the in-memory list (in lock) and pass it to
        the wrapped history for storing.
        """
        with self._lock:
            self._loaded_strings.insert(0, string)
        self.store_string(string)

    # All of the following are proxied to `self.history`.

    def load_history_strings(self) -> Iterable[str]:
        return self.history.load_history_strings()

    def store_string(self, string: str) -> None:
        self.history.store_string(string)

    def __repr__(self) -> str:
        return f"ThreadedHistory({self.history!r})"
218
219
class InMemoryHistory(History):
    """
    :class:`.History` implementation that keeps all of its strings in a list
    in memory.

    To prepopulate the history, either pass a sequence of strings to
    `__init__`, or call `append_string` for every item.
    """

    def __init__(self, history_strings: Sequence[str] | None = None) -> None:
        super().__init__()
        # Stands in for disk storage. Oldest entry first; always a private
        # list, never the sequence object that was passed in.
        self._storage = [] if history_strings is None else list(history_strings)

    def load_history_strings(self) -> Iterable[str]:
        # Walk a reversed copy, so that the most recent entry comes first.
        for entry in self._storage[::-1]:
            yield entry

    def store_string(self, string: str) -> None:
        self._storage.append(string)
241
242
class DummyHistory(History):
    """
    :class:`.History` implementation that never remembers anything.
    """

    def load_history_strings(self) -> Iterable[str]:
        """Return an empty list: there is never anything to load."""
        return []

    def store_string(self, string: str) -> None:
        """Do nothing; `string` is not stored anywhere."""

    def append_string(self, string: str) -> None:
        """Do nothing, so that `string` is not remembered in memory either."""
257
258
# Type alias for a file system path given as text, as bytes, or as an
# `os.PathLike` object. The `os.PathLike[...]` members are written as strings,
# which makes them forward references that are not evaluated at import time.
_StrOrBytesPath = Union[
    str,
    bytes,
    "os.PathLike[str]",
    "os.PathLike[bytes]",
]
260
261
class FileHistory(History):
    """
    :class:`.History` class that stores all strings in a file.

    Every entry is appended to the file as an empty line, a ``# <timestamp>``
    line, and then the lines of the entry itself, each prefixed with ``+``.
    When reading, consecutive ``+`` lines form one entry and any other line
    ends the current entry.

    :param filename: Path of the history file. It doesn't have to exist yet.
    """

    def __init__(self, filename: _StrOrBytesPath) -> None:
        self.filename = filename
        super().__init__()

    def load_history_strings(self) -> Iterable[str]:
        """
        Read all entries from the history file, most recent entry first.

        Yields nothing when the file does not exist. Bytes that are not valid
        UTF-8 are replaced instead of raising an error.
        """
        strings: list[str] = []
        lines: list[str] = []

        def flush() -> None:
            # Turn the `+` lines collected so far into one history entry.
            if lines:
                string = "".join(lines)

                # Drop the newline that terminates the last line. Only strip
                # an actual newline: when the file doesn't end with one, the
                # final character belongs to the entry itself.
                if string.endswith("\n"):
                    string = string[:-1]

                strings.append(string)
                lines.clear()

        if os.path.exists(self.filename):
            # Binary mode: lines are split on "\n" only, and decoding errors
            # are handled per line below.
            with open(self.filename, "rb") as f:
                for line_bytes in f:
                    line = line_bytes.decode("utf-8", errors="replace")

                    if line.startswith("+"):
                        lines.append(line[1:])
                    else:
                        flush()

                # The last entry is not followed by a separator line.
                flush()

        # Reverse the order, because newest items have to go first.
        return reversed(strings)

    def store_string(self, string: str) -> None:
        """
        Append `string` to the history file, preceded by an empty line and a
        ``# <timestamp>`` line. The file is created when it doesn't exist.
        """
        # Build the complete record first and encode it once, so that an
        # encoding error can't leave a half-written entry in the file.
        record = [f"\n# {datetime.datetime.now()}\n"]
        record.extend(f"+{line}\n" for line in string.split("\n"))
        data = "".join(record).encode("utf-8")

        # Save to file.
        with open(self.filename, "ab") as f:
            f.write(data)