Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tqdm/utils.py: 35%

175 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 06:35 +0000

1""" 

2General helpers required for `tqdm.std`. 

3""" 

4import os 

5import re 

6import sys 

7from functools import partial, partialmethod, wraps 

8from inspect import signature 

9# TODO consider using wcswidth third-party package for 0-width characters 

10from unicodedata import east_asian_width 

11from warnings import warn 

12from weakref import proxy 

13 

14_range, _unich, _unicode, _basestring = range, chr, str, str 

15CUR_OS = sys.platform 

16IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin']) 

17IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin']) 

18RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") 

19 

20try: 

21 if IS_WIN: 

22 import colorama 

23 else: 

24 raise ImportError 

25except ImportError: 

26 colorama = None 

27else: 

28 try: 

29 colorama.init(strip=False) 

30 except TypeError: 

31 colorama.init() 

32 

33 

def envwrap(prefix, types=None, is_method=False):
    """
    Override parameter defaults via `os.environ[prefix + param_name]`.
    Maps UPPER_CASE env vars to lower_case param names.
    camelCase isn't supported (because Windows ignores case).

    Precedence (highest first):

    - call (`foo(a=3)`)
    - environ (`FOO_A=2`)
    - signature (`def foo(a=1)`)

    Parameters
    ----------
    prefix  : str
        Env var prefix, e.g. "FOO_"
    types  : dict, optional
        Fallback mappings `{'param_name': type, ...}` if types cannot be
        inferred from function signature.
        Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`.
    is_method  : bool, optional
        Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`.

    Returns
    -------
    out  : callable
        Decorator which binds the matching environment values as keyword
        defaults of the decorated function.

    Examples
    --------
    ```
    $ cat foo.py
    from tqdm.utils import envwrap
    @envwrap("FOO_")
    def test(a=1, b=2, c=3):
        print(f"received: a={a}, b={b}, c={c}")

    $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)'
    received: a=42, b=2, c=99
    ```
    """
    if types is None:
        types = {}
    i = len(prefix)
    # the environment is snapshotted here, i.e. when `envwrap(...)` is called
    env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)}
    part = partialmethod if is_method else partial

    def wrap(func):
        params = signature(func).parameters
        # ignore unknown env vars
        overrides = {k: v for k, v in env_overrides.items() if k in params}
        # infer overrides' `type`s
        for k in overrides:
            param = params[k]
            if param.annotation is not param.empty:  # typehints
                # try each member of e.g. a `Union[...]` in turn; first success wins
                for typ in getattr(param.annotation, '__args__', (param.annotation,)):
                    try:
                        overrides[k] = typ(overrides[k])
                    except Exception:
                        pass
                    else:
                        break
            elif param.default is not None and param.default is not param.empty:
                # type of default value.
                # A parameter without a default reports `param.empty`, which is
                # itself a class: `type(param.empty)` is `type`, and calling
                # `type(value)` would replace the override with `str`.
                overrides[k] = type(param.default)(overrides[k])
            else:
                try:  # `types` fallback
                    overrides[k] = types[k](overrides[k])
                except KeyError:  # keep unconverted (`str`)
                    pass
        return part(func, **overrides)
    return wrap

99 

100 

