Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/utils.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1""" 

2General helpers required for `tqdm.std`. 

3""" 

4import os 

5import re 

6import sys 

7from functools import partial, partialmethod, wraps 

8from inspect import signature 

9# TODO consider using wcswidth third-party package for 0-width characters 

10from unicodedata import east_asian_width 

11from warnings import warn 

12from weakref import proxy 

13 

# Aliases of builtins (the names suggest a Python 2 compatibility shim;
# confirm nothing imports them before removing).
_range, _unich, _unicode, _basestring = range, chr, str, str

# Platform detection based on the `sys.platform` prefix.
CUR_OS = sys.platform
IS_WIN = CUR_OS.startswith(('win32', 'cygwin'))
IS_NIX = CUR_OS.startswith(('aix', 'linux', 'darwin', 'freebsd'))

# Matches ANSI CSI escape sequences such as colour codes or cursor moves.
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")

# `colorama` is only attempted on Windows-like platforms; everywhere else
# (or when it is not installed) the name is bound to `None`.
colorama = None
if IS_WIN:
    try:
        import colorama
    except ImportError:
        pass
    else:
        try:
            colorama.init(strip=False)
        except TypeError:
            # `init()` rejected the `strip` keyword: call it without
            colorama.init()

32 

33 

def envwrap(name, app="", types=None, is_method=False):
    """
    Basic (env-only) version of [envwrap](https://github.com/tqdm/envwrap).
    Install `envwrap` for config file support.

    Builds a decorator which pre-binds keyword arguments of the decorated
    function from environment variables called `{NAME}_{PARAM}` and, when
    `app` is given, `{NAME}_{APP}_{PARAM}` (applied last, so it wins).
    Variables which do not match a parameter name are ignored.

    Each raw (string) value is converted using, in order of preference:
    the parameter's type annotation (first type which accepts the value),
    the type of a non-`None` default value, and finally `types[param]`.
    If none apply the value is kept as a `str`. Booleans are parsed
    textually: `""`, `"0"`, `"false"`, `"no"` and `"off"` (any case) are
    `False`, everything else is `True`.

    Parameters
    ----------
    name  : str
        Environment variable prefix (matched upper-cased).
        A trailing underscore is deprecated and stripped.
    app  : str, optional
        Extra, more specific prefix component.
    types  : dict, optional
        Fallback `{param_name: converter}` mapping.
    is_method  : bool, optional
        Use `functools.partialmethod` instead of `functools.partial`.

    Returns
    -------
    out  : callable
        Decorator returning `func` with the overrides bound as keywords.
    """
    if types is None:
        types = {}
    if name.endswith("_"):  # also safe for an empty `name`
        name = name[:-1]
        warn("Trailing underscore in `name` is automatic", DeprecationWarning, stacklevel=2)
    prefixes = (name, f"{name}_{app}") if app else (name,)
    env_overrides = {}
    for prefix in prefixes:
        prefix = prefix.upper() + "_"
        i = len(prefix)
        env_overrides.update(
            (k[i:].lower(), v) for k, v in os.environ.items() if k.startswith(prefix))
    part = partialmethod if is_method else partial

    def cast(typ, value):
        """Convert the raw env string `value` to `typ`."""
        if typ is bool:
            # `bool("0")` and `bool("false")` are both True: parse the text
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return typ(value)

    def wrap(func):
        params = signature(func).parameters
        overrides = {}
        for k, value in env_overrides.items():
            if k not in params:  # ignore unknown env vars
                continue
            param = params[k]
            if param.annotation is not param.empty:  # typehints
                for typ in getattr(param.annotation, '__args__', (param.annotation,)):
                    try:
                        value = cast(typ, value)
                    except Exception:  # nosec B110
                        pass
                    else:
                        break
            elif param.default is not param.empty and param.default is not None:
                # type of default value; `param.empty` is excluded since
                # `type(param.empty)` is `type`, which would turn the
                # override into the `str` class rather than a value
                value = cast(type(param.default), value)
            else:
                try:  # `types` fallback
                    value = cast(types[k], value)
                except KeyError:  # keep unconverted (`str`)
                    pass
            overrides[k] = value
        return part(func, **overrides)
    return wrap

