Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tqdm/cli.py: 8%

188 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:51 +0000

1""" 

2Module version for monitoring CLI pipes (`... | python -m tqdm | ...`). 

3""" 

4import logging 

5import re 

6import sys 

7from ast import literal_eval as numeric 

8 

9from .std import TqdmKeyError, TqdmTypeError, tqdm 

10from .version import __version__ 

11 

12__all__ = ["main"] 

13log = logging.getLogger(__name__) 

14 

15 

16def cast(val, typ): 

17 log.debug((val, typ)) 

18 if " or " in typ: 

19 for t in typ.split(" or "): 

20 try: 

21 return cast(val, t) 

22 except TqdmTypeError: 

23 pass 

24 raise TqdmTypeError(val + ' : ' + typ) 

25 

26 # sys.stderr.write('\ndebug | `val:type`: `' + val + ':' + typ + '`.\n') 

27 if typ == 'bool': 

28 if (val == 'True') or (val == ''): 

29 return True 

30 elif val == 'False': 

31 return False 

32 else: 

33 raise TqdmTypeError(val + ' : ' + typ) 

34 try: 

35 return eval(typ + '("' + val + '")') 

36 except Exception: 

37 if typ == 'chr': 

38 return chr(ord(eval('"' + val + '"'))).encode() 

39 else: 

40 raise TqdmTypeError(val + ' : ' + typ) 

41 

42 

43def posix_pipe(fin, fout, delim=b'\\n', buf_size=256, 

44 callback=lambda float: None, callback_len=True): 

45 """ 

46 Params 

47 ------ 

48 fin : binary file with `read(buf_size : int)` method 

49 fout : binary file with `write` (and optionally `flush`) methods. 

50 callback : function(float), e.g.: `tqdm.update` 

51 callback_len : If (default: True) do `callback(len(buffer))`. 

52 Otherwise, do `callback(data) for data in buffer.split(delim)`. 

53 """ 

54 fp_write = fout.write 

55 

56 if not delim: 

57 while True: 

58 tmp = fin.read(buf_size) 

59 

60 # flush at EOF 

61 if not tmp: 

62 getattr(fout, 'flush', lambda: None)() 

63 return 

64 

65 fp_write(tmp) 

66 callback(len(tmp)) 

67 # return 

68 

69 buf = b'' 

70 len_delim = len(delim) 

71 # n = 0 

72 while True: 

73 tmp = fin.read(buf_size) 

74 

75 # flush at EOF 

76 if not tmp: 

77 if buf: 

78 fp_write(buf) 

79 if callback_len: 

80 # n += 1 + buf.count(delim) 

81 callback(1 + buf.count(delim)) 

82 else: 

83 for i in buf.split(delim): 

84 callback(i) 

85 getattr(fout, 'flush', lambda: None)() 

86 return # n 

87 

88 while True: 

89 i = tmp.find(delim) 

90 if i < 0: 

91 buf += tmp 

92 break 

93 fp_write(buf + tmp[:i + len(delim)]) 

94 # n += 1 

95 callback(1 if callback_len else (buf + tmp[:i])) 

96 buf = b'' 

97 tmp = tmp[i + len_delim:] 

98 

99 

100# ((opt, type), ... ) 

101RE_OPTS = re.compile(r'\n {8}(\S+)\s{2,}:\s*([^,]+)') 

102# better split method assuming no positional args 

103RE_SHLEX = re.compile(r'\s*(?<!\S)--?([^\s=]+)(\s+|=|$)') 

104 

105# TODO: add custom support for some of the following? 

106UNSUPPORTED_OPTS = ('iterable', 'gui', 'out', 'file') 

107 

108# The 8 leading spaces are required for consistency 

