Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/utils.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2General helpers required for `tqdm.std`.
3"""
4import os
5import re
6import sys
7from functools import partial, partialmethod, wraps
8from inspect import signature
9# TODO consider using wcswidth third-party package for 0-width characters
10from unicodedata import east_asian_width
11from warnings import warn
12from weakref import proxy
14_range, _unich, _unicode, _basestring = range, chr, str, str
15CUR_OS = sys.platform
16IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin'])
17IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin', 'freebsd'])
18RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
20try:
21 if IS_WIN:
22 import colorama
23 else:
24 raise ImportError
25except ImportError:
26 colorama = None
27else:
28 try:
29 colorama.init(strip=False)
30 except TypeError:
31 colorama.init()
34def envwrap(prefix, types=None, is_method=False):
35 """
36 Override parameter defaults via `os.environ[prefix + param_name]`.
37 Maps UPPER_CASE env vars map to lower_case param names.
38 camelCase isn't supported (because Windows ignores case).
40 Precedence (highest first):
42 - call (`foo(a=3)`)
43 - environ (`FOO_A=2`)
44 - signature (`def foo(a=1)`)
46 Parameters
47 ----------
48 prefix : str
49 Env var prefix, e.g. "FOO_"
50 types : dict, optional
51 Fallback mappings `{'param_name': type, ...}` if types cannot be
52 inferred from function signature.
53 Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`.
54 is_method : bool, optional
55 Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`.
57 Examples
58 --------
59 ```
60 $ cat foo.py
61 from tqdm.utils import envwrap
62 @envwrap("FOO_")
63 def test(a=1, b=2, c=3):
64 print(f"received: a={a}, b={b}, c={c}")
66 $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)'
67 received: a=42, b=2, c=99
68 ```
69 """
70 if types is None:
71 types = {}
72 i = len(prefix)
73 env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)}
74 part = partialmethod if is_method else partial
76 def wrap(func):
77 params = signature(func).parameters
78 # ignore unknown env vars
79 overrides = {k: v for k, v in env_overrides.items() if k in params}
80 # infer overrides' `type`s
81 for k in overrides:
82 param = params[k]
83 if param.annotation is not param.empty: # typehints
84 for typ in getattr(param.annotation, '__args__', (param.annotation,)):
85 try:
86 overrides[k] = typ(overrides[k])
87 except Exception:
88 pass
89 else:
90 break
91 elif param.default is not None: # type of default value
92 overrides[k] = type(param.default)(overrides[k])
93 else:
94 try: # `types` fallback
95 overrides[k] = types[k](overrides[k])
96 except KeyError: # keep unconverted (`str`)
97 pass
98 return part(func, **overrides)
99 return wrap
102class FormatReplace(object):
103 """
104 >>> a = FormatReplace('something')
105 >>> f"{a:5d}"
106 'something'
107 """ # NOQA: P102
108 def __init__(self, replace=''):
109 self.replace = replace
110 self.format_called = 0
112 def __format__(self, _):
113 self.format_called += 1
114 return self.replace
117class Comparable(object):
118 """Assumes child has self._comparable attr/@property"""
119 def __lt__(self, other):
120 return self._comparable < other._comparable
122 def __le__(self, other):
123 return (self < other) or (self == other)
125 def __eq__(self, other):
126 return self._comparable == other._comparable
128 def __ne__(self, other):
129 return not self == other
131 def __gt__(self, other):
132 return not self <= other
134 def __ge__(self, other):
135 return not self < other
138class ObjectWrapper(object):
139 def __getattr__(self, name):
140 return getattr(self._wrapped, name)
142 def __setattr__(self, name, value):
143 return setattr(self._wrapped, name, value)
145 def wrapper_getattr(self, name):
146 """Actual `self.getattr` rather than self._wrapped.getattr"""
147 try:
148 return object.__getattr__(self, name)
149 except AttributeError: # py2
150 return getattr(self, name)
152 def wrapper_setattr(self, name, value):
153 """Actual `self.setattr` rather than self._wrapped.setattr"""
154 return object.__setattr__(self, name, value)
156 def __init__(self, wrapped):
157 """
158 Thin wrapper around a given object
159 """
160 self.wrapper_setattr('_wrapped', wrapped)
163class SimpleTextIOWrapper(ObjectWrapper):
164 """
165 Change only `.write()` of the wrapped object by encoding the passed
166 value and passing the result to the wrapped object's `.write()` method.
167 """
168 # pylint: disable=too-few-public-methods
169 def __init__(self, wrapped, encoding):
170 super().__init__(wrapped)
171 self.wrapper_setattr('encoding', encoding)
173 def write(self, s):
174 """
175 Encode `s` and pass to the wrapped object's `.write()` method.
176 """
177 return self._wrapped.write(s.encode(self.wrapper_getattr('encoding')))
179 def __eq__(self, other):
180 return self._wrapped == getattr(other, '_wrapped', other)
183class DisableOnWriteError(ObjectWrapper):
184 """
185 Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
186 """
187 @staticmethod
188 def disable_on_exception(tqdm_instance, func):
189 """
190 Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`.
191 """
192 tqdm_instance = proxy(tqdm_instance)
194 def inner(*args, **kwargs):
195 try:
196 return func(*args, **kwargs)
197 except OSError as e:
198 if e.errno != 5:
199 raise
200 try:
201 tqdm_instance.miniters = float('inf')
202 except ReferenceError:
203 pass
204 except ValueError as e:
205 if 'closed' not in str(e):
206 raise
207 try:
208 tqdm_instance.miniters = float('inf')
209 except ReferenceError:
210 pass
211 return inner
213 def __init__(self, wrapped, tqdm_instance):
214 super().__init__(wrapped)
215 if hasattr(wrapped, 'write'):
216 self.wrapper_setattr(
217 'write', self.disable_on_exception(tqdm_instance, wrapped.write))
218 if hasattr(wrapped, 'flush'):
219 self.wrapper_setattr(
220 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush))
222 def __eq__(self, other):
223 return self._wrapped == getattr(other, '_wrapped', other)
226class CallbackIOWrapper(ObjectWrapper):
227 def __init__(self, callback, stream, method="read"):
228 """
229 Wrap a given `file`-like object's `read()` or `write()` to report
230 lengths to the given `callback`
231 """
232 super().__init__(stream)
233 func = getattr(stream, method)
234 if method == "write":
235 @wraps(func)
236 def write(data, *args, **kwargs):
237 res = func(data, *args, **kwargs)
238 callback(len(data))
239 return res
240 self.wrapper_setattr('write', write)
241 elif method == "read":
242 @wraps(func)
243 def read(*args, **kwargs):
244 data = func(*args, **kwargs)
245 callback(len(data))
246 return data
247 self.wrapper_setattr('read', read)
248 else:
249 raise KeyError("Can only wrap read/write methods")
252def _is_utf(encoding):
253 try:
254 u'\u2588\u2589'.encode(encoding)
255 except UnicodeEncodeError:
256 return False
257 except Exception:
258 try:
259 return encoding.lower().startswith('utf-') or ('U8' == encoding)
260 except Exception:
261 return False
262 else:
263 return True
266def _supports_unicode(fp):
267 try:
268 return _is_utf(fp.encoding)
269 except AttributeError:
270 return False
273def _is_ascii(s):
274 if isinstance(s, str):
275 for c in s:
276 if ord(c) > 255:
277 return False
278 return True
279 return _supports_unicode(s)
282def _screen_shape_wrapper(): # pragma: no cover
283 """
284 Return a function which returns console dimensions (width, height).
285 Supported: linux, osx, windows, cygwin.
286 """
287 _screen_shape = None
288 if IS_WIN:
289 _screen_shape = _screen_shape_windows
290 if _screen_shape is None:
291 _screen_shape = _screen_shape_tput
292 if IS_NIX:
293 _screen_shape = _screen_shape_linux
294 return _screen_shape
297def _screen_shape_windows(fp): # pragma: no cover
298 try:
299 import struct
300 from ctypes import create_string_buffer, windll
301 from sys import stdin, stdout
303 io_handle = -12 # assume stderr
304 if fp == stdin:
305 io_handle = -10
306 elif fp == stdout:
307 io_handle = -11
309 h = windll.kernel32.GetStdHandle(io_handle)
310 csbi = create_string_buffer(22)
311 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
312 if res:
313 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom,
314 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
315 return right - left, bottom - top # +1
316 except Exception: # nosec
317 pass
318 return None, None
321def _screen_shape_tput(*_): # pragma: no cover
322 """cygwin xterm (windows)"""
323 try:
324 import shlex
325 from subprocess import check_call # nosec
326 return [int(check_call(shlex.split('tput ' + i))) - 1
327 for i in ('cols', 'lines')]
328 except Exception: # nosec
329 pass
330 return None, None
333def _screen_shape_linux(fp): # pragma: no cover
335 try:
336 from array import array
337 from fcntl import ioctl
338 from termios import TIOCGWINSZ
339 except ImportError:
340 return None, None
341 else:
342 try:
343 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2]
344 return cols, rows
345 except Exception:
346 try:
347 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
348 except (KeyError, ValueError):
349 return None, None
352def _environ_cols_wrapper(): # pragma: no cover
353 """
354 Return a function which returns console width.
355 Supported: linux, osx, windows, cygwin.
356 """
357 warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
358 " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
359 shape = _screen_shape_wrapper()
360 if not shape:
361 return None
363 @wraps(shape)
364 def inner(fp):
365 return shape(fp)[0]
367 return inner
370def _term_move_up(): # pragma: no cover
371 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A'
374def _text_width(s):
375 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s))
378def disp_len(data):
379 """
380 Returns the real on-screen length of a string which may contain
381 ANSI control codes and wide chars.
382 """
383 return _text_width(RE_ANSI.sub('', data))
386def disp_trim(data, length):
387 """
388 Trim a string which may contain ANSI control characters.
389 """
390 if len(data) == disp_len(data):
391 return data[:length]
393 ansi_present = bool(RE_ANSI.search(data))
394 while disp_len(data) > length: # carefully delete one char at a time
395 data = data[:-1]
396 if ansi_present and bool(RE_ANSI.search(data)):
397 # assume ANSI reset is required
398 return data if data.endswith("\033[0m") else data + "\033[0m"
399 return data