Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/terminals.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1""" 

2Utility functions surrounding terminal devices & I/O. 

3 

4Much of this code performs platform-sensitive branching, e.g. Windows support. 

5 

6This is its own module to abstract away what would otherwise be distracting 

7logic-flow interruptions. 

8""" 

9 

10from contextlib import contextmanager 

11from typing import Generator, IO, Optional, Tuple 

12import os 

13import select 

14import sys 

15 

16# TODO: move in here? They're currently platform-agnostic... 

17from .util import has_fileno, isatty 

18 

19 

WINDOWS: bool = sys.platform == "win32"
"""
``True`` when the interpreter reports a literal Win32 platform.

Cygwin's Python is close enough to a "real" UNIX that it neither needs nor
wants PyWin32, so only literal Win32 setups (vanilla Python, ActiveState and
so on) are matched here.

.. versionadded:: 1.0
"""

30 

31if sys.platform == "win32": 

32 import msvcrt 

33 from ctypes import ( 

34 Structure, 

35 c_ushort, 

36 windll, 

37 POINTER, 

38 byref, 

39 ) 

40 from ctypes.wintypes import HANDLE, _COORD, _SMALL_RECT 

41else: 

42 import fcntl 

43 import struct 

44 import termios 

45 import tty 

46 

47 

48if sys.platform == "win32": 

49 

50 def _pty_size() -> Tuple[Optional[int], Optional[int]]: 

51 class CONSOLE_SCREEN_BUFFER_INFO(Structure): 

52 _fields_ = [ 

53 ("dwSize", _COORD), 

54 ("dwCursorPosition", _COORD), 

55 ("wAttributes", c_ushort), 

56 ("srWindow", _SMALL_RECT), 

57 ("dwMaximumWindowSize", _COORD), 

58 ] 

59 

60 GetStdHandle = windll.kernel32.GetStdHandle 

61 GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo 

62 GetStdHandle.restype = HANDLE 

63 GetConsoleScreenBufferInfo.argtypes = [ 

64 HANDLE, 

65 POINTER(CONSOLE_SCREEN_BUFFER_INFO), 

66 ] 

67 

68 hstd = GetStdHandle(-11) # STD_OUTPUT_HANDLE = -11 

69 csbi = CONSOLE_SCREEN_BUFFER_INFO() 

70 ret = GetConsoleScreenBufferInfo(hstd, byref(csbi)) 

71 

72 if ret: 

73 sizex = csbi.srWindow.Right - csbi.srWindow.Left + 1 

74 sizey = csbi.srWindow.Bottom - csbi.srWindow.Top + 1 

75 return sizex, sizey 

76 else: 

77 return (None, None) 

78 

79else: 

80 

81 def _pty_size() -> Tuple[Optional[int], Optional[int]]: 

82 """ 

83 Suitable for most POSIX platforms. 

84 

85 .. versionadded:: 1.0 

86 """ 

87 # Sentinel values to be replaced w/ defaults by caller 

88 size = (None, None) 

89 # We want two short unsigned integers (rows, cols) 

90 # Note: TIOCGWINSZ struct contains 4 unsigned shorts, 2 unused 

91 fmt = "HHHH" 

92 # Create an empty (zeroed) buffer for ioctl to map onto. Yay for C! 

93 buf = struct.pack(fmt, 0, 0, 0, 0) 

94 # Call TIOCGWINSZ to get window size of stdout, returns our filled 

95 # buffer 

96 try: 

97 result = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) 

98 # Unpack buffer back into Python data types 

99 # NOTE: this unpack gives us rows x cols, but we return the 

100 # inverse. 

101 rows, cols, *_ = struct.unpack(fmt, result) 

102 return (cols, rows) 

103 # Fallback to emptyish return value in various failure cases: 

104 # * sys.stdout being monkeypatched, such as in testing, and lacking 

105 # * .fileno 

106 # * sys.stdout having a .fileno but not actually being attached to a 

107 # * TTY 

108 # * termios not having a TIOCGWINSZ attribute (happens sometimes...) 

109 # * other situations where ioctl doesn't explode but the result isn't 

110 # something unpack can deal with 

111 except (struct.error, TypeError, IOError, AttributeError): 

112 pass 

113 return size 

114 

115 

def pty_size() -> Tuple[int, int]:
    """
    Report the dimensions of the current local pseudoterminal.

    :returns:
        A ``(num_cols, num_rows)`` two-tuple describing PTY size. Any
        dimension that could not be determined dynamically (or came back
        falsy) is replaced by its default, i.e. ``80`` columns / ``24`` rows.

    .. versionadded:: 1.0
    """
    width, height = _pty_size()
    # TODO: make defaults configurable?
    if not width:
        width = 80
    if not height:
        height = 24
    return (width, height)

