Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/utils.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2General helpers required for `tqdm.std`.
3"""
4import os
5import re
6import sys
7from functools import partial, partialmethod, wraps
8from inspect import signature
9# TODO consider using wcswidth third-party package for 0-width characters
10from unicodedata import east_asian_width
11from warnings import warn
12from weakref import proxy
# Aliases for built-ins (presumably retained for backward compatibility).
_range, _unich, _unicode, _basestring = range, chr, str, str

# Platform detection based on `sys.platform` prefixes.
CUR_OS = sys.platform
IS_WIN = CUR_OS.startswith(('win32', 'cygwin'))
IS_NIX = CUR_OS.startswith(('aix', 'linux', 'darwin', 'freebsd'))

# ESC '[' followed by digits/semicolons and one terminating ASCII letter.
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")

# `colorama` is only imported (and initialised) on Windows-like platforms;
# everywhere else, or when it is not installed, the name stays `None`.
colorama = None
if IS_WIN:
    try:
        import colorama
    except ImportError:
        pass
    else:
        try:
            colorama.init(strip=False)
        except TypeError:
            # `init()` rejected the `strip` keyword: call it without arguments
            colorama.init()
def envwrap(name, app="", types=None, is_method=False):
    """
    Basic (env-only) version of [envwrap](https://github.com/tqdm/envwrap).
    Install `envwrap` for config file support.

    Returns a decorator which pre-binds keyword arguments of the decorated
    callable from environment variables. Variables are matched by the
    upper-cased prefix `<NAME>_` (and `<NAME>_<APP>_` if `app` is given);
    the remainder of the variable name, lower-cased, is the parameter name.
    Variables that do not correspond to a parameter are ignored.

    Parameters
    ----------
    name  : str
        Environment variable prefix (without trailing underscore).
    app  : str, optional
        Extra prefix component; `<NAME>_<APP>_*` overrides `<NAME>_*`.
    types  : dict, optional
        Fallback `{param_name: converter}` used for parameters which have
        neither a type annotation nor a usable (non-`None`) default.
    is_method  : bool, optional
        Use `functools.partialmethod` instead of `functools.partial`.

    Returns
    -------
    out  : callable
        Decorator returning a `partial`/`partialmethod` of its argument.
    """
    if types is None:
        types = {}
    if name[-1] == "_":
        name = name[:-1]
        warn("Trailing underscore in `name` is automatic", DeprecationWarning, stacklevel=2)
    prefixes = (name, f"{name}_{app}") if app else (name,)
    # snapshot of matching environment variables, taken when `envwrap` is called
    env_overrides = {}
    for prefix in prefixes:
        prefix = prefix.upper() + "_"
        i = len(prefix)
        env_overrides.update(
            (k[i:].lower(), v) for k, v in os.environ.items() if k.startswith(prefix))
    part = partialmethod if is_method else partial

    def wrap(func):
        params = signature(func).parameters
        # ignore unknown env vars
        overrides = {k: v for k, v in env_overrides.items() if k in params}
        # infer overrides' `type`s
        for k in overrides:
            param = params[k]
            if param.annotation is not param.empty:  # typehints
                # try each member of a generic/union annotation in turn;
                # keep the raw string if none of them accepts the value
                for typ in getattr(param.annotation, '__args__', (param.annotation,)):
                    try:
                        overrides[k] = typ(overrides[k])
                    except Exception:  # nosec B110
                        pass
                    else:
                        break
            elif param.default is not None and param.default is not param.empty:
                # type of default value. A missing default is `param.empty`,
                # whose type is `type`: calling `type(value)` would replace
                # the override with the class `str`, hence the extra check.
                overrides[k] = type(param.default)(overrides[k])
            else:
                try:  # `types` fallback
                    overrides[k] = types[k](overrides[k])
                except KeyError:  # keep unconverted (`str`)
                    pass
        return part(func, **overrides)
    return wrap
# If the third-party `envwrap` package is importable, its `envwrap` replaces
# the env-only implementation defined earlier in this module; otherwise the
# local definition is kept.
try:
    from envwrap import envwrap  # noqa: F401, F811, pylint: disable=unused-import
except ModuleNotFoundError:
    pass
class FormatReplace:
    """
    Object whose formatted representation is always `replace`, whatever
    format specification is requested. Counts how often it was formatted.

    >>> a = FormatReplace('something')
    >>> f"{a:5d}"
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately ignored
        self.format_called = self.format_called + 1
        return self.replace
class Comparable:
    """
    Mixin providing rich comparisons based on `self._comparable`.

    Assumes child has self._comparable attr/@property. Only `__lt__` and
    `__eq__` touch `_comparable`; the other operators are derived from them.
    """
    def __lt__(self, other):
        mine, theirs = self._comparable, other._comparable
        return mine < theirs

    def __le__(self, other):
        less = self < other
        return less if less else self == other

    def __eq__(self, other):
        mine, theirs = self._comparable, other._comparable
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)
class ObjectWrapper:
    """
    Thin proxy around another object.

    Attribute reads which are not found on the wrapper itself, and all
    regular attribute writes, are forwarded to the wrapped object (stored
    as `_wrapped` on the wrapper). Use `wrapper_getattr`/`wrapper_setattr`
    to access attributes of the wrapper itself.
    """
    def __getattr__(self, name):
        # Only invoked when normal lookup fails. If `_wrapped` itself is
        # missing (e.g. an instance created via `__new__` by `copy`/`pickle`
        # before its state is restored), forwarding would look `_wrapped` up
        # again and recurse until `RecursionError`; fail cleanly instead.
        if name == '_wrapped':
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}")
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__` (there is no
            # `object.__getattr__`)
            return object.__getattribute__(self, name)
        except AttributeError:
            # not set on the wrapper: fall back to regular lookup, which
            # forwards to the wrapped object (kept for backward compatibility)
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        self.wrapper_setattr('_wrapped', wrapped)
class SimpleTextIOWrapper(ObjectWrapper):
    """
    Wrapper which overrides only `.write()`: the given value is encoded
    with the configured `encoding` before being handed to the wrapped
    object's own `.write()` method.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super().__init__(wrapped)
        # stored on the wrapper itself, not on the wrapped object
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s`, forward the result to the wrapped object's `.write()`
        method and return whatever that returns.
        """
        sink = self._wrapped.write
        encoding = self.wrapper_getattr('encoding')
        return sink(s.encode(encoding))

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` if it is a wrapper
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class DisableOnWriteError(ObjectWrapper):
    """
    Wrapper which disables the given `tqdm_instance` when the wrapped
    object's `write()` or `flush()` fails.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Return a wrapper around `func` which quietly sets
        `tqdm_instance.miniters=inf` if `func` raises an `OSError` with
        `errno=5` or a `ValueError` whose message contains 'closed'.
        Any other error propagates unchanged.
        """
        # weak proxy: the wrapper must not keep the instance alive
        tqdm_instance = proxy(tqdm_instance)

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as err:
                if err.errno != 5:
                    raise
            except ValueError as err:
                if 'closed' not in str(err):
                    raise
            # only reached for one of the two tolerated errors above
            try:
                tqdm_instance.miniters = float('inf')
            except ReferenceError:
                # the instance has already been garbage collected
                pass
        return inner

    def __init__(self, wrapped, tqdm_instance):  # noqa: B042
        super().__init__(wrapped)
        # shadow `write`/`flush` on the wrapper itself, when present
        for method_name in ('write', 'flush'):
            if hasattr(wrapped, method_name):
                self.wrapper_setattr(
                    method_name,
                    self.disable_on_exception(tqdm_instance, getattr(wrapped, method_name)))

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` if it is a wrapper
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs
class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`.

        For `method="write"` the callback receives `len(data)` of the data
        passed in; for `method="read"` it receives `len()` of the data
        returned. Any other `method` raises `KeyError`.
        """
        super().__init__(stream)
        func = getattr(stream, method)

        if method == "write":
            @wraps(func)
            def report_written(data, *args, **kwargs):
                result = func(data, *args, **kwargs)
                callback(len(data))
                return result
            self.wrapper_setattr('write', report_written)
            return

        if method == "read":
            @wraps(func)
            def report_read(*args, **kwargs):
                chunk = func(*args, **kwargs)
                callback(len(chunk))
                return chunk
            self.wrapper_setattr('read', report_read)
            return

        raise KeyError("Can only wrap read/write methods")