class FormatReplace:
    """
    Object whose formatted form is always `replace`, whatever format spec
    is requested; `format_called` counts how often it has been formatted.

    >>> a = FormatReplace('something')
    >>> "{:5d}".format(a)
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately ignored
        self.format_called = self.format_called + 1
        return self.replace

114 

115 

class Comparable:
    """Assumes child has self._comparable attr/@property"""
    # Only `__eq__` and `__lt__` look at `_comparable`;
    # every other operator is derived from those two.
    def __eq__(self, other):
        return self._comparable == other._comparable

    def __lt__(self, other):
        return self._comparable < other._comparable

    def __ne__(self, other):
        equal = self == other
        return not equal

    def __le__(self, other):
        smaller = self < other
        return smaller if smaller else self == other

    def __ge__(self, other):
        smaller = self < other
        return not smaller

    def __gt__(self, other):
        at_most = self <= other
        return not at_most

135 

136 

class ObjectWrapper(object):
    """
    Thin proxy: attribute reads that miss on the wrapper, and all plain
    attribute writes, are forwarded to the object stored in `_wrapped`.
    Use `wrapper_getattr`/`wrapper_setattr` to touch the wrapper itself.
    """
    def __getattr__(self, name):
        # Only invoked after normal lookup has failed. If `_wrapped` itself is
        # what is missing (instance built without `__init__`, e.g. while
        # `copy`/`pickle` probe a fresh object), forwarding would re-enter
        # this method forever; fail with a regular `AttributeError` instead.
        if name == '_wrapped':
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__` (there is no
            # `object.__getattr__`): look on the wrapper itself first
            return object.__getattribute__(self, name)
        except AttributeError:
            # not on the wrapper: fall back to the forwarding lookup
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        # must bypass `__setattr__`, which would forward to `_wrapped`
        self.wrapper_setattr('_wrapped', wrapped)

160 

161 

class SimpleTextIOWrapper(ObjectWrapper):
    """
    Proxy which alters only `.write()`: the value passed in is encoded with
    the configured encoding before being handed to the wrapped object's
    `.write()` method.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super().__init__(wrapped)
        # stored on the wrapper itself, not forwarded to `wrapped`
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s` and pass to the wrapped object's `.write()` method.
        """
        sink = self._wrapped.write
        encoded = s.encode(self.wrapper_getattr('encoding'))
        return sink(encoded)

    def __eq__(self, other):
        # compare underlying objects; `other` may or may not be a wrapper
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

180 

181 

class DisableOnWriteError(ObjectWrapper):
    """
    Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Wrap `func` so that an `OSError` with `errno == 5`, or a `ValueError`
        whose message contains 'closed', quietly sets
        `tqdm_instance.miniters=inf` instead of propagating.
        Any other error is re-raised.
        """
        # weak proxy: the returned closure holds no strong reference
        weak_bar = proxy(tqdm_instance)

        def mute():
            try:
                weak_bar.miniters = float('inf')
            except ReferenceError:  # instance already garbage collected
                pass

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as err:
                if err.errno == 5:
                    mute()
                else:
                    raise
            except ValueError as err:
                if 'closed' in str(err):
                    mute()
                else:
                    raise
        return inner

    def __init__(self, wrapped, tqdm_instance):
        super().__init__(wrapped)
        # guarded versions live on the wrapper itself, shadowing the
        # forwarded `write`/`flush` of the wrapped object
        for method_name in ('write', 'flush'):
            if hasattr(wrapped, method_name):
                self.wrapper_setattr(
                    method_name,
                    self.disable_on_exception(tqdm_instance, getattr(wrapped, method_name)))

    def __eq__(self, other):
        # compare underlying objects; `other` may or may not be a wrapper
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

223 

224 

class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`

        `callback` receives `len(data)` after each call: the data written
        for "write", the data returned for "read". Any other `method`
        raises `KeyError`.
        """
        super().__init__(stream)
        # looked up before validation, so a missing attribute on `stream`
        # surfaces as `AttributeError`
        target = getattr(stream, method)
        if method == "write":
            @wraps(target)
            def reporting_write(data, *args, **kwargs):
                result = target(data, *args, **kwargs)
                callback(len(data))
                return result
            self.wrapper_setattr('write', reporting_write)
            return
        if method == "read":
            @wraps(target)
            def reporting_read(*args, **kwargs):
                chunk = target(*args, **kwargs)
                callback(len(chunk))
                return chunk
            self.wrapper_setattr('read', reporting_read)
            return
        raise KeyError("Can only wrap read/write methods")

249 

250 

251def _is_utf(encoding): 

252 try: 

253 u'\u2588\u2589'.encode(encoding) 

254 except UnicodeEncodeError: 

255 return False 

256 except Exception: 

257 try: 

258 return encoding.lower().startswith('utf-') or ('U8' == encoding) 

259 except Exception: 

260 return False 

261 else: 

262 return True 

263 

264 

def _supports_unicode(fp):
    """
    Return `_is_utf(fp.encoding)`, or `False` when `fp` has no `encoding`
    attribute.
    """
    try:
        encoding = fp.encoding
    except AttributeError:
        return False
    return _is_utf(encoding)

270 

271 

272def _is_ascii(s): 

273 if isinstance(s, str): 

274 for c in s: 

275 if ord(c) > 255: 

276 return False 

277 return True 

278 return _supports_unicode(s) 

279 

280 

def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.

    Returns `None` when the platform matches neither `IS_NIX` nor `IS_WIN`.
    """
    # `IS_NIX` is checked first because it has the final say when both
    # flags are set.
    if IS_NIX:
        return _screen_shape_linux
    if IS_WIN:
        # NOTE(review): the `tput` fallback is only chosen if
        # `_screen_shape_windows` were `None` -- confirm whether it can be.
        if _screen_shape_windows is not None:
            return _screen_shape_windows
        return _screen_shape_tput
    return None

