Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tqdm/utils.py: 55%
148 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:51 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:51 +0000
1"""
2General helpers required for `tqdm.std`.
3"""
4import os
5import re
6import sys
7from functools import wraps
8# TODO consider using wcswidth third-party package for 0-width characters
9from unicodedata import east_asian_width
10from warnings import warn
11from weakref import proxy
13_range, _unich, _unicode, _basestring = range, chr, str, str
14CUR_OS = sys.platform
15IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin'])
16IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin'])
17RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
19try:
20 if IS_WIN:
21 import colorama
22 else:
23 raise ImportError
24except ImportError:
25 colorama = None
26else:
27 try:
28 colorama.init(strip=False)
29 except TypeError:
30 colorama.init()
class FormatReplace(object):
    """
    Placeholder object whose formatted value is always a fixed string.

    The format specification is ignored, and every formatting call is
    counted in `format_called`.

    >>> a = FormatReplace('something')
    >>> "{:5d}".format(a)
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately discarded
        self.format_called = self.format_called + 1
        return self.replace
class Comparable(object):
    """
    Mixin providing all rich comparisons.

    Assumes child has self._comparable attr/@property; `<` and `==`
    compare that value and the remaining operators are built from them.
    """
    def __lt__(self, other):
        mine, theirs = self._comparable, other._comparable
        return mine < theirs

    def __le__(self, other):
        is_less = self < other
        return is_less if is_less else self == other

    def __eq__(self, other):
        mine, theirs = self._comparable, other._comparable
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)
class ObjectWrapper(object):
    """
    Thin proxy around another object.

    Attribute reads that fail on the wrapper itself, and all attribute
    writes, are forwarded to the wrapped object. Use `wrapper_getattr` /
    `wrapper_setattr` to access attributes stored on the wrapper itself.
    """
    def __getattr__(self, name):
        # Only called when normal lookup fails. If `_wrapped` itself is
        # missing (instance created without `__init__`), looking it up
        # again would recurse forever, so report it as missing instead.
        if name == '_wrapped':
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__` (not `__getattr__`):
            # this performs the plain lookup on the wrapper itself.
            return object.__getattribute__(self, name)
        except AttributeError:
            # not stored on the wrapper: fall back to the regular lookup,
            # which ends up delegating to the wrapped object
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        self.wrapper_setattr('_wrapped', wrapped)
class SimpleTextIOWrapper(ObjectWrapper):
    """
    Change only `.write()` of the wrapped object by encoding the passed
    value and passing the result to the wrapped object's `.write()` method.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super(SimpleTextIOWrapper, self).__init__(wrapped)
        # stored on the wrapper itself, not forwarded to `wrapped`
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s` and pass to the wrapped object's `.write()` method.
        """
        raw_write = self._wrapped.write
        encoding = self.wrapper_getattr('encoding')
        return raw_write(s.encode(encoding))

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` when possible
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class DisableOnWriteError(ObjectWrapper):
    """
    Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`.

        A `ValueError` whose message contains 'closed' is treated the same
        way. Any other error propagates unchanged. When an error is
        swallowed the returned wrapper yields None.
        """
        # weak proxy, so a dead instance shows up as ReferenceError
        tqdm_instance = proxy(tqdm_instance)

        def stop_updates():
            try:
                tqdm_instance.miniters = float('inf')
            except ReferenceError:
                # the instance is already gone: nothing left to disable
                pass

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as err:
                if err.errno != 5:
                    raise
                stop_updates()
            except ValueError as err:
                if 'closed' not in str(err):
                    raise
                stop_updates()
        return inner

    def __init__(self, wrapped, tqdm_instance):
        super(DisableOnWriteError, self).__init__(wrapped)
        # shadow only the methods the wrapped object actually provides
        for method_name in ('write', 'flush'):
            if hasattr(wrapped, method_name):
                self.wrapper_setattr(
                    method_name,
                    self.disable_on_exception(
                        tqdm_instance, getattr(wrapped, method_name)))

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` when possible
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`

        The callback receives `len(data)` after each successful call of
        the wrapped method. `KeyError` is raised for any other `method`.
        """
        super(CallbackIOWrapper, self).__init__(stream)
        func = getattr(stream, method)

        if method == "read":
            @wraps(func)
            def read(*args, **kwargs):
                chunk = func(*args, **kwargs)
                callback(len(chunk))
                return chunk
            self.wrapper_setattr('read', read)
            return

        if method == "write":
            @wraps(func)
            def write(data, *args, **kwargs):
                result = func(data, *args, **kwargs)
                callback(len(data))
                return result
            self.wrapper_setattr('write', write)
            return

        raise KeyError("Can only wrap read/write methods")
183def _is_utf(encoding):
184 try:
185 u'\u2588\u2589'.encode(encoding)
186 except UnicodeEncodeError:
187 return False
188 except Exception:
189 try:
190 return encoding.lower().startswith('utf-') or ('U8' == encoding)
191 except Exception:
192 return False
193 else:
194 return True
def _supports_unicode(fp):
    """
    Return whether `fp.encoding` passes `_is_utf`; False when `fp`
    has no `encoding` attribute.
    """
    try:
        encoding = fp.encoding
        return _is_utf(encoding)
    except AttributeError:
        return False
204def _is_ascii(s):
205 if isinstance(s, str):
206 for c in s:
207 if ord(c) > 255:
208 return False
209 return True
210 return _supports_unicode(s)
def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.

    Returns None on platforms matching neither `IS_NIX` nor `IS_WIN`.
    """
    # The *nix getter is checked first: it took precedence in the
    # original assignment order as well.
    if IS_NIX:
        return _screen_shape_linux
    if IS_WIN:
        # the tput variant would only be picked if the Windows getter
        # were missing, which a defined function never is
        return _screen_shape_windows or _screen_shape_tput
    return None
