Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tqdm/utils.py: 35%
175 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 06:35 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 06:35 +0000
1"""
2General helpers required for `tqdm.std`.
3"""
4import os
5import re
6import sys
7from functools import partial, partialmethod, wraps
8from inspect import signature
9# TODO consider using wcswidth third-party package for 0-width characters
10from unicodedata import east_asian_width
11from warnings import warn
12from weakref import proxy
# Aliases for builtins (presumably kept for backward compatibility with
# older code that imported these names -- TODO confirm).
_range = range
_unich = chr
_unicode = str
_basestring = str

# Platform detection based on the prefix of `sys.platform`.
CUR_OS = sys.platform
IS_WIN = CUR_OS.startswith(('win32', 'cygwin'))
IS_NIX = CUR_OS.startswith(('aix', 'linux', 'darwin'))
# ANSI escape sequence: ESC, `[`, any digits/semicolons, one ASCII letter.
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")

# `colorama` is only imported on Windows; on every other platform (or when
# the import fails) the module-level name is `None`.
colorama = None
if IS_WIN:
    try:
        import colorama
    except ImportError:
        colorama = None
    else:
        try:
            colorama.init(strip=False)
        except TypeError:  # this `init()` does not accept `strip`
            colorama.init()
def envwrap(prefix, types=None, is_method=False):
    """
    Override parameter defaults via `os.environ[prefix + param_name]`.
    Maps UPPER_CASE env vars to lower_case param names.
    camelCase isn't supported (because Windows ignores case).

    The environment is read once, when `envwrap(...)` itself is called.

    Precedence (highest first):

    - call (`foo(a=3)`)
    - environ (`FOO_A=2`)
    - signature (`def foo(a=1)`)

    Parameters
    ----------
    prefix  : str
        Env var prefix, e.g. "FOO_"
    types  : dict, optional
        Fallback mappings `{'param_name': type, ...}` if types cannot be
        inferred from function signature (i.e. the parameter has no
        annotation and either no default or a default of `None`).
        Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`.
    is_method  : bool, optional
        Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`.

    Returns
    -------
    wrap  : callable
        Decorator returning `partial(func, **overrides)`
        (or `partialmethod(...)` if `is_method`).

    Examples
    --------
    ```
    $ cat foo.py
    from tqdm.utils import envwrap
    @envwrap("FOO_")
    def test(a=1, b=2, c=3):
        print(f"received: a={a}, b={b}, c={c}")

    $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)'
    received: a=42, b=2, c=99
    ```
    """
    if types is None:
        types = {}
    i = len(prefix)
    env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)}
    part = partialmethod if is_method else partial

    def wrap(func):
        params = signature(func).parameters
        # ignore unknown env vars
        overrides = {k: v for k, v in env_overrides.items() if k in params}
        # infer overrides' `type`s
        for k in overrides:
            param = params[k]
            if param.annotation is not param.empty:  # typehints
                # try each member of e.g. `Union[...]`; first success wins
                for typ in getattr(param.annotation, '__args__', (param.annotation,)):
                    try:
                        overrides[k] = typ(overrides[k])
                    except Exception:
                        pass
                    else:
                        break
            elif param.default is not None and param.default is not param.empty:
                # type of default value.
                # A missing default is the sentinel class `param.empty`, whose
                # type is `type`; `type(value)` would then replace the override
                # with the class `str`, so such params use the fallback below.
                overrides[k] = type(param.default)(overrides[k])
            else:
                try:  # `types` fallback
                    overrides[k] = types[k](overrides[k])
                except KeyError:  # keep unconverted (`str`)
                    pass
        return part(func, **overrides)
    return wrap
class FormatReplace(object):
    """
    Object which formats to a fixed string, whatever the format spec.

    >>> a = FormatReplace('something')
    >>> "{:5d}".format(a)
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        # number of times `__format__` has been invoked on this instance
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately ignored
        self.format_called = self.format_called + 1
        return self.replace
class Comparable(object):
    """
    Comparison mixin: orders instances by their `_comparable` value.
    Assumes child has self._comparable attr/@property
    """
    def __lt__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine < theirs

    def __le__(self, other):
        # built on `<` and `==` so that only those two read `_comparable`
        smaller = self < other
        return smaller if smaller else self == other

    def __eq__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)
class ObjectWrapper(object):
    """
    Thin wrapper around a given object: attribute reads which are not found
    on the wrapper, and all attribute writes, are forwarded to `_wrapped`.
    Use `wrapper_getattr()`/`wrapper_setattr()` to access the wrapper itself.
    """
    def __getattr__(self, name):
        # Only invoked when normal lookup fails. If `_wrapped` itself is
        # missing (instance created without `__init__`, e.g. via `__new__`),
        # forwarding would call this method again without end, so fail with
        # a regular `AttributeError` instead of a `RecursionError`.
        if name == '_wrapped':
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # normal lookup on the wrapper only (`object` has no `__getattr__`)
            return object.__getattribute__(self, name)
        except AttributeError:
            # not on the wrapper: fall back to the forwarding lookup
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object

        Parameters
        ----------
        wrapped  : object
            Stored on the wrapper itself as `_wrapped`.
        """
        self.wrapper_setattr('_wrapped', wrapped)
