Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/utils.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

176 statements  

1""" 

2General helpers required for `tqdm.std`. 

3""" 

4import os 

5import re 

6import sys 

7from functools import partial, partialmethod, wraps 

8from inspect import signature 

9# TODO consider using wcswidth third-party package for 0-width characters 

10from unicodedata import east_asian_width 

11from warnings import warn 

12from weakref import proxy 

13 

# Private aliases of builtins (presumably kept for backwards compatibility).
_range = range
_unich = chr
_unicode = _basestring = str

# Platform flags, derived once from `sys.platform`.
CUR_OS = sys.platform
IS_WIN = CUR_OS.startswith(('win32', 'cygwin'))
IS_NIX = CUR_OS.startswith(('aix', 'linux', 'darwin', 'freebsd'))

# One ANSI escape sequence: ESC, `[`, any digits/semicolons, a final letter.
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")

19 

# `colorama` is only imported on Windows; everywhere else (or when the
# import fails) the module-level name is bound to `None`.
colorama = None
if IS_WIN:
    try:
        import colorama
    except ImportError:
        colorama = None
    else:
        try:
            colorama.init(strip=False)
        except TypeError:
            # `init()` rejected the `strip` keyword: retry with no arguments
            colorama.init()

32 

33 

def envwrap(prefix, types=None, is_method=False):
    """
    Override parameter defaults via `os.environ[prefix + param_name]`.
    Maps UPPER_CASE env vars to lower_case param names.
    camelCase isn't supported (because Windows ignores case).

    Precedence (highest first):

    - call (`foo(a=3)`)
    - environ (`FOO_A=2`)
    - signature (`def foo(a=1)`)

    The environment is read once, when `envwrap(prefix, ...)` itself is
    called; later changes to `os.environ` do not affect the decorator.

    Parameters
    ----------
    prefix  : str
        Env var prefix, e.g. "FOO_"
    types  : dict, optional
        Fallback mappings `{'param_name': type, ...}` if types cannot be
        inferred from function signature (no annotation, and the default
        is either `None` or missing).
        Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`.
    is_method  : bool, optional
        Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`.

    Returns
    -------
    wrap  : callable
        Decorator returning `partial(func, **overrides)`
        (or `partialmethod(...)` if `is_method`).

    Examples
    --------
    ```
    $ cat foo.py
    from tqdm.utils import envwrap
    @envwrap("FOO_")
    def test(a=1, b=2, c=3):
        print(f"received: a={a}, b={b}, c={c}")

    $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)'
    received: a=42, b=2, c=99
    ```
    """
    if types is None:
        types = {}
    i = len(prefix)
    # snapshot of matching env vars, keyed by lower-cased, prefix-stripped name
    env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)}
    part = partialmethod if is_method else partial

    def wrap(func):
        params = signature(func).parameters
        # ignore unknown env vars
        overrides = {k: v for k, v in env_overrides.items() if k in params}
        # infer overrides' `type`s (only values are reassigned while iterating)
        for k in overrides:
            param = params[k]
            if param.annotation is not param.empty:  # typehints
                # try each member of e.g. a `Union`; keep the `str` if all fail
                for typ in getattr(param.annotation, '__args__', (param.annotation,)):
                    try:
                        overrides[k] = typ(overrides[k])
                    except Exception:
                        pass
                    else:
                        break
            elif param.default is not None and param.default is not param.empty:
                # type of default value. A missing default (`param.empty`) must
                # be excluded: `type(param.empty)` is `type`, and `type("x")`
                # would replace the value with the class `str`.
                # NOTE: a `bool` default maps any non-empty string
                # (including "0") to `True`, since `bool("0") is True`.
                overrides[k] = type(param.default)(overrides[k])
            else:
                try:  # `types` fallback
                    overrides[k] = types[k](overrides[k])
                except KeyError:  # keep unconverted (`str`)
                    pass
        return part(func, **overrides)
    return wrap

100 

101 

