Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/patch_stdout.py: 29%
120 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
"""
patch_stdout
============

This implements a context manager that ensures that print statements within
it won't destroy the user interface. The context manager will replace
`sys.stdout` by something that draws the output above the current prompt,
rather than overwriting the UI.

Usage::

    with patch_stdout():
        ...
        application.run()
        ...

Multiple applications can run in the body of the context manager, one after the
other.
"""
20from __future__ import annotations
22import asyncio
23import queue
24import sys
25import threading
26import time
27from contextlib import contextmanager
28from typing import Generator, TextIO, cast
30from .application import get_app_session, run_in_terminal
31from .output import Output
33__all__ = [
34 "patch_stdout",
35 "StdoutProxy",
36]
@contextmanager
def patch_stdout(raw: bool = False) -> Generator[None, None, None]:
    """
    Temporarily route `sys.stdout` and `sys.stderr` through a
    :class:`StdoutProxy` instance.

    Text written to the proxy is meant to show up above the prompt instead
    of corrupting what the renderer has drawn. When no application is
    running, the behavior should be identical to writing to `sys.stdout`
    directly.

    Both streams are put back to the objects they referred to on entry, even
    when the body of the ``with`` statement raises.

    Warning: If a new event loop is installed using `asyncio.set_event_loop()`,
             then make sure that the context manager is applied after the
             event loop is changed. Printing to stdout will be scheduled in
             the event loop that's active when the context manager is
             created.

    :param raw: (`bool`) When True, vt100 terminal escape sequences are not
                removed/escaped.
    """
    with StdoutProxy(raw=raw) as proxy:
        # Remember the streams that are installed right now, so that exactly
        # these objects can be reinstated afterwards.
        saved_streams = (sys.stdout, sys.stderr)

        # Enter: stdout first, then stderr, both pointing at the same proxy.
        sys.stdout = sys.stderr = cast(TextIO, proxy)

        try:
            yield
        finally:
            # Exit: restore in the same order (stdout, then stderr).
            sys.stdout, sys.stderr = saved_streams
class _Done:
    """
    Marker object placed on the flush queue to tell the stdout proxy's
    write thread to stop.
    """
