Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/patch_stdout.py: 29%

120 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2patch_stdout 

3============ 

4 

5This implements a context manager that ensures that print statements within 

6it won't destroy the user interface. The context manager will replace 

7`sys.stdout` by something that draws the output above the current prompt, 

8rather than overwriting the UI. 

9 

10Usage:: 

11 

12 with patch_stdout():

13 ... 

14 application.run() 

15 ... 

16 

17Multiple applications can run in the body of the context manager, one after the 

18other. 

19""" 

20from __future__ import annotations 

21 

22import asyncio 

23import queue 

24import sys 

25import threading 

26import time 

27from contextlib import contextmanager 

28from typing import Generator, TextIO, cast 

29 

30from .application import get_app_session, run_in_terminal 

31from .output import Output 

32 

33__all__ = [ 

34 "patch_stdout", 

35 "StdoutProxy", 

36] 

37 

38 

@contextmanager
def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
    """
    Temporarily route `sys.stdout` and `sys.stderr` through a
    :class:`StdoutProxy`.

    While the context is active, text written to either stream is handed to
    the proxy, which prints it above the prompt instead of on top of the
    rendered user interface. When no application is running, the proxy writes
    to the output directly. On exit both streams are restored to the objects
    they referred to on entry, and the proxy is closed.

    Warning: if a new event loop is installed with
    `asyncio.set_event_loop()`, enter this context manager only afterwards.
    The proxy binds to the app session that is current at entry, and output
    is scheduled on the loop of the application running in that session.

    :param raw: (`bool`) When True, vt100 terminal escape sequences are not
        removed/escaped.
    """
    with StdoutProxy(raw=raw) as proxy:
        # Remember both streams so they can be put back no matter how the
        # body of the `with` block terminates.
        saved_streams = (sys.stdout, sys.stderr)

        # A single proxy instance backs both stdout and stderr.
        sys.stdout = sys.stderr = cast(TextIO, proxy)

        try:
            yield
        finally:
            sys.stdout, sys.stderr = saved_streams

70 

71 

class _Done:
    """Sentinel put on the flush queue to tell the write thread to stop."""


class StdoutProxy:
    """
    File-like object, which prints everything written to it, output above the
    current application/prompt. This class is compatible with other file
    objects and can be used as a drop-in replacement for `sys.stdout` or can
    for instance be passed to `logging.StreamHandler`.

    The current application, above which we print, is determined by looking
    what application currently runs in the `AppSession` that is active during
    the creation of this instance.

    This class can be used as a context manager; leaving the context calls
    :meth:`close`.

    In order to avoid having to repaint the prompt continuously for every
    little write, a short delay of `sleep_between_writes` seconds will be added
    between writes in order to bundle many smaller writes in a short timespan.

    :param sleep_between_writes: Seconds the write thread sleeps after each
        write that was scheduled on an application's event loop.
    :param raw: When True, text is passed to `Output.write_raw` instead of
        `Output.write`.
    """

    def __init__(
        self,
        sleep_between_writes: float = 0.2,
        raw: bool = False,
    ) -> None:
        self.sleep_between_writes = sleep_between_writes
        self.raw = raw

        # `_lock` guards `_buffer`, which holds text that has been written
        # but is not yet terminated by a newline.
        self._lock = threading.RLock()
        self._buffer: list[str] = []

        # Keep track of the current app session.
        self.app_session = get_app_session()

        # See what output is active *right now*. We should do it at this point,
        # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
        # Otherwise, if `patch_stdout` is used, and no `Output` instance has
        # been created, then the default output creation code will see this
        # proxy object as `sys.stdout`, and get in a recursive loop trying to
        # access `StdoutProxy.isatty()` which will again retrieve the output.
        self._output: Output = self.app_session.output

        # Flush thread
        self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
        self._flush_thread = self._start_write_thread()
        self.closed = False

    def __enter__(self) -> StdoutProxy:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> None:
        """
        Stop `StdoutProxy` proxy.

        This will flush any text that is still buffered (including text that
        was never terminated by a newline), terminate the write thread and
        wait for the write thread to finish. Calling it again is a no-op.
        """
        if self.closed:
            return

        # Text without a trailing newline is normally held back in
        # `_buffer`. Nothing will come along to complete that line once we
        # are closed, so hand it to the write thread now instead of dropping
        # it.
        with self._lock:
            self._flush()

        self._flush_queue.put(_Done())
        self._flush_thread.join()
        self.closed = True

    def _start_write_thread(self) -> threading.Thread:
        """Start and return the daemon thread that runs `_write_thread`."""
        thread = threading.Thread(
            target=self._write_thread,
            name="patch-stdout-flush-thread",
            daemon=True,
        )
        thread.start()
        return thread

    def _write_thread(self) -> None:
        """
        Consume the flush queue until a `_Done` sentinel is received.

        Each iteration blocks for one item, then drains whatever else is
        already queued, so that everything is written in a single call.
        """
        done = False

        while not done:
            item = self._flush_queue.get()

            if isinstance(item, _Done):
                break

            # Don't bother calling when we got an empty string.
            if not item:
                continue

            text: list[str] = [item]

            # Read the rest of the queue if more data was queued up.
            while True:
                try:
                    item = self._flush_queue.get_nowait()
                except queue.Empty:
                    break

                if isinstance(item, _Done):
                    # Remember to stop, but keep draining so that what was
                    # collected so far still gets written below.
                    done = True
                else:
                    text.append(item)

            app_loop = self._get_app_loop()
            self._write_and_flush(app_loop, "".join(text))

            # If an application was running that requires repainting, then wait
            # for a very short time, in order to bundle actual writes and avoid
            # having to repaint too often.
            if app_loop is not None:
                time.sleep(self.sleep_between_writes)

    def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
        """
        Return the event loop for the application currently running in our
        `AppSession`, or `None` when the session has no application.
        """
        app = self.app_session.app

        if app is None:
            return None

        return app.loop

    def _write_and_flush(
        self, loop: asyncio.AbstractEventLoop | None, text: str
    ) -> None:
        """
        Write the given text to stdout and flush.
        If an application is running, use `run_in_terminal`.

        :param loop: Event loop of the running application, or `None` to
            write immediately from the calling thread.
        :param text: The text to write.
        """

        def write_and_flush() -> None:
            # Ensure that autowrap is enabled before calling `write`.
            # XXX: On Windows, the `Windows10_Output` enables/disables VT
            #      terminal processing for every flush. It turns out that this
            #      causes autowrap to be reset (disabled) after each flush. So,
            #      we have to enable it again before writing text.
            self._output.enable_autowrap()

            if self.raw:
                self._output.write_raw(text)
            else:
                self._output.write(text)

            self._output.flush()

        def write_and_flush_in_loop() -> None:
            # If an application is running, use `run_in_terminal`, otherwise
            # call it directly.
            run_in_terminal(write_and_flush, in_executor=False)

        if loop is None:
            # No loop, write immediately.
            write_and_flush()
        else:
            # Make sure `write_and_flush` is executed *in* the event loop, not
            # in another thread.
            loop.call_soon_threadsafe(write_and_flush_in_loop)

    def _write(self, data: str) -> None:
        """
        Note: print()-statements cause multiple write calls.
              (write('line') and write('\n')). Of course we don't want to call
              `run_in_terminal` for every individual call, because that's too
              expensive, and as long as the newline hasn't been written, the
              text itself is again overwritten by the rendering of the input
              command line. Therefore, we have a little buffer which holds the
              text until a newline is written to stdout.

        The caller is expected to hold `self._lock`.
        """
        if "\n" in data:
            # When there is a newline in the data, write everything before the
            # newline, including the newline itself.
            before, after = data.rsplit("\n", 1)
            to_write = self._buffer + [before, "\n"]
            self._buffer = [after]

            text = "".join(to_write)
            self._flush_queue.put(text)
        else:
            # Otherwise, cache in buffer.
            self._buffer.append(data)

    def _flush(self) -> None:
        """
        Move all buffered text to the flush queue and empty the buffer.

        The caller is expected to hold `self._lock`.
        """
        text = "".join(self._buffer)
        self._buffer = []

        # The write thread skips empty strings anyway, so don't wake it up
        # for nothing.
        if text:
            self._flush_queue.put(text)

    def write(self, data: str) -> int:
        """
        Buffer `data`; complete lines are passed on to the write thread.

        :return: `len(data)`, regardless of when the text is actually
            written out.
        """
        with self._lock:
            self._write(data)

        return len(data)  # Pretend everything was written.

    def flush(self) -> None:
        """
        Flush buffered output.
        """
        with self._lock:
            self._flush()

    @property
    def original_stdout(self) -> TextIO:
        """The `stdout` of the wrapped output, or `sys.__stdout__` if unset."""
        return self._output.stdout or sys.__stdout__

    # Attributes for compatibility with sys.__stdout__:

    def fileno(self) -> int:
        """Return the file descriptor reported by the wrapped output."""
        return self._output.fileno()

    def isatty(self) -> bool:
        """Return whether the wrapped output's `stdout` is a tty."""
        stdout = self._output.stdout
        if stdout is None:
            return False

        return stdout.isatty()

    @property
    def encoding(self) -> str:
        """Encoding reported by the wrapped output."""
        return self._output.encoding()

    @property
    def errors(self) -> str:
        """Error handling mode; always "strict"."""
        return "strict"

296 return "strict"