class SimpleTextIOWrapper(ObjectWrapper):
    """
    Wrapper which only alters `.write()`: the passed value is encoded with
    the configured encoding before being handed to the wrapped object's
    `.write()` method. Everything else is forwarded unchanged.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super().__init__(wrapped)
        # stored on the wrapper itself, not on the wrapped object
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s` and pass to the wrapped object's `.write()` method.
        Returns whatever that `.write()` returns.
        """
        sink_write = self._wrapped.write
        encoded = s.encode(self.wrapper_getattr('encoding'))
        return sink_write(encoded)

    def __eq__(self, other):
        # compare the underlying objects (unwrapping `other` if it is wrapped)
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class DisableOnWriteError(ObjectWrapper):
    """
    Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`
        (`OSError`) or a `ValueError` whose message contains 'closed'.
        Any other error is re-raised. Only a weak proxy to `tqdm_instance`
        is kept.
        """
        tqdm_instance = proxy(tqdm_instance)

        def silence():
            try:
                tqdm_instance.miniters = float('inf')
            except ReferenceError:  # the instance no longer exists
                pass

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as err:
                if err.errno != 5:
                    raise
                silence()
            except ValueError as err:
                if 'closed' not in str(err):
                    raise
                silence()
        return inner

    def __init__(self, wrapped, tqdm_instance):
        super().__init__(wrapped)
        # shadow `write`/`flush` on the wrapper with the guarded versions
        for method in ('write', 'flush'):
            if hasattr(wrapped, method):
                guarded = self.disable_on_exception(
                    tqdm_instance, getattr(wrapped, method))
                self.wrapper_setattr(method, guarded)

    def __eq__(self, other):
        # compare the underlying objects (unwrapping `other` if it is wrapped)
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`

        Parameters
        ----------
        callback  : callable
            Called with `len(data)` after each wrapped read/write.
        stream  : file-like
            Object to wrap.
        method  : str, optional
            "read" or "write" (default: "read").

        Raises
        ------
        KeyError
            If `method` is neither "read" nor "write".
        """
        super().__init__(stream)
        func = getattr(stream, method)
        if method == "read":
            @wraps(func)
            def read(*args, **kwargs):
                data = func(*args, **kwargs)
                callback(len(data))
                return data
            self.wrapper_setattr('read', read)
        elif method == "write":
            @wraps(func)
            def write(data, *args, **kwargs):
                res = func(data, *args, **kwargs)
                callback(len(data))
                return res
            self.wrapper_setattr('write', write)
        else:
            raise KeyError("Can only wrap read/write methods")
def _is_utf(encoding):
    """
    Return True if `encoding` can encode the block characters
    U+2588/U+2589. If the encoding attempt fails for a reason other than
    `UnicodeEncodeError`, fall back to checking the name: 'utf-*'
    (case-insensitive) or exactly 'U8'.
    """
    try:
        u'\u2588\u2589'.encode(encoding)
        return True
    except UnicodeEncodeError:
        return False
    except Exception:
        pass
    # could not even attempt the encoding: judge by name
    try:
        lowered = encoding.lower()
        return lowered.startswith('utf-') or ('U8' == encoding)
    except Exception:
        return False
def _supports_unicode(fp):
    """
    Return `_is_utf(fp.encoding)`, or False if that raises `AttributeError`
    (e.g. `fp` has no `encoding` attribute).
    """
    try:
        result = _is_utf(fp.encoding)
    except AttributeError:
        result = False
    return result
def _is_ascii(s):
    """
    For a `str`: True if every character has a code point <= 255.
    For anything else: the result of `_supports_unicode(s)`.
    """
    if not isinstance(s, str):
        return _supports_unicode(s)
    return all(ord(ch) <= 255 for ch in s)
def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.

    `IS_NIX` takes priority, then `IS_WIN`; otherwise the `tput`-based
    implementation is returned.
    """
    if IS_NIX:
        return _screen_shape_linux
    if IS_WIN:
        return _screen_shape_windows
    return _screen_shape_tput
def _screen_shape_windows(fp):  # pragma: no cover
    """
    Query the console screen buffer via `kernel32` for the stream `fp`.
    Returns `(right - left, bottom - top)` of the reported window, or
    `(None, None)` if anything fails (including the imports).
    """
    try:
        import struct
        from ctypes import create_string_buffer, windll
        from sys import stdin, stdout

        if fp == stdin:
            io_handle = -10
        elif fp == stdout:
            io_handle = -11
        else:
            io_handle = -12  # assume stderr

        handle = windll.kernel32.GetStdHandle(io_handle)
        csbi = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi):
            # fields: bufx, bufy, curx, cury, wattr,
            #         left, top, right, bottom, maxx, maxy
            fields = struct.unpack("hhhhHhhhhhh", csbi.raw)
            left, top, right, bottom = fields[5:9]
            return right - left, bottom - top  # +1
    except Exception:  # nosec
        pass
    return None, None
def _screen_shape_tput(*_):  # pragma: no cover
    """
    cygwin xterm (windows)

    Runs `tput cols` and `tput lines` and returns `[cols - 1, lines - 1]`,
    or `(None, None)` if either command fails or its output is not an
    integer. Positional arguments are ignored.
    """
    try:
        import shlex
        from subprocess import check_output  # nosec

        # `check_output` captures what `tput` prints; `check_call` would only
        # return the exit status (0 on success), never the dimension itself.
        return [int(check_output(shlex.split('tput ' + i))) - 1
                for i in ('cols', 'lines')]
    except Exception:  # nosec
        pass
    return None, None
def _screen_shape_linux(fp):  # pragma: no cover
    """
    Ask the terminal behind `fp` for its size via the `TIOCGWINSZ` ioctl
    and return `(cols, rows)`. If the ioctl fails, fall back to
    `[COLUMNS - 1, LINES - 1]` from the environment. Returns
    `(None, None)` if neither works or the required modules are missing.
    """
    try:
        from array import array
        from fcntl import ioctl
        from termios import TIOCGWINSZ
    except ImportError:
        return None, None

    try:
        winsize = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))
        rows, cols = winsize[:2]
        return cols, rows
    except Exception:
        pass
    # ioctl unavailable for this `fp`: use the environment
    try:
        return [int(os.environ[name]) - 1 for name in ("COLUMNS", "LINES")]
    except (KeyError, ValueError):
        return None, None
def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` on every call.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    shape = _screen_shape_wrapper()
    if shape:
        @wraps(shape)
        def inner(fp):
            # width is the first element of the (width, height) result
            return shape(fp)[0]

        return inner
    return None