def _is_utf(encoding):
    """
    Return whether two block characters can be encoded with `encoding`.

    If the trial encoding fails for a reason other than
    `UnicodeEncodeError` (e.g. unknown codec), fall back to checking the
    encoding's name; if that check fails too, return `False`.
    """
    try:
        '\u2588\u2589'.encode(encoding)
    except UnicodeEncodeError:
        return False
    except Exception:
        pass
    else:
        return True

    # trial encoding was inconclusive: judge by name instead
    try:
        return encoding.lower().startswith('utf-') or ('U8' == encoding)
    except Exception:
        return False
def _supports_unicode(fp):
    """
    Return `_is_utf(fp.encoding)`, or `False` if an `AttributeError` is
    raised (e.g. `fp` has no `encoding` attribute).
    """
    try:
        supported = _is_utf(fp.encoding)
    except AttributeError:
        supported = False
    return supported
def _is_ascii(s):
    """
    For a `str`: return `True` if no character has a code point above 255.
    For anything else: return `_supports_unicode(s)`.
    """
    if not isinstance(s, str):
        return _supports_unicode(s)
    return all(ord(char) <= 255 for char in s)
def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.

    The returned function takes a file-like object, queries the terminal
    size for its `fileno()` and returns each dimension reduced by one, or
    `(None, None)` if anything goes wrong.
    """
    from os import get_terminal_size

    def inner(fp):
        try:
            # objects without `fileno` yield `None` as the descriptor
            descriptor = getattr(fp, 'fileno', lambda: None)()
            size = get_terminal_size(descriptor)
            return size[0] - 1, size[1] - 1
        except Exception:
            return None, None

    return inner
def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` pointing to
    `_screen_shape_wrapper()`, whose first result element is returned by
    the function produced here.
    """
    warn(
        "Use `_screen_shape_wrapper()(file)[0]` instead of `_environ_cols_wrapper()(file)`",
        DeprecationWarning, stacklevel=2)
    shape = _screen_shape_wrapper()
    if not shape:
        return None

    @wraps(shape)
    def width_only(fp):
        dimensions = shape(fp)
        return dimensions[0]

    return width_only
