1"""
2Utility functions surrounding terminal devices & I/O.
3
4Much of this code performs platform-sensitive branching, e.g. Windows support.
5
6This is its own module to abstract away what would otherwise be distracting
7logic-flow interruptions.
8"""
9
10from contextlib import contextmanager
11from typing import Generator, IO, Optional, Tuple
12import os
13import select
14import sys
15
16# TODO: move in here? They're currently platform-agnostic...
17from .util import has_fileno, isatty
18
19
# Evaluated once, at import time, from ``sys.platform``.
# NOTE(review): the platform-specific branches elsewhere in this module compare
# ``sys.platform`` directly instead of reading this flag -- presumably so
# static type checkers can narrow on the platform; confirm before
# consolidating them onto ``WINDOWS``.
WINDOWS = sys.platform == "win32"
"""
Whether or not the current platform appears to be Windows in nature.

Note that Cygwin's Python is actually close enough to "real" UNIXes that it
doesn't need (or want!) to use PyWin32 -- so we only test for literal Win32
setups (vanilla Python, ActiveState etc) here.

.. versionadded:: 1.0
"""
30
31if sys.platform == "win32":
32 import msvcrt
33 from ctypes import (
34 Structure,
35 c_ushort,
36 windll,
37 POINTER,
38 byref,
39 )
40 from ctypes.wintypes import HANDLE, _COORD, _SMALL_RECT
41else:
42 import fcntl
43 import struct
44 import termios
45 import tty
46
47
48if sys.platform == "win32":
49
50 def _pty_size() -> Tuple[Optional[int], Optional[int]]:
51 class CONSOLE_SCREEN_BUFFER_INFO(Structure):
52 _fields_ = [
53 ("dwSize", _COORD),
54 ("dwCursorPosition", _COORD),
55 ("wAttributes", c_ushort),
56 ("srWindow", _SMALL_RECT),
57 ("dwMaximumWindowSize", _COORD),
58 ]
59
60 GetStdHandle = windll.kernel32.GetStdHandle
61 GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo
62 GetStdHandle.restype = HANDLE
63 GetConsoleScreenBufferInfo.argtypes = [
64 HANDLE,
65 POINTER(CONSOLE_SCREEN_BUFFER_INFO),
66 ]
67
68 hstd = GetStdHandle(-11) # STD_OUTPUT_HANDLE = -11
69 csbi = CONSOLE_SCREEN_BUFFER_INFO()
70 ret = GetConsoleScreenBufferInfo(hstd, byref(csbi))
71
72 if ret:
73 sizex = csbi.srWindow.Right - csbi.srWindow.Left + 1
74 sizey = csbi.srWindow.Bottom - csbi.srWindow.Top + 1
75 return sizex, sizey
76 else:
77 return (None, None)
78
79else:
80
81 def _pty_size() -> Tuple[Optional[int], Optional[int]]:
82 """
83 Suitable for most POSIX platforms.
84
85 .. versionadded:: 1.0
86 """
87 # Sentinel values to be replaced w/ defaults by caller
88 size = (None, None)
89 # We want two short unsigned integers (rows, cols)
90 # Note: TIOCGWINSZ struct contains 4 unsigned shorts, 2 unused
91 fmt = "HHHH"
92 # Create an empty (zeroed) buffer for ioctl to map onto. Yay for C!
93 buf = struct.pack(fmt, 0, 0, 0, 0)
94 # Call TIOCGWINSZ to get window size of stdout, returns our filled
95 # buffer
96 try:
97 result = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
98 # Unpack buffer back into Python data types
99 # NOTE: this unpack gives us rows x cols, but we return the
100 # inverse.
101 rows, cols, *_ = struct.unpack(fmt, result)
102 return (cols, rows)
103 # Fallback to emptyish return value in various failure cases:
104 # * sys.stdout being monkeypatched, such as in testing, and lacking
105 # * .fileno
106 # * sys.stdout having a .fileno but not actually being attached to a
107 # * TTY
108 # * termios not having a TIOCGWINSZ attribute (happens sometimes...)
109 # * other situations where ioctl doesn't explode but the result isn't
110 # something unpack can deal with
111 except (struct.error, TypeError, IOError, AttributeError):
112 pass
113 return size
114
115
def pty_size() -> Tuple[int, int]:
    """
    Report the dimensions of the current local pseudoterminal.

    The platform-specific ``_pty_size`` probe does the measuring; any
    dimension it reports as falsy (``None`` or ``0``) is replaced with a
    default.

    :returns:
        A ``(num_cols, num_rows)`` two-tuple describing PTY size. Defaults to
        ``(80, 24)`` if unable to get a sensible result dynamically.

    .. versionadded:: 1.0
    """
    width, height = _pty_size()
    # TODO: make defaults configurable?
    if not width:
        width = 80
    if not height:
        height = 24
    return (width, height)
