Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prompt_toolkit/patch_stdout.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

122 statements  

1""" 

2patch_stdout 

3============ 

4 

5This implements a context manager that ensures that print statements within 

6it won't destroy the user interface. The context manager will replace 

7`sys.stdout` by something that draws the output above the current prompt, 

8rather than overwriting the UI. 

9 

10Usage:: 

11 

12 with patch_stdout(application): 

13 ... 

14 application.run() 

15 ... 

16 

17Multiple applications can run in the body of the context manager, one after the 

18other. 

19""" 

20 

21from __future__ import annotations 

22 

23import asyncio 

24import queue 

25import sys 

26import threading 

27import time 

28from contextlib import contextmanager 

29from typing import Generator, TextIO, cast 

30 

31from .application import get_app_session, run_in_terminal 

32from .output import Output 

33 

34__all__ = [ 

35 "patch_stdout", 

36 "StdoutProxy", 

37] 

38 

39 

40@contextmanager 

41def patch_stdout(raw: bool = False) -> Generator[None, None, None]: 

42 """ 

43 Replace `sys.stdout` by an :class:`_StdoutProxy` instance. 

44 

45 Writing to this proxy will make sure that the text appears above the 

46 prompt, and that it doesn't destroy the output from the renderer. If no 

47 application is curring, the behavior should be identical to writing to 

48 `sys.stdout` directly. 

49 

50 Warning: If a new event loop is installed using `asyncio.set_event_loop()`, 

51 then make sure that the context manager is applied after the event loop 

52 is changed. Printing to stdout will be scheduled in the event loop 

53 that's active when the context manager is created. 

54 

55 :param raw: (`bool`) When True, vt100 terminal escape sequences are not 

56 removed/escaped. 

57 """ 

58 with StdoutProxy(raw=raw) as proxy: 

59 original_stdout = sys.stdout 

60 original_stderr = sys.stderr 

61 

62 # Enter. 

63 sys.stdout = cast(TextIO, proxy) 

64 sys.stderr = cast(TextIO, proxy) 

65 

66 try: 

67 yield 

68 finally: 

69 sys.stdout = original_stdout 

70 sys.stderr = original_stderr 

71 

72 

73class _Done: 

74 "Sentinel value for stopping the stdout proxy." 

75 

76 

77class StdoutProxy: 

78 """ 

79 File-like object, which prints everything written to it, output above the 

80 current application/prompt. This class is compatible with other file 

81 objects and can be used as a drop-in replacement for `sys.stdout` or can 

82 for instance be passed to `logging.StreamHandler`. 

83 

84 The current application, above which we print, is determined by looking 

85 what application currently runs in the `AppSession` that is active during 

86 the creation of this instance. 

87 

88 This class can be used as a context manager. 

89 

90 In order to avoid having to repaint the prompt continuously for every 

91 little write, a short delay of `sleep_between_writes` seconds will be added 

92 between writes in order to bundle many smaller writes in a short timespan. 

93 """ 

94 

95 def __init__( 

96 self, 

97 sleep_between_writes: float = 0.2, 

98 raw: bool = False, 

99 ) -> None: 

100 self.sleep_between_writes = sleep_between_writes 

101 self.raw = raw 

102 

103 self._lock = threading.RLock() 

104 self._buffer: list[str] = [] 

105 

106 # Keep track of the curret app session. 

107 self.app_session = get_app_session() 

108 

109 # See what output is active *right now*. We should do it at this point, 

110 # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`. 

111 # Otherwise, if `patch_stdout` is used, and no `Output` instance has 

112 # been created, then the default output creation code will see this 

113 # proxy object as `sys.stdout`, and get in a recursive loop trying to 

114 # access `StdoutProxy.isatty()` which will again retrieve the output. 

115 self._output: Output = self.app_session.output 

116 

117 # Flush thread 

118 self._flush_queue: queue.Queue[str | _Done] = queue.Queue() 

119 self._flush_thread = self._start_write_thread() 

120 self.closed = False 

121 

122 def __enter__(self) -> StdoutProxy: 

123 return self 

124 

125 def __exit__(self, *args: object) -> None: 

126 self.close() 

127 

128 def close(self) -> None: 

129 """ 

130 Stop `StdoutProxy` proxy. 

131 

132 This will terminate the write thread, make sure everything is flushed 

133 and wait for the write thread to finish. 

134 """ 

135 if not self.closed: 

136 self._flush_queue.put(_Done()) 

137 self._flush_thread.join() 

138 self.closed = True 

139 

140 def _start_write_thread(self) -> threading.Thread: 

141 thread = threading.Thread( 

142 target=self._write_thread, 

143 name="patch-stdout-flush-thread", 

144 daemon=True, 

145 ) 

