Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/terminals.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1""" 

2Utility functions surrounding terminal devices & I/O. 

3 

4Much of this code performs platform-sensitive branching, e.g. Windows support. 

5 

6This is its own module to abstract away what would otherwise be distracting 

7logic-flow interruptions. 

8""" 

9 

10from contextlib import contextmanager 

11from typing import Generator, IO, Optional, Tuple 

12import os 

13import select 

14import sys 

15 

16# TODO: move in here? They're currently platform-agnostic... 

17from .util import has_fileno, isatty 

18 

19 

20WINDOWS = sys.platform == "win32" 

21""" 

22Whether or not the current platform appears to be Windows in nature. 

23 

24Note that Cygwin's Python is actually close enough to "real" UNIXes that it 

25doesn't need (or want!) to use PyWin32 -- so we only test for literal Win32 

26setups (vanilla Python, ActiveState etc) here. 

27 

28.. versionadded:: 1.0 

29""" 

30 

31if sys.platform == "win32": 

32 import msvcrt 

33 from ctypes import ( 

34 Structure, 

35 c_ushort, 

36 windll, 

37 POINTER, 

38 byref, 

39 ) 

40 from ctypes.wintypes import HANDLE, _COORD, _SMALL_RECT 

41else: 

42 import fcntl 

43 import struct 

44 import termios 

45 import tty 

46 

47 

48if sys.platform == "win32": 

49 

50 def _pty_size() -> Tuple[Optional[int], Optional[int]]: 

51 class CONSOLE_SCREEN_BUFFER_INFO(Structure): 

52 _fields_ = [ 

53 ("dwSize", _COORD), 

54 ("dwCursorPosition", _COORD), 

55 ("wAttributes", c_ushort), 

56 ("srWindow", _SMALL_RECT), 

57 ("dwMaximumWindowSize", _COORD), 

58 ] 

59 

60 GetStdHandle = windll.kernel32.GetStdHandle 

61 GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo 

62 GetStdHandle.restype = HANDLE 

63 GetConsoleScreenBufferInfo.argtypes = [ 

64 HANDLE, 

65 POINTER(CONSOLE_SCREEN_BUFFER_INFO), 

66 ] 

67 

68 hstd = GetStdHandle(-11) # STD_OUTPUT_HANDLE = -11 

69 csbi = CONSOLE_SCREEN_BUFFER_INFO() 

70 ret = GetConsoleScreenBufferInfo(hstd, byref(csbi)) 

71 

72 if ret: 

73 sizex = csbi.srWindow.Right - csbi.srWindow.Left + 1 

74 sizey = csbi.srWindow.Bottom - csbi.srWindow.Top + 1 

75 return sizex, sizey 

76 else: 

77 return (None, None) 

78 

79else: 

80 

81 def _pty_size() -> Tuple[Optional[int], Optional[int]]: 

82 """ 

83 Suitable for most POSIX platforms. 

84 

85 .. versionadded:: 1.0 

86 """ 

87 # Sentinel values to be replaced w/ defaults by caller 

88 size = (None, None) 

89 # We want two short unsigned integers (rows, cols) 

90 fmt = "HH" 

91 # Create an empty (zeroed) buffer for ioctl to map onto. Yay for C! 

92 buf = struct.pack(fmt, 0, 0) 

93 # Call TIOCGWINSZ to get window size of stdout, returns our filled 

94 # buffer 

95 try: 

96 result = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) 

97 # Unpack buffer back into Python data types 

98 # NOTE: this unpack gives us rows x cols, but we return the 

99 # inverse. 

100 rows, cols = struct.unpack(fmt, result) 

101 return (cols, rows) 

102 # Fallback to emptyish return value in various failure cases: 

103 # * sys.stdout being monkeypatched, such as in testing, and lacking 

104 # * .fileno 

105 # * sys.stdout having a .fileno but not actually being attached to a 

106 # * TTY 

107 # * termios not having a TIOCGWINSZ attribute (happens sometimes...) 

108 # * other situations where ioctl doesn't explode but the result isn't 

109 # something unpack can deal with 

110 except (struct.error, TypeError, IOError, AttributeError): 

111 pass 

112 return size 

113 

114 

115def pty_size() -> Tuple[int, int]: 

116 """ 

117 Determine current local pseudoterminal dimensions. 

118 

119 :returns: 

120 A ``(num_cols, num_rows)`` two-tuple describing PTY size. Defaults to 

121 ``(80, 24)`` if unable to get a sensible result dynamically. 

122 

123 .. versionadded:: 1.0 

124 """ 

125 cols, rows = _pty_size() 

126 # TODO: make defaults configurable? 

127 return (cols or 80, rows or 24) 

