Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/history.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

131 statements  

1""" 

2Implementations for the history of a `Buffer`. 

3 

4NOTE: There is no `DynamicHistory`: 

5 This doesn't work well, because the `Buffer` needs to be able to attach 

6 an event handler to the event when a history entry is loaded. This 

7 loading can be done asynchronously and making the history swappable would 

8 probably break this. 

9""" 

10 

11from __future__ import annotations 

12 

13import datetime 

14import os 

15import threading 

16from abc import ABCMeta, abstractmethod 

17from asyncio import get_running_loop 

18from typing import AsyncGenerator, Iterable, Sequence, Union 

19 

20__all__ = [ 

21 "History", 

22 "ThreadedHistory", 

23 "DummyHistory", 

24 "FileHistory", 

25 "InMemoryHistory", 

26] 

27 

28 

29class History(metaclass=ABCMeta): 

30 """ 

31 Base ``History`` class. 

32 

33 This also includes abstract methods for loading/storing history. 

34 """ 

35 

36 def __init__(self) -> None: 

37 # In memory storage for strings. 

38 self._loaded = False 

39 

40 # History that's loaded already, in reverse order. Latest, most recent 

41 # item first. 

42 self._loaded_strings: list[str] = [] 

43 

44 # 

45 # Methods expected by `Buffer`. 

46 # 

47 

48 async def load(self) -> AsyncGenerator[str, None]: 

49 """ 

50 Load the history and yield all the entries in reverse order (latest, 

51 most recent history entry first). 

52 

53 This method can be called multiple times from the `Buffer` to 

54 repopulate the history when prompting for a new input. So we are 

55 responsible here for both caching, and making sure that strings that 

56 were were appended to the history will be incorporated next time this 

57 method is called. 

58 """ 

59 if not self._loaded: 

60 self._loaded_strings = list(self.load_history_strings()) 

61 self._loaded = True 

62 

63 for item in self._loaded_strings: 

64 yield item 

65 

66 def get_strings(self) -> list[str]: 

67 """ 

68 Get the strings from the history that are loaded so far. 

69 (In order. Oldest item first.) 

70 """ 

71 return self._loaded_strings[::-1] 

72 

73 def append_string(self, string: str) -> None: 

74 "Add string to the history." 

75 self._loaded_strings.insert(0, string) 

76 self.store_string(string) 

77 

78 # 

79 # Implementation for specific backends. 

80 # 

81 

82 @abstractmethod 

83 def load_history_strings(self) -> Iterable[str]: 

84 """ 

85 This should be a generator that yields `str` instances. 

86 

87 It should yield the most recent items first, because they are the most 

88 important. (The history can already be used, even when it's only 

89 partially loaded.) 

90 """ 

91 while False: 

92 yield 

93 

94 @abstractmethod 

95 def store_string(self, string: str) -> None: 

96 """ 

97 Store the string in persistent storage. 

98 """ 

99 

100 

101class ThreadedHistory(History): 

102 """ 

103 Wrapper around `History` implementations that run the `load()` generator in 

104 a thread. 

105 

106 Use this to increase the start-up time of prompt_toolkit applications. 

107 History entries are available as soon as they are loaded. We don't have to 

108 wait for everything to be loaded. 

109 """ 

110 

111 def __init__(self, history: History) -> None: 

112 super().__init__() 

113 

114 self.history = history 

115 

116 self._load_thread: threading.Thread | None = None 

117 

118 # Lock for accessing/manipulating `_loaded_strings` and `_loaded` 

119 # together in a consistent state. 

120 self._lock = threading.Lock() 

121 

122 # Events created by each `load()` call. Used to wait for new history 

123 # entries from the loader thread. 

124 self._string_load_events: list[threading.Event] = [] 

125 

126 async def load(self) -> AsyncGenerator[str, None]: 

127 """ 

128 Like `History.load(), but call `self.load_history_strings()` in a 

129 background thread. 

130 """ 

131 # Start the load thread, if this is called for the first time. 

132 if not self._load_thread: 

133 self._load_thread = threading.Thread( 

134 target=self._in_load_thread, 

135 daemon=True, 

136 ) 

137 self._load_thread.start() 

138 

139 # Consume the `_loaded_strings` list, using asyncio. 

140 loop = get_running_loop() 

141 

142 # Create threading Event so that we can wait for new items. 

143 event = threading.Event() 

144 event.set() 

145 self._string_load_events.append(event) 

146 

147 items_yielded = 0 

148 

149 try: 

150 while True: 

151 # Wait for new items to be available. 

152 # (Use a timeout, because the executor thread is not a daemon 

