1"""
2Utility functions surrounding terminal devices & I/O.
3
4Much of this code performs platform-sensitive branching, e.g. Windows support.
5
6This is its own module to abstract away what would otherwise be distracting
7logic-flow interruptions.
8"""
9
10from contextlib import contextmanager
11from typing import Generator, IO, Optional, Tuple
12import os
13import select
14import sys
15
16# TODO: move in here? They're currently platform-agnostic...
17from .util import has_fileno, isatty
18
19
20WINDOWS = sys.platform == "win32"
21"""
22Whether or not the current platform appears to be Windows in nature.
23
24Note that Cygwin's Python is actually close enough to "real" UNIXes that it
25doesn't need (or want!) to use PyWin32 -- so we only test for literal Win32
26setups (vanilla Python, ActiveState etc) here.
27
28.. versionadded:: 1.0
29"""
30
31if sys.platform == "win32":
32 import msvcrt
33 from ctypes import (
34 Structure,
35 c_ushort,
36 windll,
37 POINTER,
38 byref,
39 )
40 from ctypes.wintypes import HANDLE, _COORD, _SMALL_RECT
41else:
42 import fcntl
43 import struct
44 import termios
45 import tty
46
47
48if sys.platform == "win32":
49
50 def _pty_size() -> Tuple[Optional[int], Optional[int]]:
51 class CONSOLE_SCREEN_BUFFER_INFO(Structure):
52 _fields_ = [
53 ("dwSize", _COORD),
54 ("dwCursorPosition", _COORD),
55 ("wAttributes", c_ushort),
56 ("srWindow", _SMALL_RECT),
57 ("dwMaximumWindowSize", _COORD),
58 ]
59
60 GetStdHandle = windll.kernel32.GetStdHandle
61 GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo
62 GetStdHandle.restype = HANDLE
63 GetConsoleScreenBufferInfo.argtypes = [
64 HANDLE,
65 POINTER(CONSOLE_SCREEN_BUFFER_INFO),
66 ]
67
68 hstd = GetStdHandle(-11) # STD_OUTPUT_HANDLE = -11
69 csbi = CONSOLE_SCREEN_BUFFER_INFO()
70 ret = GetConsoleScreenBufferInfo(hstd, byref(csbi))
71
72 if ret:
73 sizex = csbi.srWindow.Right - csbi.srWindow.Left + 1
74 sizey = csbi.srWindow.Bottom - csbi.srWindow.Top + 1
75 return sizex, sizey
76 else:
77 return (None, None)
78
79else:
80
81 def _pty_size() -> Tuple[Optional[int], Optional[int]]:
82 """
83 Suitable for most POSIX platforms.
84
85 .. versionadded:: 1.0
86 """
87 # Sentinel values to be replaced w/ defaults by caller
88 size = (None, None)
89 # We want two short unsigned integers (rows, cols)
90 fmt = "HH"
91 # Create an empty (zeroed) buffer for ioctl to map onto. Yay for C!
92 buf = struct.pack(fmt, 0, 0)
93 # Call TIOCGWINSZ to get window size of stdout, returns our filled
94 # buffer
95 try:
96 result = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
97 # Unpack buffer back into Python data types
98 # NOTE: this unpack gives us rows x cols, but we return the
99 # inverse.
100 rows, cols = struct.unpack(fmt, result)
101 return (cols, rows)
102 # Fallback to emptyish return value in various failure cases:
103 # * sys.stdout being monkeypatched, such as in testing, and lacking
104 # * .fileno
105 # * sys.stdout having a .fileno but not actually being attached to a
106 # * TTY
107 # * termios not having a TIOCGWINSZ attribute (happens sometimes...)
108 # * other situations where ioctl doesn't explode but the result isn't
109 # something unpack can deal with
110 except (struct.error, TypeError, IOError, AttributeError):
111 pass
112 return size
113
114
115def pty_size() -> Tuple[int, int]:
116 """
117 Determine current local pseudoterminal dimensions.
118
119 :returns:
120 A ``(num_cols, num_rows)`` two-tuple describing PTY size. Defaults to
121 ``(80, 24)`` if unable to get a sensible result dynamically.
122
123 .. versionadded:: 1.0
124 """
125 cols, rows = _pty_size()
126 # TODO: make defaults configurable?
127 return (cols or 80, rows or 24)
128
129
130def stdin_is_foregrounded_tty(stream: IO) -> bool:
131 """
132 Detect if given stdin ``stream`` seems to be in the foreground of a TTY.
133
134 Specifically, compares the current Python process group ID to that of the
135 stream's file descriptor to see if they match; if they do not match, it is
136 likely that the process has been placed in the background.
137
138 This is used as a test to determine whether we should manipulate an active
139 stdin so it runs in a character-buffered mode; touching the terminal in
140 this way when the process is backgrounded, causes most shells to pause
141 execution.
142
143 .. note::
144 Processes that aren't attached to a terminal to begin with, will always
145 fail this test, as it starts with "do you have a real ``fileno``?".
146
147 .. versionadded:: 1.0
148 """
149 if not has_fileno(stream):
150 return False
151 return os.getpgrp() == os.tcgetpgrp(stream.fileno())
152
153
154def cbreak_already_set(stream: IO) -> bool:
155 # Explicitly not docstringed to remain private, for now. Eh.
156 # Checks whether tty.setcbreak appears to have already been run against
157 # ``stream`` (or if it would otherwise just not do anything).
158 # Used to effect idempotency for character-buffering a stream, which also
159 # lets us avoid multiple capture-then-restore cycles.
160 attrs = termios.tcgetattr(stream)
161 lflags, cc = attrs[3], attrs[6]
162 echo = bool(lflags & termios.ECHO)
163 icanon = bool(lflags & termios.ICANON)
164 # setcbreak sets ECHO and ICANON to 0/off, CC[VMIN] to 1-ish, and CC[VTIME]
165 # to 0-ish. If any of that is not true we can reasonably assume it has not
166 # yet been executed against this stream.
167 sentinels = (
168 not echo,
169 not icanon,
170 cc[termios.VMIN] in [1, b"\x01"],
171 cc[termios.VTIME] in [0, b"\x00"],
172 )
173 return all(sentinels)
174
175
176@contextmanager
177def character_buffered(
178 stream: IO,
179) -> Generator[None, None, None]:
180 """
181 Force local terminal ``stream`` be character, not line, buffered.
182
183 Only applies to Unix-based systems; on Windows this is a no-op.
184
185 .. versionadded:: 1.0
186 """
187 if (
188 WINDOWS
189 or not isatty(stream)
190 or not stdin_is_foregrounded_tty(stream)
191 or cbreak_already_set(stream)
192 ):
193 yield
194 else:
195 old_settings = termios.tcgetattr(stream)
196 tty.setcbreak(stream)
197 try:
198 yield
199 finally:
200 termios.tcsetattr(stream, termios.TCSADRAIN, old_settings)
201
202
203def ready_for_reading(input_: IO) -> bool:
204 """
205 Test ``input_`` to determine whether a read action will succeed.
206
207 :param input_: Input stream object (file-like).
208
209 :returns: ``True`` if a read should succeed, ``False`` otherwise.
210
211 .. versionadded:: 1.0
212 """
213 # A "real" terminal stdin needs select/kbhit to tell us when it's ready for
214 # a nonblocking read().
215 # Otherwise, assume a "safer" file-like object that can be read from in a
216 # nonblocking fashion (e.g. a StringIO or regular file).
217 if not has_fileno(input_):
218 return True
219 if sys.platform == "win32":
220 return msvcrt.kbhit()
221 else:
222 reads, _, _ = select.select([input_], [], [], 0.0)
223 return bool(reads and reads[0] is input_)
224
225
226def bytes_to_read(input_: IO) -> int:
227 """
228 Query stream ``input_`` to see how many bytes may be readable.
229
230 .. note::
231 If we are unable to tell (e.g. if ``input_`` isn't a true file
232 descriptor or isn't a valid TTY) we fall back to suggesting reading 1
233 byte only.
234
235 :param input: Input stream object (file-like).
236
237 :returns: `int` number of bytes to read.
238
239 .. versionadded:: 1.0
240 """
241 # NOTE: we have to check both possibilities here; situations exist where
242 # it's not a tty but has a fileno, or vice versa; neither is typically
243 # going to work re: ioctl().
244 if not WINDOWS and isatty(input_) and has_fileno(input_):
245 fionread = fcntl.ioctl(input_, termios.FIONREAD, b" ")
246 return int(struct.unpack("h", fionread)[0])
247 return 1