294 

295 

296def _screen_shape_windows(fp): # pragma: no cover 

297 try: 

298 import struct 

299 from ctypes import create_string_buffer, windll 

300 from sys import stdin, stdout 

301 

302 io_handle = -12 # assume stderr 

303 if fp == stdin: 

304 io_handle = -10 

305 elif fp == stdout: 

306 io_handle = -11 

307 

308 h = windll.kernel32.GetStdHandle(io_handle) 

309 csbi = create_string_buffer(22) 

310 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 

311 if res: 

312 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, 

313 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) 

314 return right - left, bottom - top # +1 

315 except Exception: # nosec 

316 pass 

317 return None, None 

318 

319 

320def _screen_shape_tput(*_): # pragma: no cover 

321 """cygwin xterm (windows)""" 

322 try: 

323 import shlex 

324 from subprocess import check_call # nosec 

325 return [int(check_call(shlex.split('tput ' + i))) - 1 

326 for i in ('cols', 'lines')] 

327 except Exception: # nosec 

328 pass 

329 return None, None 

330 

331 

332def _screen_shape_linux(fp): # pragma: no cover 

333 

334 try: 

335 from array import array 

336 from fcntl import ioctl 

337 from termios import TIOCGWINSZ 

338 except ImportError: 

339 return None, None 

340 else: 

341 try: 

342 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] 

343 return cols, rows 

344 except Exception: 

345 try: 

346 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] 

347 except (KeyError, ValueError): 

348 return None, None 

349 

350 

def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` on every call. Returns `None`
    when `_screen_shape_wrapper()` yields nothing usable.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    shape = _screen_shape_wrapper()
    if shape:
        @wraps(shape)
        def inner(fp):
            width = shape(fp)[0]  # keep only the width
            return width

        return inner
    return None

367 

368 

369def _term_move_up(): # pragma: no cover 

370 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' 

371 

372 

373def _text_width(s): 

374 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s)) 

375 

376 

def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # escape sequences take up no columns: drop them before measuring
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)

383 

384 

def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Parameters
    ----------
    data  : str
        Text to shorten; may contain ANSI escape sequences and wide chars.
    length  : int
        Maximum on-screen width (as measured by `disp_len`) of the result.

    Returns
    -------
    out  : str
        `data[:length]` when `data` has neither escape sequences nor wide
        characters. Otherwise `data` shortened from the right until it fits,
        with an ANSI reset (`"\\033[0m"`) appended if escape sequences remain
        and the result does not already end with one.
    """
    if len(data) == disp_len(data):
        # plain text: every character is exactly one column wide
        return data[:length]

    ansi_present = bool(RE_ANSI.search(data))
    # carefully delete one char at a time.
    # The `data` guard stops the loop once nothing is left: for a negative
    # `length` the width test alone can never become false.
    while data and disp_len(data) > length:
        data = data[:-1]
    if ansi_present and bool(RE_ANSI.search(data)):
        # assume ANSI reset is required
        return data if data.endswith("\033[0m") else data + "\033[0m"
    return data