153 # thread. The "slow-history.py" example would otherwise hang if 

154 # Control-C is pressed before the history is fully loaded, 

155 # because there's still this non-daemon executor thread waiting 

156 # for this event.) 

157 got_timeout = await loop.run_in_executor( 

158 None, lambda: event.wait(timeout=0.5) 

159 ) 

160 if not got_timeout: 

161 continue 

162 

163 # Read new items (in lock). 

164 def in_executor() -> tuple[list[str], bool]: 

165 with self._lock: 

166 new_items = self._loaded_strings[items_yielded:] 

167 done = self._loaded 

168 event.clear() 

169 return new_items, done 

170 

171 new_items, done = await loop.run_in_executor(None, in_executor) 

172 

173 items_yielded += len(new_items) 

174 

175 for item in new_items: 

176 yield item 

177 

178 if done: 

179 break 

180 finally: 

181 self._string_load_events.remove(event) 

182 

183 def _in_load_thread(self) -> None: 

184 try: 

185 # Start with an empty list. In case `append_string()` was called 

186 # before `load()` happened. Then `.store_string()` will have 

187 # written these entries back to disk and we will reload it. 

188 self._loaded_strings = [] 

189 

190 for item in self.history.load_history_strings(): 

191 with self._lock: 

192 self._loaded_strings.append(item) 

193 

194 for event in self._string_load_events: 

195 event.set() 

196 finally: 

197 with self._lock: 

198 self._loaded = True 

199 for event in self._string_load_events: 

200 event.set() 

201 

202 def append_string(self, string: str) -> None: 

203 with self._lock: 

204 self._loaded_strings.insert(0, string) 

205 self.store_string(string) 

206 

207 # All of the following are proxied to `self.history`. 

208 

209 def load_history_strings(self) -> Iterable[str]: 

210 return self.history.load_history_strings() 

211 

212 def store_string(self, string: str) -> None: 

213 self.history.store_string(string) 

214 

215 def __repr__(self) -> str: 

216 return f"ThreadedHistory({self.history!r})" 

217 

218 

219class InMemoryHistory(History): 

220 """ 

221 :class:`.History` class that keeps a list of all strings in memory. 

222 

223 In order to prepopulate the history, it's possible to call either 

224 `append_string` for all items or pass a list of strings to `__init__` here. 

225 """ 

226 

227 def __init__(self, history_strings: Sequence[str] | None = None) -> None: 

228 super().__init__() 

229 # Emulating disk storage. 

230 if history_strings is None: 

231 self._storage = [] 

232 else: 

233 self._storage = list(history_strings) 

234 

235 def load_history_strings(self) -> Iterable[str]: 

236 yield from self._storage[::-1] 

237 

238 def store_string(self, string: str) -> None: 

239 self._storage.append(string) 

240 

241 

242class DummyHistory(History): 

243 """ 

244 :class:`.History` object that doesn't remember anything. 

245 """ 

246 

247 def load_history_strings(self) -> Iterable[str]: 

248 return [] 

249 

250 def store_string(self, string: str) -> None: 

251 pass 

252 

253 def append_string(self, string: str) -> None: 

254 # Don't remember this. 

255 pass 

256 

257 

258_StrOrBytesPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"] 

259 

260 

261class FileHistory(History): 

262 """ 

263 :class:`.History` class that stores all strings in a file. 

264 """ 

265 

266 def __init__(self, filename: _StrOrBytesPath) -> None: 

267 self.filename = filename 

268 super().__init__() 

269 

270 def load_history_strings(self) -> Iterable[str]: 

271 strings: list[str] = [] 

272 lines: list[str] = [] 

273 

274 def add() -> None: 

275 if lines: 

276 # Join and drop trailing newline. 

277 string = "".join(lines)[:-1] 

278 

279 strings.append(string) 

280 

281 if os.path.exists(self.filename): 

282 with open(self.filename, "rb") as f: 

283 for line_bytes in f: 

284 line = line_bytes.decode("utf-8", errors="replace") 

285 

286 if line.startswith("+"): 

287 lines.append(line[1:]) 

288 else: 

289 add() 

290 lines = [] 

291 

292 add() 

293 

294 # Reverse the order, because newest items have to go first. 

295 return reversed(strings) 

296 

297 def store_string(self, string: str) -> None: 

298 # Save to file. 

299 with open(self.filename, "ab") as f: 

300 

301 def write(t: str) -> None: 

302 f.write(t.encode("utf-8")) 

303 

304 write(f"\n# {datetime.datetime.now()}\n") 

305 for line in string.split("\n"): 

306 write(f"+{line}\n")