228def _screen_shape_windows(fp): # pragma: no cover
229 try:
230 import struct
231 from ctypes import create_string_buffer, windll
232 from sys import stdin, stdout
234 io_handle = -12 # assume stderr
235 if fp == stdin:
236 io_handle = -10
237 elif fp == stdout:
238 io_handle = -11
240 h = windll.kernel32.GetStdHandle(io_handle)
241 csbi = create_string_buffer(22)
242 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
243 if res:
244 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom,
245 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
246 return right - left, bottom - top # +1
247 except Exception: # nosec
248 pass
249 return None, None
252def _screen_shape_tput(*_): # pragma: no cover
253 """cygwin xterm (windows)"""
254 try:
255 import shlex
256 from subprocess import check_call # nosec
257 return [int(check_call(shlex.split('tput ' + i))) - 1
258 for i in ('cols', 'lines')]
259 except Exception: # nosec
260 pass
261 return None, None
264def _screen_shape_linux(fp): # pragma: no cover
266 try:
267 from array import array
268 from fcntl import ioctl
269 from termios import TIOCGWINSZ
270 except ImportError:
271 return None, None
272 else:
273 try:
274 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2]
275 return cols, rows
276 except Exception:
277 try:
278 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
279 except (KeyError, ValueError):
280 return None, None
def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` on every call. Returns None
    when `_screen_shape_wrapper()` provides no shape function.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    get_shape = _screen_shape_wrapper()
    if get_shape:
        @wraps(get_shape)
        def inner(fp):
            # width is the first element of the shape
            return get_shape(fp)[0]

        return inner
    return None
301def _term_move_up(): # pragma: no cover
302 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A'
305def _text_width(s):
306 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s))
def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # escape sequences occupy no columns: strip them before measuring
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)
def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    The result has an on-screen length (see `disp_len`) of at most
    `length`. If ANSI codes remain after trimming, a reset sequence is
    appended unless the string already ends with one.
    """
    if disp_len(data) == len(data):
        # no escape codes or wide characters: plain slicing is enough
        return data[:length]

    had_ansi = RE_ANSI.search(data) is not None
    trimmed = data
    while disp_len(trimmed) > length:  # carefully delete one char at a time
        trimmed = trimmed[:-1]

    still_has_ansi = had_ansi and RE_ANSI.search(trimmed) is not None
    if still_has_ansi and not trimmed.endswith("\033[0m"):
        # assume ANSI reset is required
        return trimmed + "\033[0m"
    return trimmed