def _term_move_up():  # pragma: no cover
    """
    Return the escape sequence `'\\x1b[A'`, or `''` when running on
    `os.name == 'nt'` without `colorama`.
    """
    if os.name == 'nt' and colorama is None:
        return ''
    return '\x1b[A'
def _text_width(s):
    """
    Return the display width of `str(s)`: characters whose East Asian
    width is 'F' or 'W' count as 2 columns, all others as 1.
    """
    width = 0
    for ch in str(s):
        width += 2 if east_asian_width(ch) in 'FW' else 1
    return width
def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # ANSI sequences occupy no columns: drop them before measuring
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)
def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Parameters
    ----------
    data  : str
        Text, possibly containing ANSI escape sequences and wide chars.
    length  : int
        Maximum on-screen width (as measured by `disp_len`) of the result.

    Returns
    -------
    out  : str
        `data` cut from the right until it fits. If `data` contained ANSI
        sequences and the result still does, a reset (`"\\033[0m"`) is
        appended unless already present.
    """
    ansi_present = bool(RE_ANSI.search(data))
    if not ansi_present and len(data) == disp_len(data):
        # No ANSI codes and no wide chars: one character per column.
        # (Checking the lengths alone is not enough: zero-width ANSI
        # characters and double-width chars can cancel each other out.)
        return data[:length]

    # carefully delete one char at a time; stop once nothing is left so that
    # a negative `length` cannot loop forever on the empty string
    while data and disp_len(data) > length:
        data = data[:-1]
    if ansi_present and RE_ANSI.search(data):
        # assume ANSI reset is required
        return data if data.endswith("\033[0m") else data + "\033[0m"
    return data
398 return data