Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tqdm/utils.py: 55%

148 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:51 +0000

1""" 

2General helpers required for `tqdm.std`. 

3""" 

4import os 

5import re 

6import sys 

7from functools import wraps 

8# TODO consider using wcswidth third-party package for 0-width characters 

9from unicodedata import east_asian_width 

10from warnings import warn 

11from weakref import proxy 

12 

# Aliases for builtins (presumably retained so older callers keep working).
_range = range
_unich = chr
_unicode = str
_basestring = str

# Platform identification derived from `sys.platform`.
CUR_OS = sys.platform
IS_WIN = CUR_OS.startswith(('win32', 'cygwin'))
IS_NIX = CUR_OS.startswith(('aix', 'linux', 'darwin'))

# Matches escape sequences of the form `ESC [ <digits/semicolons> <letter>`.
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")

18 

# colorama is only attempted when IS_WIN is true; in every other case (or
# when the import fails) the module-level name `colorama` is bound to None.
colorama = None
if IS_WIN:
    try:
        import colorama
    except ImportError:
        colorama = None
    else:
        try:
            colorama.init(strip=False)
        except TypeError:
            # `init()` rejected the `strip` keyword: call it bare instead.
            colorama.init()

31 

32 

class FormatReplace(object):
    """
    Object whose formatted form is always a fixed replacement string.

    The format spec is ignored entirely; every call to `__format__` is
    counted in the `format_called` attribute.

    >>> a = FormatReplace('something')
    >>> "{:5d}".format(a)
    'something'
    """  # NOQA: P102
    def __init__(self, replace=''):
        self.format_called = 0
        self.replace = replace

    def __format__(self, _):
        """Record the call and return `self.replace`, ignoring the spec."""
        self.format_called = self.format_called + 1
        return self.replace

46 

47 

class Comparable(object):
    """
    Mixin providing rich comparisons.

    Assumes child has self._comparable attr/@property; `__lt__` and `__eq__`
    compare that value and the remaining operators are derived from them.
    """
    def __lt__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine < theirs

    def __le__(self, other):
        is_less = self < other
        return is_less or (self == other)

    def __eq__(self, other):
        mine = self._comparable
        theirs = other._comparable
        return mine == theirs

    def __ne__(self, other):
        return not (self == other)

    def __gt__(self, other):
        return not (self <= other)

    def __ge__(self, other):
        return not (self < other)

67 

68 

class ObjectWrapper(object):
    """
    Thin proxy around another object.

    Attribute reads that the wrapper itself cannot satisfy, and all plain
    attribute writes, are forwarded to the wrapped object (stored in
    `_wrapped`). Use `wrapper_getattr` / `wrapper_setattr` to access
    attributes stored on the wrapper itself.
    """
    def __getattr__(self, name):
        # Only reached when normal lookup fails. If `_wrapped` itself is the
        # missing attribute (e.g. an instance created through `__new__`
        # without `__init__`), forwarding would re-enter this method forever,
        # so report the attribute as missing instead.
        if name == '_wrapped':
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        return setattr(self._wrapped, name, value)

    def wrapper_getattr(self, name):
        """Actual `self.getattr` rather than self._wrapped.getattr"""
        try:
            # `object` defines `__getattribute__` (not `__getattr__`); this
            # looks the name up on the wrapper itself.
            return object.__getattribute__(self, name)
        except AttributeError:
            # Not on the wrapper: fall back to the regular lookup, which
            # ends up delegating to the wrapped object.
            return getattr(self, name)

    def wrapper_setattr(self, name, value):
        """Actual `self.setattr` rather than self._wrapped.setattr"""
        return object.__setattr__(self, name, value)

    def __init__(self, wrapped):
        """
        Thin wrapper around a given object
        """
        self.wrapper_setattr('_wrapped', wrapped)

92 

93 

class SimpleTextIOWrapper(ObjectWrapper):
    """
    Proxy which alters only `.write()`: the value passed in is encoded with
    the configured encoding before being handed to the wrapped object's
    `.write()` method. Everything else is forwarded untouched.
    """
    # pylint: disable=too-few-public-methods
    def __init__(self, wrapped, encoding):
        super(SimpleTextIOWrapper, self).__init__(wrapped)
        # Stored on the wrapper itself, not on the wrapped object.
        self.wrapper_setattr('encoding', encoding)

    def write(self, s):
        """
        Encode `s` and forward the result to the wrapped `.write()`;
        returns whatever that call returns.
        """
        sink = self._wrapped.write
        codec = self.wrapper_getattr('encoding')
        return sink(s.encode(codec))

    def __eq__(self, other):
        # Compare the wrapped objects; `other` may be wrapped or bare.
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

