Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/file_proxy.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2from typing import IO, TYPE_CHECKING, Any, List
4from .ansi import AnsiDecoder
5from .text import Text
7if TYPE_CHECKING:
8 from .console import Console
11class FileProxy(io.TextIOBase):
12 """Wraps a file (e.g. sys.stdout) and redirects writes to a console."""
14 def __init__(self, console: "Console", file: IO[str]) -> None:
15 self.__console = console
16 self.__file = file
17 self.__buffer: List[str] = []
18 self.__ansi_decoder = AnsiDecoder()
20 @property
21 def rich_proxied_file(self) -> IO[str]:
22 """Get proxied file."""
23 return self.__file
25 def __getattr__(self, name: str) -> Any:
26 return getattr(self.__file, name)
28 def write(self, text: str) -> int:
29 if not isinstance(text, str):
30 raise TypeError(f"write() argument must be str, not {type(text).__name__}")
31 buffer = self.__buffer
32 lines: List[str] = []
33 while text:
34 line, new_line, text = text.partition("\n")
35 if new_line:
36 lines.append("".join(buffer) + line)
37 buffer.clear()
38 else:
39 buffer.append(line)
40 break
41 if lines:
42 console = self.__console
43 with console:
44 output = Text("\n").join(
45 self.__ansi_decoder.decode_line(line) for line in lines
46 )
47 console.print(output)
48 return len(text)
50 def flush(self) -> None:
51 output = "".join(self.__buffer)
52 if output:
53 self.__console.print(output)
54 del self.__buffer[:]
56 def fileno(self) -> int:
57 return self.__file.fileno()