Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/file_proxy.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

42 statements  

1import io 

2from typing import IO, TYPE_CHECKING, Any, List 

3 

4from .ansi import AnsiDecoder 

5from .text import Text 

6 

7if TYPE_CHECKING: 

8 from .console import Console 

9 

10 

class FileProxy(io.TextIOBase):
    """Wraps a file (e.g. sys.stdout) and redirects writes to a console.

    Written text is buffered until a newline arrives. Each completed line is
    run through an ``AnsiDecoder`` and printed via the console. Attributes
    that are not defined on the proxy are looked up on the wrapped file.
    """

    def __init__(self, console: "Console", file: IO[str]) -> None:
        """Create a proxy for *file* whose writes are printed by *console*."""
        self.__console = console
        self.__file = file
        # Fragments of the current line that has not yet seen its newline.
        self.__buffer: List[str] = []
        self.__ansi_decoder = AnsiDecoder()

    @property
    def rich_proxied_file(self) -> IO[str]:
        """Get proxied file."""
        return self.__file

    def __getattr__(self, name: str) -> Any:
        # Only invoked when normal attribute lookup fails, so anything the
        # proxy does not define itself is delegated to the wrapped file.
        return getattr(self.__file, name)

    def write(self, text: str) -> int:
        """Buffer *text* and print every completed line to the console.

        Args:
            text: The string to write. Any trailing part without a newline
                is kept in the buffer until a later write or ``flush()``.

        Returns:
            The number of characters accepted, i.e. ``len(text)``.

        Raises:
            TypeError: If *text* is not a ``str``.
        """
        if not isinstance(text, str):
            raise TypeError(f"write() argument must be str, not {type(text).__name__}")
        # Capture the length up front: the loop below rebinds ``text`` to the
        # unconsumed remainder, which is always empty once the loop exits.
        written = len(text)
        buffer = self.__buffer
        lines: List[str] = []
        while text:
            line, new_line, text = text.partition("\n")
            if new_line:
                # A newline completes the pending line: prepend whatever was
                # buffered by earlier writes, then start a fresh buffer.
                lines.append("".join(buffer) + line)
                buffer.clear()
            else:
                # No newline left: keep the tail for a later write/flush.
                buffer.append(line)
                break
        if lines:
            console = self.__console
            with console:
                output = Text("\n").join(
                    self.__ansi_decoder.decode_line(line) for line in lines
                )
                console.print(output)
        return written

    def flush(self) -> None:
        """Print any buffered partial line to the console and empty the buffer.

        The buffered text is passed to the console as a plain string; it is
        not run through the ANSI decoder.
        """
        output = "".join(self.__buffer)
        if output:
            self.__console.print(output)
        self.__buffer.clear()

    def fileno(self) -> int:
        """Return the file descriptor number of the wrapped file."""
        return self.__file.fileno()