77 

78 

# When the third-party `envwrap` package is importable, its `envwrap`
# rebinds the module-level name (hence the F811 "redefinition" waiver).
try:
    from envwrap import envwrap  # noqa: F401, F811, pylint: disable=unused-import
except ModuleNotFoundError:
    # not installed: keep whatever `envwrap` is already bound in this module
    pass

83 

84 

class FormatReplace:
    """
    Object which formats to a fixed replacement string whatever format
    spec is requested, counting how many times it has been formatted.

    >>> a = FormatReplace('something')
    >>> f"{a:5d}"
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        # number of `__format__` calls seen so far
        self.format_called = 0
        # text returned by every `__format__` call
        self.replace = replace

    def __format__(self, _):
        # the format spec is deliberately ignored
        self.format_called = self.format_called + 1
        return self.replace

98 

99 

class Comparable:
    """
    Assumes child has self._comparable attr/@property

    All six rich comparisons are derived from `_comparable`.
    Equality against an object without `_comparable` (e.g. `None`)
    is reported as "not equal" instead of raising `AttributeError`.
    """
    def __lt__(self, other):
        return self._comparable < other._comparable

    def __le__(self, other):
        return (self < other) or (self == other)

    def __eq__(self, other):
        try:
            theirs = other._comparable
        except AttributeError:
            # foreign type: let Python try the reflected operation and
            # otherwise fall back to identity comparison
            return NotImplemented
        return self._comparable == theirs

    def __ne__(self, other):
        return not self == other

    def __gt__(self, other):
        return not self <= other

    def __ge__(self, other):
        return not self < other

119 

120 

class ObjectWrapper:
    """
    Thin wrapper around a given object.

    Attribute reads which fail on the wrapper itself, and all plain
    attribute writes, are forwarded to the wrapped object. Use
    `wrapper_getattr`/`wrapper_setattr` to access the wrapper's own
    attributes.
    """
    def __getattr__(self, name):
        # Only called when normal lookup fails. If `_wrapped` itself is
        # missing (e.g. instance created without `__init__`), looking it
        # up below would re-enter this method forever.
        if name == '_wrapped':
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__` (there is no
            # `object.__getattr__`); this looks at the wrapper only
            return object.__getattribute__(self, name)
        except AttributeError:
            # kept for backwards compatibility: fall back to the regular
            # lookup, which ends up delegating to the wrapped object
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        self.wrapper_setattr('_wrapped', wrapped)

144 

145 

class SimpleTextIOWrapper(ObjectWrapper):
    """
    Wrapper which only alters `.write()`: the value passed in is encoded
    with the configured encoding before being handed to the wrapped
    object's own `.write()`.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super().__init__(wrapped)
        # stored on the wrapper itself, not on the wrapped object
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s`, then forward the result to the wrapped `.write()`.
        """
        emit = self._wrapped.write
        encode = s.encode
        codec = self.wrapper_getattr('encoding')
        return emit(encode(codec))

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` if possible
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

164 

165 

class DisableOnWriteError(ObjectWrapper):
    """
    Wrapper which disables the given `tqdm_instance` when the wrapped
    object's `write()` or `flush()` fails in a known, ignorable way.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Return a version of `func` which quietly sets
        `tqdm_instance.miniters=inf` when `func` raises an `OSError` with
        `errno=5` or a `ValueError` mentioning 'closed'.
        Any other error is re-raised.
        """
        # weak proxy: the returned closure must not keep the instance alive
        bar = proxy(tqdm_instance)

        def silence():
            try:
                bar.miniters = float('inf')
            except ReferenceError:
                # the instance has already been garbage collected
                pass

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as e:
                if e.errno == 5:
                    silence()
                else:
                    raise
            except ValueError as e:
                if 'closed' in str(e):
                    silence()
                else:
                    raise
        return inner

    def __init__(self, wrapped, tqdm_instance):  # noqa: B042
        super().__init__(wrapped)
        # shadow `write`/`flush` on the wrapper with guarded versions
        for method in ('write', 'flush'):
            if hasattr(wrapped, method):
                guarded = self.disable_on_exception(
                    tqdm_instance, getattr(wrapped, method))
                self.wrapper_setattr(method, guarded)

    def __eq__(self, other):
        # compare the underlying objects, unwrapping `other` if possible
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