112 

113 

class DisableOnWriteError(ObjectWrapper):
    """
    Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
    """
    @staticmethod
    def disable_on_exception(tqdm_instance, func):
        """
        Wrap `func` so that selected errors silence `tqdm_instance`.

        An `OSError` with `errno == 5`, or a `ValueError` whose message
        contains 'closed', is swallowed and `tqdm_instance.miniters` is set
        to infinity; the wrapper then returns None. Any other exception
        propagates unchanged.
        """
        # Weak proxy: the wrapper must not keep the bar alive.
        weak_bar = proxy(tqdm_instance)

        def _silence():
            try:
                weak_bar.miniters = float('inf')
            except ReferenceError:
                # The bar has already been garbage-collected.
                pass

        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except OSError as e:
                if e.errno != 5:
                    raise
                _silence()
            except ValueError as e:
                if 'closed' not in str(e):
                    raise
                _silence()
        return inner

    def __init__(self, wrapped, tqdm_instance):
        super(DisableOnWriteError, self).__init__(wrapped)
        # Shadow `write`/`flush` on the wrapper, when the wrapped object
        # offers them, with the error-tolerant versions.
        for method_name in ('write', 'flush'):
            if hasattr(wrapped, method_name):
                self.wrapper_setattr(
                    method_name,
                    self.disable_on_exception(
                        tqdm_instance, getattr(wrapped, method_name)))

    def __eq__(self, other):
        # Compare the wrapped objects; `other` may be wrapped or bare.
        mine = self._wrapped
        theirs = getattr(other, '_wrapped', other)
        return mine == theirs

155 

156 

class CallbackIOWrapper(ObjectWrapper):
    """Proxy reporting the size of each `read()`/`write()` to a callback."""
    def __init__(self, callback, stream, method="read"):
        """
        Wrap a given `file`-like object's `read()` or `write()` to report
        lengths to the given `callback`

        Raises `KeyError` if `method` is neither "read" nor "write".
        """
        super(CallbackIOWrapper, self).__init__(stream)
        target = getattr(stream, method)
        if method != "write" and method != "read":
            raise KeyError("Can only wrap read/write methods")

        if method == "write":
            @wraps(target)
            def write(data, *args, **kwargs):
                # Perform the write first, then report the payload size.
                result = target(data, *args, **kwargs)
                callback(len(data))
                return result
            self.wrapper_setattr('write', write)
            return

        @wraps(target)
        def read(*args, **kwargs):
            # Report the size of whatever was actually read.
            chunk = target(*args, **kwargs)
            callback(len(chunk))
            return chunk
        self.wrapper_setattr('read', read)

181 

182 

def _is_utf(encoding):
    """
    Return True if `encoding` can encode the block characters U+2588 and
    U+2589.

    When encoding fails for a reason other than `UnicodeEncodeError` (e.g.
    an unknown codec name), fall back to a name check: True if the
    lower-cased name starts with 'utf-' or the name equals 'U8'.
    """
    probe = u'\u2588\u2589'
    try:
        probe.encode(encoding)
    except UnicodeEncodeError:
        return False
    except Exception:
        try:
            looks_utf = (
                encoding.lower().startswith('utf-') or ('U8' == encoding))
        except Exception:
            # `encoding` is not even string-like.
            looks_utf = False
        return looks_utf
    return True

195 

196 

def _supports_unicode(fp):
    """
    Return the result of `_is_utf` on `fp.encoding`, or False when `fp` has
    no `encoding` attribute.
    """
    try:
        encoding = fp.encoding
    except AttributeError:
        return False
    return _is_utf(encoding)

202 

203 

def _is_ascii(s):
    """
    For a `str`, return True when no character has a code point above 255
    (note: this bound is wider than 7-bit ASCII). For anything else, defer
    to `_supports_unicode(s)`.
    """
    if not isinstance(s, str):
        return _supports_unicode(s)
    return all(ord(ch) <= 255 for ch in s)

211 

212 

def _screen_shape_wrapper():  # pragma: no cover
    """
    Return a function which returns console dimensions (width, height).
    Supported: linux, osx, windows, cygwin.

    The result is None when neither `IS_WIN` nor `IS_NIX` is set.
    """
    shape_getter = None
    if IS_WIN:
        shape_getter = _screen_shape_windows
        # NOTE(review): only reachable if `_screen_shape_windows` has been
        # rebound to None -- confirm whether this fallback is still wanted.
        if shape_getter is None:
            shape_getter = _screen_shape_tput
    if IS_NIX:
        shape_getter = _screen_shape_linux
    return shape_getter

226 

227 