128 

129 

130def stdin_is_foregrounded_tty(stream: IO) -> bool: 

131 """ 

132 Detect if given stdin ``stream`` seems to be in the foreground of a TTY. 

133 

134 Specifically, compares the current Python process group ID to that of the 

135 stream's file descriptor to see if they match; if they do not match, it is 

136 likely that the process has been placed in the background. 

137 

138 This is used as a test to determine whether we should manipulate an active 

139 stdin so it runs in a character-buffered mode; touching the terminal in 

140 this way when the process is backgrounded, causes most shells to pause 

141 execution. 

142 

143 .. note:: 

144 Processes that aren't attached to a terminal to begin with, will always 

145 fail this test, as it starts with "do you have a real ``fileno``?". 

146 

147 .. versionadded:: 1.0 

148 """ 

149 if not has_fileno(stream): 

150 return False 

151 return os.getpgrp() == os.tcgetpgrp(stream.fileno()) 

152 

153 

154def cbreak_already_set(stream: IO) -> bool: 

155 # Explicitly not docstringed to remain private, for now. Eh. 

156 # Checks whether tty.setcbreak appears to have already been run against 

157 # ``stream`` (or if it would otherwise just not do anything). 

158 # Used to effect idempotency for character-buffering a stream, which also 

159 # lets us avoid multiple capture-then-restore cycles. 

160 attrs = termios.tcgetattr(stream) 

161 lflags, cc = attrs[3], attrs[6] 

162 echo = bool(lflags & termios.ECHO) 

163 icanon = bool(lflags & termios.ICANON) 

164 # setcbreak sets ECHO and ICANON to 0/off, CC[VMIN] to 1-ish, and CC[VTIME] 

165 # to 0-ish. If any of that is not true we can reasonably assume it has not 

166 # yet been executed against this stream. 

167 sentinels = ( 

168 not echo, 

169 not icanon, 

170 cc[termios.VMIN] in [1, b"\x01"], 

171 cc[termios.VTIME] in [0, b"\x00"], 

172 ) 

173 return all(sentinels) 

174 

175 

176@contextmanager 

177def character_buffered( 

178 stream: IO, 

179) -> Generator[None, None, None]: 

180 """ 

181 Force local terminal ``stream`` be character, not line, buffered. 

182 

183 Only applies to Unix-based systems; on Windows this is a no-op. 

184 

185 .. versionadded:: 1.0 

186 """ 

187 if ( 

188 WINDOWS 

189 or not isatty(stream) 

190 or not stdin_is_foregrounded_tty(stream) 

191 or cbreak_already_set(stream) 

192 ): 

193 yield 

194 else: 

195 old_settings = termios.tcgetattr(stream) 

196 tty.setcbreak(stream) 

197 try: 

198 yield 

199 finally: 

200 termios.tcsetattr(stream, termios.TCSADRAIN, old_settings) 

201 

202 

203def ready_for_reading(input_: IO) -> bool: 

204 """ 

205 Test ``input_`` to determine whether a read action will succeed. 

206 

207 :param input_: Input stream object (file-like). 

208 

209 :returns: ``True`` if a read should succeed, ``False`` otherwise. 

210 

211 .. versionadded:: 1.0 

212 """ 

213 # A "real" terminal stdin needs select/kbhit to tell us when it's ready for 

214 # a nonblocking read(). 

215 # Otherwise, assume a "safer" file-like object that can be read from in a 

216 # nonblocking fashion (e.g. a StringIO or regular file). 

217 if not has_fileno(input_): 

218 return True 

219 if sys.platform == "win32": 

220 return msvcrt.kbhit() 

221 else: 

222 reads, _, _ = select.select([input_], [], [], 0.0) 

223 return bool(reads and reads[0] is input_) 

224 

225 

226def bytes_to_read(input_: IO) -> int: 

227 """ 

228 Query stream ``input_`` to see how many bytes may be readable. 

229 

230 .. note:: 

231 If we are unable to tell (e.g. if ``input_`` isn't a true file 

232 descriptor or isn't a valid TTY) we fall back to suggesting reading 1 

233 byte only. 

234 

235 :param input: Input stream object (file-like). 

236 

237 :returns: `int` number of bytes to read. 

238 

239 .. versionadded:: 1.0 

240 """ 

241 # NOTE: we have to check both possibilities here; situations exist where 

242 # it's not a tty but has a fileno, or vice versa; neither is typically 

243 # going to work re: ioctl(). 

244 if not WINDOWS and isatty(input_) and has_fileno(input_): 

245 fionread = fcntl.ioctl(input_, termios.FIONREAD, b" ") 

246 return int(struct.unpack("h", fionread)[0]) 

247 return 1