class FormatReplace(object):
    """
    Object whose formatted form is always `replace`, whatever the format
    spec; `format_called` counts how many times it has been formatted.

    >>> a = FormatReplace('something')
    >>> f"{a:5d}"
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately ignored
        calls_so_far = self.format_called
        self.format_called = calls_so_far + 1
        return self.replace

115 

116 

class Comparable(object):
    """Assumes child has self._comparable attr/@property"""
    # Only `__lt__` and `__eq__` read `_comparable`; every other operator
    # is expressed in terms of those two.

    def __lt__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine < theirs

    def __le__(self, other):
        is_less = self < other
        return is_less if is_less else self == other

    def __eq__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine == theirs

    def __ne__(self, other):
        if self == other:
            return False
        return True

    def __gt__(self, other):
        if self <= other:
            return False
        return True

    def __ge__(self, other):
        if self < other:
            return False
        return True

136 

137 

class ObjectWrapper(object):
    """
    Thin proxy: attribute reads that miss on the wrapper, and all ordinary
    attribute writes, are forwarded to the wrapped object (`self._wrapped`).
    Use `wrapper_getattr`/`wrapper_setattr` to touch the wrapper itself.
    """
    def __getattr__(self, name):
        # only called when normal lookup on the wrapper fails
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__`, not `__getattr__`
            return object.__getattribute__(self, name)
        except AttributeError:
            # not set on the wrapper: fall back to regular lookup, which
            # ends up delegating to the wrapped object
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        # bypass the forwarding `__setattr__` so `_wrapped` lands on `self`
        self.wrapper_setattr('_wrapped', wrapped)

161 

162 

class SimpleTextIOWrapper(ObjectWrapper):
    """
    Change only `.write()` of the wrapped object by encoding the passed
    value and passing the result to the wrapped object's `.write()` method.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super().__init__(wrapped)
        # stored on the wrapper itself, not forwarded to `wrapped`
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s` and pass to the wrapped object's `.write()` method.
        """
        sink = self._wrapped.write
        encode = s.encode
        return sink(encode(self.wrapper_getattr('encoding')))

    def __eq__(self, other):
        # compare wrapped objects; `other` may be a wrapper or a raw object
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

181 

182 

class DisableOnWriteError(ObjectWrapper):
    """
    Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`
        (`OSError`) or a `ValueError` whose message contains "closed".
        Any other error is re-raised.
        """
        # weak proxy, so this closure does not keep the instance alive
        tqdm_instance = proxy(tqdm_instance)

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as err:
                if err.errno != 5:
                    raise
            except ValueError as err:
                if 'closed' not in str(err):
                    raise
            # only reached after one of the tolerated errors above
            try:
                tqdm_instance.miniters = float('inf')
            except ReferenceError:
                # the instance has already been garbage-collected
                pass
        return inner

    def __init__(self, wrapped, tqdm_instance):
        super().__init__(wrapped)
        # shadow `write`/`flush` on the wrapper with guarded versions
        for method in ('write', 'flush'):
            if hasattr(wrapped, method):
                self.wrapper_setattr(
                    method,
                    self.disable_on_exception(tqdm_instance, getattr(wrapped, method)))

    def __eq__(self, other):
        # compare wrapped objects; `other` may be a wrapper or a raw object
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

224 

225 

class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`
        """
        super().__init__(stream)
        func = getattr(stream, method)
        if method == "read":
            @wraps(func)
            def read(*args, **kwargs):
                chunk = func(*args, **kwargs)
                # report the size of what was actually read
                callback(len(chunk))
                return chunk
            self.wrapper_setattr('read', read)
        elif method == "write":
            @wraps(func)
            def write(data, *args, **kwargs):
                result = func(data, *args, **kwargs)
                # report the size of the data handed to `write`
                callback(len(data))
                return result
            self.wrapper_setattr('write', write)
        else:
            raise KeyError("Can only wrap read/write methods")

250 

251 

252def _is_utf(encoding): 

253 try: 

254 u'\u2588\u2589'.encode(encoding) 