109CLI_EXTRA_DOC = r""" 

110 Extra CLI Options 

111 ----------------- 

112 name : type, optional 

113 TODO: find out why this is needed. 

114 delim : chr, optional 

115 Delimiting character [default: '\n']. Use '\0' for null. 

116 N.B.: on Windows systems, Python converts '\n' to '\r\n'. 

117 buf_size : int, optional 

118 String buffer size in bytes [default: 256] 

119 used when `delim` is specified. 

120 bytes : bool, optional 

121 If true, will count bytes, ignore `delim`, and default 

122 `unit_scale` to True, `unit_divisor` to 1024, and `unit` to 'B'. 

123 tee : bool, optional 

124 If true, passes `stdin` to both `stderr` and `stdout`. 

125 update : bool, optional 

126 If true, will treat input as newly elapsed iterations, 

127 i.e. numbers to pass to `update()`. Note that this is slow 

128 (~2e5 it/s) since every input must be decoded as a number. 

129 update_to : bool, optional 

130 If true, will treat input as total elapsed iterations, 

131 i.e. numbers to assign to `self.n`. Note that this is slow 

132 (~2e5 it/s) since every input must be decoded as a number. 

133 null : bool, optional 

134 If true, will discard input (no stdout). 

135 manpath : str, optional 

136 Directory in which to install tqdm man pages. 

137 comppath : str, optional 

138 Directory in which to place tqdm completion. 

139 log : str, optional 

140 CRITICAL|FATAL|ERROR|WARN(ING)|[default: 'INFO']|DEBUG|NOTSET. 

141""" 

142 

143 

144def main(fp=sys.stderr, argv=None): 

145 """ 

146 Parameters (internal use only) 

147 --------- 

148 fp : file-like object for tqdm 

149 argv : list (default: sys.argv[1:]) 

150 """ 

151 if argv is None: 

152 argv = sys.argv[1:] 

153 try: 

154 log_idx = argv.index('--log') 

155 except ValueError: 

156 for i in argv: 

157 if i.startswith('--log='): 

158 logLevel = i[len('--log='):] 

159 break 

160 else: 

161 logLevel = 'INFO' 

162 else: 

163 # argv.pop(log_idx) 

164 # logLevel = argv.pop(log_idx) 

165 logLevel = argv[log_idx + 1] 

166 logging.basicConfig(level=getattr(logging, logLevel), 

167 format="%(levelname)s:%(module)s:%(lineno)d:%(message)s") 

168 

169 d = tqdm.__init__.__doc__ + CLI_EXTRA_DOC 

170 

171 opt_types = dict(RE_OPTS.findall(d)) 

172 # opt_types['delim'] = 'chr' 

173 

174 for o in UNSUPPORTED_OPTS: 

175 opt_types.pop(o) 

176 

177 log.debug(sorted(opt_types.items())) 

178 

179 # d = RE_OPTS.sub(r' --\1=<\1> : \2', d) 

180 split = RE_OPTS.split(d) 

181 opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) 

182 d = ''.join(('\n --{0} : {2}{3}' if otd[1] == 'bool' else 

183 '\n --{0}=<{1}> : {2}{3}').format( 

184 otd[0].replace('_', '-'), otd[0], *otd[1:]) 

185 for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) 

186 

187 help_short = "Usage:\n tqdm [--help | options]\n" 

188 d = help_short + """ 

189Options: 

190 -h, --help Print this help and exit. 

191 -v, --version Print version and exit. 

192""" + d.strip('\n') + '\n' 

193 

194 # opts = docopt(d, version=__version__) 

195 if any(v in argv for v in ('-v', '--version')): 

196 sys.stdout.write(__version__ + '\n') 

197 sys.exit(0) 

198 elif any(v in argv for v in ('-h', '--help')): 

199 sys.stdout.write(d + '\n') 

200 sys.exit(0) 

201 elif argv and argv[0][:2] != '--': 

202 sys.stderr.write(f"Error:Unknown argument:{argv[0]}\n{help_short}") 

203 

204 argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) 

205 opts = dict(zip(argv[1::3], argv[3::3])) 

206 

207 log.debug(opts) 

208 opts.pop('log', True) 

209 

210 tqdm_args = {'file': fp} 

211 try: 

212 for (o, v) in opts.items(): 