129 

130 

def stdin_is_foregrounded_tty(stream: IO) -> bool:
    """
    Report whether stdin ``stream`` appears to be in a TTY's foreground.

    The check compares this Python process' group ID against the process
    group ID associated with the stream's file descriptor. A mismatch most
    likely means the process has been sent to the background.

    Callers use this to decide whether it is safe to switch an active stdin
    into character-buffered mode: touching the terminal that way while
    backgrounded causes most shells to pause execution.

    .. note::
        Processes that aren't attached to a terminal to begin with, will
        always fail this test, as it starts with "do you have a real
        ``fileno``?".

    .. versionadded:: 1.0
    """
    if has_fileno(stream):
        own_group = os.getpgrp()
        foreground_group = os.tcgetpgrp(stream.fileno())
        return own_group == foreground_group
    return False

153 

154 

def cbreak_already_set(stream: IO) -> bool:
    # Intentionally has no docstring so that it stays private for now.
    # Reports whether ``stream`` already looks as if tty.setcbreak has been
    # applied to it (or would not be changed by it). This gives callers
    # idempotent character-buffering and avoids repeated capture-then-restore
    # cycles.
    settings = termios.tcgetattr(stream)
    local_flags = settings[3]
    control_chars = settings[6]
    # setcbreak turns ECHO and ICANON off, sets CC[VMIN] to 1-ish and
    # CC[VTIME] to 0-ish; if any one of those does not hold, assume it has not
    # yet been run against this stream.
    if local_flags & termios.ECHO:
        return False
    if local_flags & termios.ICANON:
        return False
    if control_chars[termios.VMIN] not in [1, b"\x01"]:
        return False
    return control_chars[termios.VTIME] in [0, b"\x00"]

175 

176 

@contextmanager
def character_buffered(
    stream: IO,
) -> Generator[None, None, None]:
    """
    Make local terminal ``stream`` character-buffered instead of line-buffered.

    The terminal is left untouched (the context body simply runs) on Windows,
    when ``stream`` is not a TTY, when it is not a foregrounded TTY, or when
    cbreak mode already appears to be in effect. Otherwise the stream's
    terminal attributes are saved, cbreak mode is enabled, and the saved
    attributes are restored when the context exits.

    .. versionadded:: 1.0
    """
    leave_untouched = (
        WINDOWS
        or not isatty(stream)
        or not stdin_is_foregrounded_tty(stream)
        or cbreak_already_set(stream)
    )
    if leave_untouched:
        yield
        return
    saved_attrs = termios.tcgetattr(stream)
    tty.setcbreak(stream)
    try:
        yield
    finally:
        # Put the terminal back the way we found it, even on error.
        termios.tcsetattr(stream, termios.TCSADRAIN, saved_attrs)

202 

203 

def ready_for_reading(input_: IO) -> bool:
    """
    Check whether reading from ``input_`` is expected to succeed right now.

    :param input_: Input stream object (file-like).

    :returns: ``True`` if a read should succeed, ``False`` otherwise.

    .. versionadded:: 1.0
    """
    # Objects without a real fileno (e.g. a StringIO) are treated as "safer"
    # file-likes that can always be read from in a nonblocking fashion.
    if not has_fileno(input_):
        return True
    # A "real" terminal stdin needs kbhit (Windows) or select (elsewhere) to
    # tell us when a nonblocking read() is possible.
    if sys.platform == "win32":
        return msvcrt.kbhit()
    # Zero timeout: poll, never block.
    readable = select.select([input_], [], [], 0.0)[0]
    return len(readable) > 0 and readable[0] is input_

225 

226 

def bytes_to_read(input_: IO) -> int:
    """
    Query stream ``input_`` to see how many bytes may be readable.

    .. note::
        If we are unable to tell (e.g. if ``input_`` isn't a true file
        descriptor or isn't a valid TTY) we fall back to suggesting reading 1
        byte only.

    :param input_: Input stream object (file-like).

    :returns: `int` number of bytes to read.

    .. versionadded:: 1.0
    """
    # NOTE: we have to check both possibilities here; situations exist where
    # it's not a tty but has a fileno, or vice versa; neither is typically
    # going to work re: ioctl().
    if not WINDOWS and isatty(input_) and has_fileno(input_):
        # FIONREAD stores its answer in a C ``int``, so hand ioctl a zeroed,
        # int-sized buffer and decode it with the matching native format. A
        # shorter buffer truncates the count (or fails to unpack at all).
        buf = struct.pack("i", 0)
        fionread = fcntl.ioctl(input_, termios.FIONREAD, buf)
        return int(struct.unpack("i", fionread)[0])
    return 1
248 return 1