129
130
def stdin_is_foregrounded_tty(stream: IO) -> bool:
    """
    Report whether stdin ``stream`` appears to be foregrounded on its TTY.

    The check compares this Python process' group ID against the process
    group ID associated with the stream's file descriptor. A mismatch most
    likely means the process has been placed in the background.

    Callers use this to decide whether it is safe to switch an active stdin
    into character-buffered mode: touching the terminal that way while
    backgrounded causes most shells to pause execution.

    .. note::
        A stream for which ``has_fileno`` reports no usable file descriptor
        always yields ``False``, since that check runs before anything else.

    :param stream: Input stream object (file-like).

    :returns:
        ``True`` if the two process group IDs match, ``False`` otherwise.

    .. versionadded:: 1.0
    """
    if has_fileno(stream):
        own_group = os.getpgrp()
        terminal_group = os.tcgetpgrp(stream.fileno())
        return own_group == terminal_group
    return False
153
154
def cbreak_already_set(stream: IO) -> bool:
    # Deliberately left without a docstring so it stays private, for now.
    # Answers: does it look like tty.setcbreak has already been applied to
    # ``stream`` (or would applying it simply change nothing)?
    # This gives character-buffering idempotency and spares us repeated
    # capture-then-restore cycles.
    #
    # setcbreak turns ECHO and ICANON off, sets CC[VMIN] to 1-ish and
    # CC[VTIME] to 0-ish. The first of those conditions found not to hold is
    # enough to conclude it has not yet been run against this stream.
    settings = termios.tcgetattr(stream)
    # Index 3 is the local-flags word, index 6 the control-characters list.
    local_flags = settings[3]
    control_chars = settings[6]
    if local_flags & termios.ECHO:
        return False
    if local_flags & termios.ICANON:
        return False
    # Control characters may show up either as ints or as one-byte strings.
    if control_chars[termios.VMIN] not in (1, b"\x01"):
        return False
    return control_chars[termios.VTIME] in (0, b"\x00")
175
176
@contextmanager
def character_buffered(stream: IO) -> Generator[None, None, None]:
    """
    Put local terminal ``stream`` into character (not line) buffered mode.

    For the duration of the ``with`` block the stream is switched to cbreak
    mode, and its previous terminal attributes are restored afterwards.

    Nothing is changed (the block simply runs) on Windows, when ``stream`` is
    not a TTY, when it is not a foregrounded TTY, or when cbreak mode already
    appears to be in effect.

    :param stream: Terminal stream object (file-like).

    .. versionadded:: 1.0
    """
    # Order matters: each check only runs if the ones before it passed.
    should_switch = (
        not WINDOWS
        and isatty(stream)
        and stdin_is_foregrounded_tty(stream)
        and not cbreak_already_set(stream)
    )
    if not should_switch:
        yield
        return
    saved_attrs = termios.tcgetattr(stream)
    tty.setcbreak(stream)
    try:
        yield
    finally:
        # Put the terminal back the way we found it, even on error.
        termios.tcsetattr(stream, termios.TCSADRAIN, saved_attrs)
202
203
def ready_for_reading(input_: IO) -> bool:
    """
    Check whether a read from ``input_`` can be expected to succeed.

    :param input_: Input stream object (file-like).

    :returns: ``True`` if a read should succeed, ``False`` otherwise.

    .. versionadded:: 1.0
    """
    # Objects without a real file descriptor are assumed to be "safer"
    # file-likes (e.g. a StringIO) that can always be read from without
    # blocking.
    if not has_fileno(input_):
        return True
    # A "real" terminal stdin needs kbhit/select to tell us when it's ready
    # for a nonblocking read().
    if sys.platform == "win32":
        return msvcrt.kbhit()
    # Zero timeout: poll and return immediately.
    readable = select.select([input_], [], [], 0.0)[0]
    if not readable:
        return False
    return readable[0] is input_
225
226
def bytes_to_read(input_: IO) -> int:
    """
    Query stream ``input_`` to see how many bytes may be readable.

    .. note::
        If we are unable to tell (e.g. if ``input_`` isn't a true file
        descriptor or isn't a valid TTY) we fall back to suggesting reading 1
        byte only.

    :param input_: Input stream object (file-like).

    :returns: `int` number of bytes to read.

    .. versionadded:: 1.0
    """
    # NOTE: we have to check both possibilities here; situations exist where
    # it's not a tty but has a fileno, or vice versa; neither is typically
    # going to work re: ioctl().
    if not WINDOWS and isatty(input_) and has_fileno(input_):
        # FIONREAD stores its answer in a C ``int``, so both the scratch
        # buffer handed to ioctl and the unpack format must be int-sized. A
        # smaller buffer truncates the count (or cannot be unpacked at all).
        int_fmt = "i"
        scratch = struct.pack(int_fmt, 0)
        fionread = fcntl.ioctl(input_, termios.FIONREAD, scratch)
        return int(struct.unpack(int_fmt, fionread)[0])
    return 1