255 except UnicodeEncodeError: 

256 return False 

257 except Exception: 

258 try: 

259 return encoding.lower().startswith('utf-') or ('U8' == encoding) 

260 except Exception: 

261 return False 

262 else: 

263 return True 

264 

265 

def _supports_unicode(fp):
    """Return `_is_utf(fp.encoding)`, or False if that raises `AttributeError`."""
    try:
        supported = _is_utf(fp.encoding)
    except AttributeError:
        supported = False
    return supported

271 

272 

273def _is_ascii(s): 

274 if isinstance(s, str): 

275 for c in s: 

276 if ord(c) > 255: 

277 return False 

278 return True 

279 return _supports_unicode(s) 

280 

281 

def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.
    """
    # Windows-specific probe if IS_WIN, otherwise `tput`...
    shape_func = _screen_shape_windows if IS_WIN else _screen_shape_tput
    # ...but IS_NIX always takes precedence
    if IS_NIX:
        shape_func = _screen_shape_linux
    return shape_func

295 

296 

297def _screen_shape_windows(fp): # pragma: no cover 

298 try: 

299 import struct 

300 from ctypes import create_string_buffer, windll 

301 from sys import stdin, stdout 

302 

303 io_handle = -12 # assume stderr 

304 if fp == stdin: 

305 io_handle = -10 

306 elif fp == stdout: 

307 io_handle = -11 

308 

309 h = windll.kernel32.GetStdHandle(io_handle) 

310 csbi = create_string_buffer(22) 

311 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 

312 if res: 

313 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, 

314 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) 

315 return right - left, bottom - top # +1 

316 except Exception: # nosec 

317 pass 

318 return None, None 

319 

320 

321def _screen_shape_tput(*_): # pragma: no cover 

322 """cygwin xterm (windows)""" 

323 try: 

324 import shlex 

325 from subprocess import check_call # nosec 

326 return [int(check_call(shlex.split('tput ' + i))) - 1 

327 for i in ('cols', 'lines')] 

328 except Exception: # nosec 

329 pass 

330 return None, None 

331 

332 

333def _screen_shape_linux(fp): # pragma: no cover 

334 

335 try: 

336 from array import array 

337 from fcntl import ioctl 

338 from termios import TIOCGWINSZ 

339 except ImportError: 

340 return None, None 

341 else: 

342 try: 

343 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] 

344 return cols, rows 

345 except Exception: 

346 try: 

347 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] 

348 except (KeyError, ValueError): 

349 return None, None 

350 

351 

def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` on every call.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    shape = _screen_shape_wrapper()
    if shape:
        @wraps(shape)
        def inner(fp):
            # width is the first element of the (width, height) pair
            dims = shape(fp)
            return dims[0]

        return inner
    return None

368 

369 

370def _term_move_up(): # pragma: no cover 

371 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' 

372 

373 

374def _text_width(s): 

375 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s)) 

376 

377 

def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # drop ANSI escape sequences first, then measure what is left
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)

384 

385 

def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Parameters
    ----------
    data  : str
        Text to trim; may contain ANSI escape sequences and wide chars.
    length  : int
        Maximum on-screen width (as measured by `disp_len`) of the result.

    Returns
    -------
    out  : str
        `data` cut down to at most `length` display columns. If the input
        and the trimmed output both contain ANSI sequences, the output is
        guaranteed to end with the reset sequence `"\\033[0m"`.
    """
    if len(data) == disp_len(data):
        # no ANSI codes or wide chars: plain slicing is enough
        return data[:length]

    ansi_present = bool(RE_ANSI.search(data))
    # Carefully delete one char at a time. Stop once `data` is empty:
    # a negative `length` can never be met (width is never below 0) and
    # would otherwise loop forever, since `''[:-1] == ''`.
    while data and disp_len(data) > length:
        data = data[:-1]
    if ansi_present and bool(RE_ANSI.search(data)):
        # assume ANSI reset is required
        return data if data.endswith("\033[0m") else data + "\033[0m"
    return data