1"""
2patch_stdout
3============
4
5This implements a context manager that ensures that print statements within
6it won't destroy the user interface. The context manager will replace
7`sys.stdout` by something that draws the output above the current prompt,
8rather than overwriting the UI.
9
10Usage::
11
12 with patch_stdout(application):
13 ...
14 application.run()
15 ...
16
17Multiple applications can run in the body of the context manager, one after the
18other.
19"""
20
21from __future__ import annotations
22
23import asyncio
24import queue
25import sys
26import threading
27import time
28from contextlib import contextmanager
29from typing import Generator, TextIO, cast
30
31from .application import get_app_session, run_in_terminal
32from .output import Output
33
34__all__ = [
35 "patch_stdout",
36 "StdoutProxy",
37]
38
39
40@contextmanager
41def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
42 """
43 Replace `sys.stdout` by an :class:`_StdoutProxy` instance.
44
45 Writing to this proxy will make sure that the text appears above the
46 prompt, and that it doesn't destroy the output from the renderer. If no
47 application is curring, the behavior should be identical to writing to
48 `sys.stdout` directly.
49
50 Warning: If a new event loop is installed using `asyncio.set_event_loop()`,
51 then make sure that the context manager is applied after the event loop
52 is changed. Printing to stdout will be scheduled in the event loop
53 that's active when the context manager is created.
54
55 :param raw: (`bool`) When True, vt100 terminal escape sequences are not
56 removed/escaped.
57 """
58 with StdoutProxy(raw=raw) as proxy:
59 original_stdout = sys.stdout
60 original_stderr = sys.stderr
61
62 # Enter.
63 sys.stdout = cast(TextIO, proxy)
64 sys.stderr = cast(TextIO, proxy)
65
66 try:
67 yield
68 finally:
69 sys.stdout = original_stdout
70 sys.stderr = original_stderr
71
72
73class _Done:
74 "Sentinel value for stopping the stdout proxy."
75
76
77class StdoutProxy:
78 """
79 File-like object, which prints everything written to it, output above the
80 current application/prompt. This class is compatible with other file
81 objects and can be used as a drop-in replacement for `sys.stdout` or can
82 for instance be passed to `logging.StreamHandler`.
83
84 The current application, above which we print, is determined by looking
85 what application currently runs in the `AppSession` that is active during
86 the creation of this instance.
87
88 This class can be used as a context manager.
89
90 In order to avoid having to repaint the prompt continuously for every
91 little write, a short delay of `sleep_between_writes` seconds will be added
92 between writes in order to bundle many smaller writes in a short timespan.
93 """
94
95 def __init__(
96 self,
97 sleep_between_writes: float = 0.2,
98 raw: bool = False,
99 ) -> None:
100 self.sleep_between_writes = sleep_between_writes
101 self.raw = raw
102
103 self._lock = threading.RLock()
104 self._buffer: list[str] = []
105
106 # Keep track of the curret app session.
107 self.app_session = get_app_session()
108
109 # See what output is active *right now*. We should do it at this point,
110 # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
111 # Otherwise, if `patch_stdout` is used, and no `Output` instance has
112 # been created, then the default output creation code will see this
113 # proxy object as `sys.stdout`, and get in a recursive loop trying to
114 # access `StdoutProxy.isatty()` which will again retrieve the output.
115 self._output: Output = self.app_session.output
116
117 # Flush thread
118 self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
119 self._flush_thread = self._start_write_thread()
120 self.closed = False
121
122 def __enter__(self) -> StdoutProxy:
123 return self
124
125 def __exit__(self, *args: object) -> None:
126 self.close()
127
128 def close(self) -> None:
129 """
130 Stop `StdoutProxy` proxy.
131
132 This will terminate the write thread, make sure everything is flushed
133 and wait for the write thread to finish.
134 """
135 if not self.closed:
136 self._flush_queue.put(_Done())
137 self._flush_thread.join()
138 self.closed = True
139
140 def _start_write_thread(self) -> threading.Thread:
141 thread = threading.Thread(
142 target=self._write_thread,
143 name="patch-stdout-flush-thread",
144 daemon=True,
145 )
146 thread.start()
147 return thread
148
149 def _write_thread(self) -> None:
150 done = False
151
152 while not done:
153 item = self._flush_queue.get()
154
155 if isinstance(item, _Done):
156 break
157
158 # Don't bother calling when we got an empty string.
159 if not item:
160 continue
161
162 text = []
163 text.append(item)
164
165 # Read the rest of the queue if more data was queued up.
166 while True:
167 try:
168 item = self._flush_queue.get_nowait()
169 except queue.Empty:
170 break
171 else:
172 if isinstance(item, _Done):
173 done = True
174 else:
175 text.append(item)
176
177 app_loop = self._get_app_loop()
178 self._write_and_flush(app_loop, "".join(text))
179
180 # If an application was running that requires repainting, then wait
181 # for a very short time, in order to bundle actual writes and avoid
182 # having to repaint to often.
183 if app_loop is not None:
184 time.sleep(self.sleep_between_writes)
185
186 def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
187 """
188 Return the event loop for the application currently running in our
189 `AppSession`.
190 """
191 app = self.app_session.app
192
193 if app is None:
194 return None
195
196 return app.loop
197
198 def _write_and_flush(
199 self, loop: asyncio.AbstractEventLoop | None, text: str
200 ) -> None:
201 """
202 Write the given text to stdout and flush.
203 If an application is running, use `run_in_terminal`.
204 """
205
206 def write_and_flush() -> None:
207 # Ensure that autowrap is enabled before calling `write`.
208 # XXX: On Windows, the `Windows10_Output` enables/disables VT
209 # terminal processing for every flush. It turns out that this
210 # causes autowrap to be reset (disabled) after each flush. So,
211 # we have to enable it again before writing text.
212 self._output.enable_autowrap()
213
214 if self.raw:
215 self._output.write_raw(text)
216 else:
217 self._output.write(text)
218
219 self._output.flush()
220
221 def write_and_flush_in_loop() -> None:
222 # If an application is running, use `run_in_terminal`, otherwise
223 # call it directly.
224 run_in_terminal(write_and_flush, in_executor=False)
225
226 if loop is None:
227 # No loop, write immediately.
228 write_and_flush()
229 else:
230 # Make sure `write_and_flush` is executed *in* the event loop, not
231 # in another thread.
232 loop.call_soon_threadsafe(write_and_flush_in_loop)
233
234 def _write(self, data: str) -> None:
235 """
236 Note: print()-statements cause to multiple write calls.
237 (write('line') and write('\n')). Of course we don't want to call
238 `run_in_terminal` for every individual call, because that's too
239 expensive, and as long as the newline hasn't been written, the
240 text itself is again overwritten by the rendering of the input
241 command line. Therefor, we have a little buffer which holds the
242 text until a newline is written to stdout.
243 """
244 if "\n" in data:
245 # When there is a newline in the data, write everything before the
246 # newline, including the newline itself.
247 before, after = data.rsplit("\n", 1)
248 to_write = self._buffer + [before, "\n"]
249 self._buffer = [after]
250
251 text = "".join(to_write)
252 self._flush_queue.put(text)
253 else:
254 # Otherwise, cache in buffer.
255 self._buffer.append(data)
256
257 def _flush(self) -> None:
258 text = "".join(self._buffer)
259 self._buffer = []
260 self._flush_queue.put(text)
261
262 def write(self, data: str) -> int:
263 with self._lock:
264 self._write(data)
265
266 return len(data) # Pretend everything was written.
267
268 def flush(self) -> None:
269 """
270 Flush buffered output.
271 """
272 with self._lock:
273 self._flush()
274
275 @property
276 def original_stdout(self) -> TextIO | None:
277 return self._output.stdout or sys.__stdout__
278
279 # Attributes for compatibility with sys.__stdout__:
280
281 def fileno(self) -> int:
282 return self._output.fileno()
283
284 def isatty(self) -> bool:
285 stdout = self._output.stdout
286 if stdout is None:
287 return False
288
289 return stdout.isatty()
290
291 @property
292 def encoding(self) -> str:
293 return self._output.encoding()
294
295 @property
296 def errors(self) -> str:
297 return "strict"