def _term_move_up():  # pragma: no cover
    """
    Return the ANSI "cursor up" sequence, or an empty string when running
    on `os.name == 'nt'` with `colorama` being `None`.
    """
    if (os.name == 'nt') and (colorama is None):
        return ''
    return '\x1b[A'
def _text_width(s):
    """
    Return the display width of `str(s)`, counting characters whose East
    Asian width is Fullwidth ('F') or Wide ('W') as 2 columns and every
    other character as 1.
    """
    width = 0
    for char in str(s):
        width += 2 if east_asian_width(char) in ('F', 'W') else 1
    return width
def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.

    ANSI sequences matched by `RE_ANSI` are removed first; the remaining
    text is measured with `_text_width`.
    """
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)
def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Strings whose `len()` equals their display length are simply sliced.
    Otherwise trailing characters are dropped one by one until the display
    length fits, and an ANSI reset is appended if the input contained ANSI
    codes and the result still does without already ending in a reset.
    """
    if len(data) == disp_len(data):
        return data[:length]

    had_ansi = RE_ANSI.search(data) is not None
    while disp_len(data) > length:  # carefully delete one char at a time
        data = data[:-1]

    if had_ansi and RE_ANSI.search(data) is not None:
        # assume ANSI reset is required
        if data.endswith("\033[0m"):
            return data
        return data + "\033[0m"
    return data