146 thread.start() 

147 return thread 

148 

149 def _write_thread(self) -> None: 

150 done = False 

151 

152 while not done: 

153 item = self._flush_queue.get() 

154 

155 if isinstance(item, _Done): 

156 break 

157 

158 # Don't bother calling when we got an empty string. 

159 if not item: 

160 continue 

161 

162 text = [] 

163 text.append(item) 

164 

165 # Read the rest of the queue if more data was queued up. 

166 while True: 

167 try: 

168 item = self._flush_queue.get_nowait() 

169 except queue.Empty: 

170 break 

171 else: 

172 if isinstance(item, _Done): 

173 done = True 

174 else: 

175 text.append(item) 

176 

177 app_loop = self._get_app_loop() 

178 self._write_and_flush(app_loop, "".join(text)) 

179 

180 # If an application was running that requires repainting, then wait 

181 # for a very short time, in order to bundle actual writes and avoid 

182 # having to repaint to often. 

183 if app_loop is not None: 

184 time.sleep(self.sleep_between_writes) 

185 

186 def _get_app_loop(self) -> asyncio.AbstractEventLoop | None: 

187 """ 

188 Return the event loop for the application currently running in our 

189 `AppSession`. 

190 """ 

191 app = self.app_session.app 

192 

193 if app is None: 

194 return None 

195 

196 return app.loop 

197 

198 def _write_and_flush( 

199 self, loop: asyncio.AbstractEventLoop | None, text: str 

200 ) -> None: 

201 """ 

202 Write the given text to stdout and flush. 

203 If an application is running, use `run_in_terminal`. 

204 """ 

205 

206 def write_and_flush() -> None: 

207 # Ensure that autowrap is enabled before calling `write`. 

208 # XXX: On Windows, the `Windows10_Output` enables/disables VT 

209 # terminal processing for every flush. It turns out that this 

210 # causes autowrap to be reset (disabled) after each flush. So, 

211 # we have to enable it again before writing text. 

212 self._output.enable_autowrap() 

213 

214 if self.raw: 

215 self._output.write_raw(text) 

216 else: 

217 self._output.write(text) 

218 

219 self._output.flush() 

220 

221 def write_and_flush_in_loop() -> None: 

222 # If an application is running, use `run_in_terminal`, otherwise 

223 # call it directly. 

224 run_in_terminal(write_and_flush, in_executor=False) 

225 

226 if loop is None: 

227 # No loop, write immediately. 

228 write_and_flush() 

229 else: 

230 # Make sure `write_and_flush` is executed *in* the event loop, not 

231 # in another thread. 

232 loop.call_soon_threadsafe(write_and_flush_in_loop) 

233 

234 def _write(self, data: str) -> None: 

235 """ 

236 Note: print()-statements cause to multiple write calls. 

237 (write('line') and write('\n')). Of course we don't want to call 

238 `run_in_terminal` for every individual call, because that's too 

239 expensive, and as long as the newline hasn't been written, the 

240 text itself is again overwritten by the rendering of the input 

241 command line. Therefor, we have a little buffer which holds the 

242 text until a newline is written to stdout. 

243 """ 

244 if "\n" in data: 

245 # When there is a newline in the data, write everything before the 

246 # newline, including the newline itself. 

247 before, after = data.rsplit("\n", 1) 

248 to_write = self._buffer + [before, "\n"] 

249 self._buffer = [after] 

250 

251 text = "".join(to_write) 

252 self._flush_queue.put(text) 

253 else: 

254 # Otherwise, cache in buffer. 

255 self._buffer.append(data) 

256 

257 def _flush(self) -> None: 

258 text = "".join(self._buffer) 

259 self._buffer = [] 

260 self._flush_queue.put(text) 

261 

262 def write(self, data: str) -> int: 

263 with self._lock: 

264 self._write(data) 

265 

266 return len(data) # Pretend everything was written. 

267 

268 def flush(self) -> None: 

269 """ 

270 Flush buffered output. 

271 """ 

272 with self._lock: 

273 self._flush() 

274 

275 @property 

276 def original_stdout(self) -> TextIO | None: 

277 return self._output.stdout or sys.__stdout__ 

278 

279 # Attributes for compatibility with sys.__stdout__: 

280 

281 def fileno(self) -> int: 

282 return self._output.fileno() 

283 

284 def isatty(self) -> bool: 

285 stdout = self._output.stdout 

286 if stdout is None: 

287 return False 

288 

289 return stdout.isatty() 

290 

291 @property 

292 def encoding(self) -> str: 

293 return self._output.encoding() 

294 

295 @property 

296 def errors(self) -> str: 

297 return "strict"