Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/history.py: 29%

128 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Implementations for the history of a `Buffer`. 

3 

4NOTE: There is no `DynamicHistory`: 

5 This doesn't work well, because the `Buffer` needs to be able to attach 

6 an event handler to the event when a history entry is loaded. This 

7 loading can be done asynchronously and making the history swappable would 

8 probably break this. 

9""" 

10from __future__ import annotations 

11 

12import datetime 

13import os 

14import threading 

15from abc import ABCMeta, abstractmethod 

16from asyncio import get_running_loop 

17from typing import AsyncGenerator, Iterable, Sequence 

18 

# Names exported by `from <this module> import *`.
__all__ = [
    "History",
    "ThreadedHistory",
    "DummyHistory",
    "FileHistory",
    "InMemoryHistory",
]

26 

27 

class History(metaclass=ABCMeta):
    """
    Abstract base class for all history backends.

    It owns the in-memory cache of loaded entries and declares the two
    abstract hooks (`load_history_strings` and `store_string`) that concrete
    backends implement.
    """

    def __init__(self) -> None:
        # Entries loaded so far, newest first.
        self._loaded_strings: list[str] = []

        # Flipped to True once the backend has been read into the cache.
        self._loaded = False

    #
    # Interface used by `Buffer`.
    #

    async def load(self) -> AsyncGenerator[str, None]:
        """
        Yield every history entry, most recent first.

        The backend is only read on the first call; the result is cached in
        `_loaded_strings`. Later calls replay that cache, which also contains
        whatever was added through `append_string` in the meantime.
        """
        if not self._loaded:
            self._loaded_strings = [*self.load_history_strings()]
            self._loaded = True

        for entry in self._loaded_strings:
            yield entry

    def get_strings(self) -> list[str]:
        """
        Return a new list with the entries loaded so far, oldest first.
        """
        oldest_first = list(self._loaded_strings)
        oldest_first.reverse()
        return oldest_first

    def append_string(self, string: str) -> None:
        """
        Put `string` at the front of the cache and hand it to the backend.
        """
        self._loaded_strings.insert(0, string)
        self.store_string(string)

    #
    # Hooks implemented by the concrete backends.
    #

    @abstractmethod
    def load_history_strings(self) -> Iterable[str]:
        """
        Produce the stored entries as `str` instances, newest first.

        Newest entries come first because they matter most; the history is
        usable while it is still being loaded.
        """
        # Keeps this default implementation an (empty) generator.
        yield from ()

    @abstractmethod
    def store_string(self, string: str) -> None:
        """
        Write `string` to persistent storage.
        """

98 

99 

class ThreadedHistory(History):
    """
    Wrapper around `History` implementations that run the `load()` generator in
    a thread.

    Use this to improve the start-up time of prompt_toolkit applications.
    History entries are available as soon as they are loaded. We don't have to
    wait for everything to be loaded.
    """

    def __init__(self, history: History) -> None:
        """
        :param history: The `History` instance whose loading and storing is
            delegated to.
        """
        super().__init__()

        self.history = history

        # Loader thread; created lazily by the first `load()` call.
        self._load_thread: threading.Thread | None = None

        # Lock for accessing/manipulating `_loaded_strings` and `_loaded`
        # together in a consistent state.
        self._lock = threading.Lock()

        # Events created by each `load()` call. Used to wait for new history
        # entries from the loader thread.
        self._string_load_events: list[threading.Event] = []

    async def load(self) -> AsyncGenerator[str, None]:
        """
        Like `History.load()`, but call `self.load_history_strings()` in a
        background thread.

        Items are yielded in the order in which they appear in
        `_loaded_strings`; the generator finishes once the loader thread has
        set `_loaded` and all items read so far have been yielded.
        """
        # Start the load thread, if this is called for the first time.
        if not self._load_thread:
            self._load_thread = threading.Thread(
                target=self._in_load_thread,
                daemon=True,
            )
            self._load_thread.start()

        # Consume the `_loaded_strings` list, using asyncio.
        loop = get_running_loop()

        # Create threading Event so that we can wait for new items.
        # It starts in the set state, so the first wait below returns
        # immediately and picks up whatever has been loaded already.
        event = threading.Event()
        event.set()
        self._string_load_events.append(event)

        # Number of entries of `_loaded_strings` this call has yielded so far.
        items_yielded = 0

        try:
            while True:
                # Wait for new items to be available.
                # (Use a timeout, because the executor thread is not a daemon
                # thread. The "slow-history.py" example would otherwise hang if
                # Control-C is pressed before the history is fully loaded,
                # because there's still this non-daemon executor thread waiting
                # for this event.)
                # Note: `Event.wait()` returns True when the event is set and
                # False when the timeout expired, so despite its name a falsy
                # `got_timeout` means the wait timed out and we wait again.
                got_timeout = await loop.run_in_executor(
                    None, lambda: event.wait(timeout=0.5)
                )
                if not got_timeout:
                    continue

                # Read new items (in lock).
                def in_executor() -> tuple[list[str], bool]:
                    with self._lock:
                        new_items = self._loaded_strings[items_yielded:]
                        done = self._loaded
                        # Cleared under the lock, together with taking the
                        # snapshot above.
                        event.clear()
                    return new_items, done

                new_items, done = await loop.run_in_executor(None, in_executor)

                items_yielded += len(new_items)

                for item in new_items:
                    yield item

                if done:
                    break
        finally:
            # Unregister, so the loader thread no longer signals this event.
            self._string_load_events.remove(event)

    def _in_load_thread(self) -> None:
        """
        Thread target: read all entries from the wrapped history into
        `_loaded_strings`, signalling every registered event after each item
        and once more when loading has ended (also when it ended with an
        exception).
        """
        try:
            # Start with an empty list. In case `append_string()` was called
            # before `load()` happened. Then `.store_string()` will have
            # written these entries back to disk and we will reload it.
            # NOTE(review): this reset is done without holding `self._lock`;
            # confirm that is intended.
            self._loaded_strings = []

            for item in self.history.load_history_strings():
                with self._lock:
                    self._loaded_strings.append(item)

                for event in self._string_load_events:
                    event.set()
        finally:
            with self._lock:
                self._loaded = True
            for event in self._string_load_events:
                event.set()

    def append_string(self, string: str) -> None:
        """
        Insert `string` at the front of the cache (under the lock) and store
        it through the wrapped history.
        """
        with self._lock:
            self._loaded_strings.insert(0, string)
        self.store_string(string)

    # All of the following are proxied to `self.history`.

    def load_history_strings(self) -> Iterable[str]:
        return self.history.load_history_strings()

    def store_string(self, string: str) -> None:
        self.history.store_string(string)

    def __repr__(self) -> str:
        return f"ThreadedHistory({self.history!r})"

216 

217 

class InMemoryHistory(History):
    """
    :class:`.History` backend that keeps every entry in a plain list.

    The history can be pre-filled either by passing the strings to
    `__init__` or by calling `append_string` for each of them.
    """

    def __init__(self, history_strings: Sequence[str] | None = None) -> None:
        super().__init__()
        # Stand-in for disk storage; `store_string` appends to its end.
        self._storage = [] if history_strings is None else list(history_strings)

    def load_history_strings(self) -> Iterable[str]:
        # Walk a reversed copy, so the last stored entry is yielded first.
        newest_first = self._storage[::-1]
        for entry in newest_first:
            yield entry

    def store_string(self, string: str) -> None:
        self._storage.append(string)

239 

240 

class DummyHistory(History):
    """
    :class:`.History` backend that discards everything it is given.
    """

    def load_history_strings(self) -> Iterable[str]:
        """Return an empty list: there is never anything to load."""
        return []

    def store_string(self, string: str) -> None:
        """Do nothing; the string is not persisted."""

    def append_string(self, string: str) -> None:
        """Do nothing; the string is neither cached nor stored."""

255 

256 

class FileHistory(History):
    """
    :class:`.History` class that stores all strings in a file.

    Every entry is appended to the file as a blank line, a ``# <timestamp>``
    line and then the lines of the entry, each prefixed with ``+``.
    """

    def __init__(self, filename: str) -> None:
        """
        :param filename: Path of the file that entries are read from and
            appended to.
        """
        self.filename = filename
        super().__init__()

    def load_history_strings(self) -> Iterable[str]:
        """
        Read the history file and return its entries, newest first.

        Consecutive lines starting with ``+`` form one entry; any other line
        ends the current entry. Bytes that are not valid UTF-8 are replaced
        rather than raising. When the file does not exist, nothing is
        returned.
        """
        strings: list[str] = []
        lines: list[str] = []

        def add() -> None:
            # Turn the collected lines into one entry and reset the buffer.
            if lines:
                string = "".join(lines)
                # Drop only the newline that terminates the entry. When the
                # last line of the file lacks one (truncated or hand-edited
                # file), unconditionally slicing would eat a real character.
                if string.endswith("\n"):
                    string = string[:-1]
                strings.append(string)
                lines.clear()

        if os.path.exists(self.filename):
            with open(self.filename, "rb") as f:
                for line_bytes in f:
                    line = line_bytes.decode("utf-8", errors="replace")

                    if line.startswith("+"):
                        lines.append(line[1:])
                    else:
                        add()

                # Flush the entry that runs up to the end of the file.
                add()

        # Reverse the order, because newest items have to go first.
        return reversed(strings)

    def store_string(self, string: str) -> None:
        """
        Append `string` to the history file, preceded by a timestamp line.
        """
        # Build and encode the complete record before touching the file, so
        # that an encoding error cannot leave a partial record behind.
        parts = [f"\n# {datetime.datetime.now()}\n"]
        parts.extend(f"+{line}\n" for line in string.split("\n"))
        data = "".join(parts).encode("utf-8")

        with open(self.filename, "ab") as f:
            f.write(data)