def _screen_shape_windows(fp):  # pragma: no cover
    """
    Query the Windows console attached to `fp` for its window extent.

    Returns `(right - left, bottom - top)` of the console window rectangle,
    or `(None, None)` if anything goes wrong (including running on a
    platform without `ctypes.windll`).
    """
    try:
        import struct
        from ctypes import create_string_buffer, windll
        from sys import stdin, stdout

        # Standard handle identifiers passed to GetStdHandle.
        if fp == stdin:
            handle_id = -10
        elif fp == stdout:
            handle_id = -11
        else:
            handle_id = -12  # assume stderr

        handle = windll.kernel32.GetStdHandle(handle_id)
        info = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            fields = struct.unpack("hhhhHhhhhhh", info.raw)
            # Fields 5..8 hold the window rectangle; the rest are unused.
            left, top, right, bottom = fields[5:9]
            return right - left, bottom - top  # +1
    except Exception:  # nosec
        pass
    return None, None

250 

251 

def _screen_shape_tput(*_):  # pragma: no cover
    """
    cygwin xterm (windows)

    Ask `tput` for the terminal size. Returns `[cols - 1, lines - 1]`, or
    `(None, None)` if `tput` is unavailable, fails, or prints something
    that is not an integer. All positional arguments are ignored.
    """
    try:
        import shlex
        # `check_output` captures what tput prints; `check_call` would only
        # give the exit status (always 0 on success), not the dimensions.
        from subprocess import check_output  # nosec
        return [int(check_output(shlex.split('tput ' + i))) - 1
                for i in ('cols', 'lines')]
    except Exception:  # nosec
        pass
    return None, None

262 

263 

def _screen_shape_linux(fp):  # pragma: no cover
    """
    Return the terminal size for `fp` as `(cols, rows)`.

    Uses the `TIOCGWINSZ` ioctl; if that fails, falls back to the `COLUMNS`
    and `LINES` environment variables (each minus one, returned as a list).
    Returns `(None, None)` when the required modules cannot be imported or
    the environment variables are missing or not integers.
    """
    try:
        from array import array
        from fcntl import ioctl
        from termios import TIOCGWINSZ
    except ImportError:
        return None, None

    try:
        winsize = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))
        rows, cols = winsize[:2]
        return cols, rows
    except Exception:
        # ioctl unavailable for this `fp`: try the environment below.
        pass

    try:
        return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
    except (KeyError, ValueError):
        return None, None

281 

282 

def _environ_cols_wrapper():  # pragma: no cover
    """
    Return a function which returns console width.
    Supported: linux, osx, windows, cygwin.

    Deprecated: emits a `DeprecationWarning` on every call. Returns None
    when `_screen_shape_wrapper()` yields nothing usable.
    """
    warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
         " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
    shape = _screen_shape_wrapper()
    if shape:
        @wraps(shape)
        def inner(fp):
            # Keep only the width component of (width, height).
            return shape(fp)[0]

        return inner
    return None

299 

300 

def _term_move_up():  # pragma: no cover
    """
    Return the escape sequence `ESC [ A`, or an empty string when
    `os.name` is 'nt' and colorama is not available.
    """
    if os.name == 'nt' and colorama is None:
        return ''
    return '\x1b[A'

303 

304 

def _text_width(s):
    """
    Return the display width of `str(s)`: characters whose East Asian
    width is 'F' or 'W' count as two columns, all others as one.
    """
    total = 0
    for ch in str(s):
        total += 2 if east_asian_width(ch) in 'FW' else 1
    return total

307 

308 

def disp_len(data):
    """
    Returns the real on-screen length of a string which may contain
    ANSI control codes and wide chars.
    """
    # Escape sequences take no columns, so drop them before measuring.
    visible = RE_ANSI.sub('', data)
    return _text_width(visible)

315 

316 

def disp_trim(data, length):
    """
    Trim a string which may contain ANSI control characters.

    Parameters
    ----------
    data  : str
        Text to trim; may contain ANSI escape sequences and wide chars.
    length  : int
        Maximum on-screen width of the result.

    Returns
    -------
    str
        `data` shortened until its display width is at most `length`. If
        the input contained ANSI sequences and the result still does, an
        ANSI reset is appended unless the result already ends with one.
    """
    if len(data) == disp_len(data):
        # No escape sequences or wide chars: a plain slice is enough.
        return data[:length]

    ansi_present = bool(RE_ANSI.search(data))
    # carefully delete one char at a time; stop once nothing is left so a
    # negative `length` cannot spin forever on an empty string
    while data and disp_len(data) > length:
        data = data[:-1]
    if ansi_present and RE_ANSI.search(data):
        # assume ANSI reset is required
        return data if data.endswith("\033[0m") else data + "\033[0m"
    return data