class StdoutProxy:
    """
    File-like object, which prints everything written to it, output above the
    current application/prompt. This class is compatible with other file
    objects and can be used as a drop-in replacement for `sys.stdout` or can
    for instance be passed to `logging.StreamHandler`.

    The current application, above which we print, is determined by looking
    what application currently runs in the `AppSession` that is active during
    the creation of this instance.

    This class can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.

    In order to avoid having to repaint the prompt continuously for every
    little write, a short delay of `sleep_between_writes` seconds will be added
    between writes in order to bundle many smaller writes in a short timespan.

    :param sleep_between_writes: Seconds the write thread sleeps after each
        write that happened while an application was running.
    :param raw: When True, text is passed to the output's `write_raw` instead
        of `write`.
    """

    def __init__(
        self,
        sleep_between_writes: float = 0.2,
        raw: bool = False,
    ) -> None:
        self.sleep_between_writes = sleep_between_writes
        self.raw = raw

        # Guards `_buffer`; taken by `write()` and `flush()`.
        self._lock = threading.RLock()
        # Text received since the last newline (not yet queued for writing).
        self._buffer: list[str] = []

        # Keep track of the current app session.
        self.app_session = get_app_session()

        # See what output is active *right now*. We should do it at this point,
        # before this `StdoutProxy` instance is possibly assigned to `sys.stdout`.
        # Otherwise, if `patch_stdout` is used, and no `Output` instance has
        # been created, then the default output creation code will see this
        # proxy object as `sys.stdout`, and get in a recursive loop trying to
        # access `StdoutProxy.isatty()` which will again retrieve the output.
        self._output: Output = self.app_session.output

        # Flush thread: consumes `_flush_queue` until a `_Done` item arrives.
        self._flush_queue: queue.Queue[str | _Done] = queue.Queue()
        self._flush_thread = self._start_write_thread()
        self.closed = False

    def __enter__(self) -> StdoutProxy:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> None:
        """
        Stop `StdoutProxy` proxy.

        This will terminate the write thread, make sure everything is flushed
        and wait for the write thread to finish. Calling it again after the
        proxy was closed is a no-op.
        """
        if not self.closed:
            # Text that was written without a trailing newline is still
            # sitting in `_buffer`. Queue it before the stop sentinel,
            # otherwise it would be silently dropped on close.
            self.flush()

            self._flush_queue.put(_Done())
            self._flush_thread.join()
            self.closed = True

    def _start_write_thread(self) -> threading.Thread:
        """
        Create and start the daemon thread that runs `_write_thread`.
        """
        thread = threading.Thread(
            target=self._write_thread,
            name="patch-stdout-flush-thread",
            daemon=True,
        )
        thread.start()
        return thread

    def _write_thread(self) -> None:
        """
        Body of the flush thread: take text from `_flush_queue`, merge
        everything that is already queued into one string and write it out.
        Runs until a `_Done` item is received.
        """
        done = False

        while not done:
            # Block until there is something to do.
            item = self._flush_queue.get()

            if isinstance(item, _Done):
                break

            # Don't bother calling when we got an empty string.
            if not item:
                continue

            text = []
            text.append(item)

            # Read the rest of the queue if more data was queued up.
            while True:
                try:
                    item = self._flush_queue.get_nowait()
                except queue.Empty:
                    break
                else:
                    if isinstance(item, _Done):
                        # Still write what was collected so far; the outer
                        # loop stops after this iteration.
                        done = True
                    else:
                        text.append(item)

            app_loop = self._get_app_loop()
            self._write_and_flush(app_loop, "".join(text))

            # If an application was running that requires repainting, then wait
            # for a very short time, in order to bundle actual writes and avoid
            # having to repaint too often.
            if app_loop is not None:
                time.sleep(self.sleep_between_writes)

    def _get_app_loop(self) -> asyncio.AbstractEventLoop | None:
        """
        Return the event loop for the application currently running in our
        `AppSession`, or `None` when the session has no application.
        """
        app = self.app_session.app

        if app is None:
            return None

        return app.loop

    def _write_and_flush(
        self, loop: asyncio.AbstractEventLoop | None, text: str
    ) -> None:
        """
        Write the given text to stdout and flush.
        If an application is running, use `run_in_terminal`.

        :param loop: Event loop of the running application, or `None` to
            write directly from the calling thread.
        :param text: The text to write.
        """

        def write_and_flush() -> None:
            # Ensure that autowrap is enabled before calling `write`.
            # XXX: On Windows, the `Windows10_Output` enables/disables VT
            #      terminal processing for every flush. It turns out that this
            #      causes autowrap to be reset (disabled) after each flush. So,
            #      we have to enable it again before writing text.
            self._output.enable_autowrap()

            if self.raw:
                self._output.write_raw(text)
            else:
                self._output.write(text)

            self._output.flush()

        def write_and_flush_in_loop() -> None:
            # If an application is running, use `run_in_terminal`, otherwise
            # call it directly.
            run_in_terminal(write_and_flush, in_executor=False)

        if loop is None:
            # No loop, write immediately.
            write_and_flush()
        else:
            # Make sure `write_and_flush` is executed *in* the event loop, not
            # in another thread.
            loop.call_soon_threadsafe(write_and_flush_in_loop)

    def _write(self, data: str) -> None:
        """
        Buffer `data`, queueing everything up to the last newline for writing.

        Note: print()-statements cause multiple write calls.
              (write('line') and write('\\n')). Of course we don't want to call
              `run_in_terminal` for every individual call, because that's too
              expensive, and as long as the newline hasn't been written, the
              text itself is again overwritten by the rendering of the input
              command line. Therefore, we have a little buffer which holds the
              text until a newline is written to stdout.
        """
        if "\n" in data:
            # When there is a newline in the data, write everything before the
            # last newline, including the newline itself.
            before, after = data.rsplit("\n", 1)
            to_write = self._buffer + [before, "\n"]
            # Whatever follows the last newline starts the next partial line.
            self._buffer = [after]

            text = "".join(to_write)
            self._flush_queue.put(text)
        else:
            # Otherwise, cache in buffer.
            self._buffer.append(data)

    def _flush(self) -> None:
        """
        Move the buffered partial line onto the flush queue and clear the
        buffer. The caller is expected to hold `_lock`.
        """
        text = "".join(self._buffer)
        self._buffer = []
        self._flush_queue.put(text)

    def write(self, data: str) -> int:
        """
        Accept `data` for output and return its length.
        """
        with self._lock:
            self._write(data)

        return len(data)  # Pretend everything was written.

    def flush(self) -> None:
        """
        Flush buffered output.
        """
        with self._lock:
            self._flush()

    @property
    def original_stdout(self) -> TextIO:
        """
        The `stdout` of the wrapped output, or `sys.__stdout__` when that is
        not set.
        """
        return self._output.stdout or sys.__stdout__

    # Attributes for compatibility with sys.__stdout__:

    def fileno(self) -> int:
        """Return the file descriptor reported by the wrapped output."""
        return self._output.fileno()

    def isatty(self) -> bool:
        """
        Return whether the wrapped output's `stdout` is a tty; `False` when
        the output has no `stdout`.
        """
        stdout = self._output.stdout
        if stdout is None:
            return False

        return stdout.isatty()

    @property
    def encoding(self) -> str:
        """Encoding reported by the wrapped output."""
        return self._output.encoding()

    @property
    def errors(self) -> str:
        """Error handling mode; always ``"strict"``."""
        return "strict"