213 o = o.replace('-', '_') 

214 try: 

215 tqdm_args[o] = cast(v, opt_types[o]) 

216 except KeyError as e: 

217 raise TqdmKeyError(str(e)) 

218 log.debug('args:' + str(tqdm_args)) 

219 

220 delim_per_char = tqdm_args.pop('bytes', False) 

221 update = tqdm_args.pop('update', False) 

222 update_to = tqdm_args.pop('update_to', False) 

223 if sum((delim_per_char, update, update_to)) > 1: 

224 raise TqdmKeyError("Can only have one of --bytes --update --update_to") 

225 except Exception: 

226 fp.write("\nError:\n" + help_short) 

227 stdin, stdout_write = sys.stdin, sys.stdout.write 

228 for i in stdin: 

229 stdout_write(i) 

230 raise 

231 else: 

232 buf_size = tqdm_args.pop('buf_size', 256) 

233 delim = tqdm_args.pop('delim', b'\\n') 

234 tee = tqdm_args.pop('tee', False) 

235 manpath = tqdm_args.pop('manpath', None) 

236 comppath = tqdm_args.pop('comppath', None) 

237 if tqdm_args.pop('null', False): 

238 class stdout(object): 

239 @staticmethod 

240 def write(_): 

241 pass 

242 else: 

243 stdout = sys.stdout 

244 stdout = getattr(stdout, 'buffer', stdout) 

245 stdin = getattr(sys.stdin, 'buffer', sys.stdin) 

246 if manpath or comppath: 

247 from importlib import resources 

248 from os import path 

249 from shutil import copyfile 

250 

251 def cp(name, dst): 

252 """copy resource `name` to `dst`""" 

253 if hasattr(resources, 'files'): 

254 copyfile(str(resources.files('tqdm') / name), dst) 

255 else: # py<3.9 

256 with resources.path('tqdm', name) as src: 

257 copyfile(str(src), dst) 

258 log.info("written:%s", dst) 

259 if manpath is not None: 

260 cp('tqdm.1', path.join(manpath, 'tqdm.1')) 

261 if comppath is not None: 

262 cp('completion.sh', path.join(comppath, 'tqdm_completion.sh')) 

263 sys.exit(0) 

264 if tee: 

265 stdout_write = stdout.write 

266 fp_write = getattr(fp, 'buffer', fp).write 

267 

268 class stdout(object): # pylint: disable=function-redefined 

269 @staticmethod 

270 def write(x): 

271 with tqdm.external_write_mode(file=fp): 

272 fp_write(x) 

273 stdout_write(x) 

274 if delim_per_char: 

275 tqdm_args.setdefault('unit', 'B') 

276 tqdm_args.setdefault('unit_scale', True) 

277 tqdm_args.setdefault('unit_divisor', 1024) 

278 log.debug(tqdm_args) 

279 with tqdm(**tqdm_args) as t: 

280 posix_pipe(stdin, stdout, '', buf_size, t.update) 

281 elif delim == b'\\n': 

282 log.debug(tqdm_args) 

283 write = stdout.write 

284 if update or update_to: 

285 with tqdm(**tqdm_args) as t: 

286 if update: 

287 def callback(i): 

288 t.update(numeric(i.decode())) 

289 else: # update_to 

290 def callback(i): 

291 t.update(numeric(i.decode()) - t.n) 

292 for i in stdin: 

293 write(i) 

294 callback(i) 

295 else: 

296 for i in tqdm(stdin, **tqdm_args): 

297 write(i) 

298 else: 

299 log.debug(tqdm_args) 

300 with tqdm(**tqdm_args) as t: 

301 callback_len = False 

302 if update: 

303 def callback(i): 

304 t.update(numeric(i.decode())) 

305 elif update_to: 

306 def callback(i): 

307 t.update(numeric(i.decode()) - t.n) 

308 else: 

309 callback = t.update 

310 callback_len = True 

311 posix_pipe(stdin, stdout, delim, buf_size, callback, callback_len)