207 

208 

class CallbackIOWrapper(ObjectWrapper):
    def __init__(self, callback, stream, method="read"):
        """
        Wrap the `read()` or `write()` of a `file`-like `stream` so that
        every call also reports the length of the data handled to
        `callback`.
        """
        super().__init__(stream)
        func = getattr(stream, method)
        if method not in ("read", "write"):
            raise KeyError("Can only wrap read/write methods")

        if method == "read":
            @wraps(func)
            def read(*args, **kwargs):
                chunk = func(*args, **kwargs)
                callback(len(chunk))  # length of what was actually read
                return chunk
            hooked = read
        else:
            @wraps(func)
            def write(data, *args, **kwargs):
                result = func(data, *args, **kwargs)
                callback(len(data))  # length of what was passed in
                return result
            hooked = write
        # stored on the wrapper so it shadows the stream's own method
        self.wrapper_setattr(method, hooked)

233 

234 

235def _is_utf(encoding): 

236 try: 

237 '\u2588\u2589'.encode(encoding) 

238 except UnicodeEncodeError: 

239 return False 

240 except Exception: 

241 try: 

242 return encoding.lower().startswith('utf-') or ('U8' == encoding) 

243 except Exception: 

244 return False 

245 else: 

246 return True 

247 

248 

def _supports_unicode(fp):
    """
    Return `_is_utf(fp.encoding)`, or `False` when that lookup raises
    `AttributeError` (e.g. `fp` has no `encoding` attribute).
    """
    try:
        verdict = _is_utf(fp.encoding)
    except AttributeError:
        verdict = False
    return verdict

254 

255 

256def _is_ascii(s): 

257 if isinstance(s, str): 

258 for c in s: 

259 if ord(c) > 255: 

260 return False 

261 return True 

262 return _supports_unicode(s) 

263 

264 

265def _screen_shape_wrapper(): # pragma: no cover 

266 """ 

267 Return a function which returns console dimensions (width, height). 

268 Supported: linux, osx, windows, cygwin. 

269 """ 

270 from os import get_terminal_size 

271 

272 def inner(fp): 

273 try: 

274 cols, lines = get_terminal_size(getattr(fp, 'fileno', lambda: None)()) 

275 return cols - 1, lines - 1 

276 except Exception: 

277 return None, None 

278 

279 return inner 

280 

281 

def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` pointing at
    `_screen_shape_wrapper` each time it is called.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    get_shape = _screen_shape_wrapper()
    if get_shape:
        @wraps(get_shape)
        def inner(fp):
            # keep only the width from the (width, height) pair
            return get_shape(fp)[0]

        return inner
    return None

298 

299 

def _term_move_up():  # pragma: no cover
    """
    Return the ANSI "cursor up" sequence, or an empty string when
    `os.name` is 'nt' and `colorama` is `None`.
    """
    if os.name == 'nt' and colorama is None:
        return ''
    return '\x1b[A'

302 

303 

304def _text_width(s): 

305 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s)) 

306 

307 

def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # escape sequences take up no columns: drop them before measuring
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)

314 

315 

def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Parameters
    ----------
    data  : str
        Text, possibly containing ANSI escape sequences and wide chars.
    length  : int
        Maximum on-screen width (as measured by `disp_len`) of the result.

    Returns
    -------
    out  : str
        `data` cut down to at most `length` columns. If `data` contained
        ANSI sequences and any remain, the result ends with an ANSI reset.
    """
    ansi_present = bool(RE_ANSI.search(data))
    # Fast path for plain text only. `len(data) == disp_len(data)` alone is
    # not sufficient: zero-width escape sequences and double-width chars
    # can cancel out, and slicing would then cut through an escape sequence.
    if not ansi_present and len(data) == disp_len(data):
        return data[:length]

    while disp_len(data) > length:  # carefully delete one char at a time
        data = data[:-1]
    if ansi_present and bool(RE_ANSI.search(data)):
        # assume ANSI reset is required
        return data if data.endswith("\033[0m") else data + "\033[0m"
    return data

329 return data