Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/callbacks.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1from functools import wraps 

2 

3 

class Callback:
    """
    Base class and interface for callback mechanism

    This class can be used directly for monitoring file transfers by
    providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
    below), or subclassed for more specialised behaviour.

    Parameters
    ----------
    size: int (optional)
        Nominal quantity for the value that corresponds to a complete
        transfer, e.g., total number of tiles or total number of
        bytes
    value: int (0)
        Starting internal counter value
    hooks: dict or None
        A dict of named functions to be called on each update. The signature
        of these must be ``f(size, value, **kwargs)``
    **kwargs:
        Stored on the instance and passed as keyword arguments to every
        hook invocation
    """

    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        self.size = size
        self.value = value
        self.hooks = hooks or {}
        self.kw = kwargs

    def __enter__(self):
        return self

    def __exit__(self, *exc_args):
        self.close()

    def close(self):
        """Close callback."""

    def branched(self, path_1, path_2, **kwargs):
        """
        Return callback for child transfers

        If this callback is operating at a higher level, e.g., put, which may
        trigger transfers that can also be monitored. The function returns a callback
        that has to be passed to the child method, e.g., put_file,
        as `callback=` argument.

        The implementation uses `callback.branch` for compatibility.
        When implementing callbacks, it is recommended to override this function instead
        of `branch` and avoid calling `super().branched(...)`.

        Prefer using this function over `branch`.

        Parameters
        ----------
        path_1: str
            Child's source path
        path_2: str
            Child's destination path
        **kwargs:
            Arbitrary keyword arguments

        Returns
        -------
        callback: Callback
            A callback instance to be passed to the child method
        """
        self.branch(path_1, path_2, kwargs)
        # mutate kwargs so that we can force the caller to pass "callback=" explicitly
        return kwargs.pop("callback", DEFAULT_CALLBACK)

    def branch_coro(self, fn):
        """
        Wraps a coroutine, and pass a new child callback to it.

        The returned coroutine function takes ``(path1, path2, **kwargs)``,
        obtains a child callback from ``branched`` and awaits ``fn`` with
        ``callback=child``; the child is closed when ``fn`` finishes.
        """

        @wraps(fn)
        async def func(path1, path2: str, **kwargs):
            with self.branched(path1, path2, **kwargs) as child:
                return await fn(path1, path2, callback=child, **kwargs)

        return func

    def set_size(self, size):
        """
        Set the internal maximum size attribute

        Usually called if not initially set at instantiation. Note that this
        triggers a ``call()``.

        Parameters
        ----------
        size: int
        """
        self.size = size
        self.call()

    def absolute_update(self, value):
        """
        Set the internal value state

        Triggers ``call()``

        Parameters
        ----------
        value: int
        """
        self.value = value
        self.call()

    def relative_update(self, inc=1):
        """
        Delta increment the internal counter

        Triggers ``call()``

        Parameters
        ----------
        inc: int
        """
        self.value += inc
        self.call()

    def call(self, hook_name=None, **kwargs):
        """
        Execute hook(s) with current state

        Each function is passed the internal size and current value

        Parameters
        ----------
        hook_name: str or None
            If given, execute on this hook
        kwargs: passed on to (all) hook(s)

        Returns
        -------
        The named hook's return value when ``hook_name`` is given and
        registered; ``None`` otherwise.
        """
        if not self.hooks:
            return
        # Per-call kwargs take precedence over those stored at construction.
        kw = {**self.kw, **kwargs}
        if hook_name:
            if hook_name not in self.hooks:
                return
            return self.hooks[hook_name](self.size, self.value, **kw)
        for hook in self.hooks.values():
            hook(self.size, self.value, **kw)

    def wrap(self, iterable):
        """
        Wrap an iterable to call ``relative_update`` on each iterations

        Parameters
        ----------
        iterable: Iterable
            The iterable that is being wrapped
        """
        for item in iterable:
            self.relative_update()
            yield item

    def branch(self, path_1, path_2, kwargs):
        """
        Set callbacks for child transfers

        If this callback is operating at a higher level, e.g., put, which may
        trigger transfers that can also be monitored. The passed kwargs are
        to be *mutated* to add ``callback=``, if this class supports branching
        to children.

        Parameters
        ----------
        path_1: str
            Child's source path
        path_2: str
            Child's destination path
        kwargs: dict
            arguments passed to child method, e.g., put_file.

        Returns
        -------
        None; the base implementation leaves ``kwargs`` untouched.
        """
        return None

    def no_op(self, *_, **__):
        """Accept any arguments and do nothing."""

    def __getattr__(self, item):
        """
        If undefined methods are called on this class, nothing happens

        Dunder names are excluded and raise ``AttributeError`` as usual:
        the standard library probes instances for optional protocol hooks
        (e.g. ``__deepcopy__`` in ``copy.deepcopy``, ``__setstate__`` when
        unpickling), and answering those probes with a no-op makes such
        operations silently return ``None`` or discard the instance state.
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(item)
        return self.no_op

    @classmethod
    def as_callback(cls, maybe_callback=None):
        """Transform callback=... into Callback instance

        For the special value of ``None``, return the global instance of
        ``NoOpCallback``. This is an alternative to including
        ``callback=DEFAULT_CALLBACK`` directly in a method signature.
        """
        if maybe_callback is None:
            return DEFAULT_CALLBACK
        return maybe_callback

205 

206 

class NoOpCallback(Callback):
    """
    Callback that ignores every update

    ``call`` is overridden to accept any arguments and do nothing, so no
    hook is ever executed through it.
    """

    def call(self, *args, **kwargs):
        """Accept any arguments, run nothing and return ``None``."""
        return

214 

215 

class DotPrinterCallback(Callback):
    """
    Simple example Callback implementation

    Almost identical to Callback with a hook that prints a char; here we
    demonstrate how the outer layer may print "#" and the inner layer "."

    Parameters
    ----------
    chr_to_print: str ("#")
        Character written to stdout on every ``call()``
    **kwargs:
        Forwarded to ``Callback.__init__``
    """

    def __init__(self, chr_to_print="#", **kwargs):
        self.chr = chr_to_print
        super().__init__(**kwargs)

    def branch(self, path_1, path_2, kwargs):
        """Mutate kwargs to add new instance with different print char"""
        kwargs["callback"] = DotPrinterCallback(".")

    def call(self, *args, **kwargs):
        """Just outputs a character

        Positional arguments are accepted and ignored so that a hook name
        can be passed positionally, as ``Callback.call`` allows.
        """
        print(self.chr, end="")

235 

236 

class TqdmCallback(Callback):
    """
    A callback to display a progress bar using tqdm

    Parameters
    ----------
    tqdm_kwargs : dict, (optional)
        Any argument accepted by the tqdm constructor.
        See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
        Will be forwarded to `tqdm_cls`.
    tqdm_cls: (optional)
        subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.

    Examples
    --------
    >>> import fsspec
    >>> from fsspec.callbacks import TqdmCallback
    >>> fs = fsspec.filesystem("memory")
    >>> path2distant_data = "/your-path"
    >>> fs.upload(
            ".",
            path2distant_data,
            recursive=True,
            callback=TqdmCallback(),
        )

    You can forward args to tqdm using the ``tqdm_kwargs`` parameter.

    >>> fs.upload(
            ".",
            path2distant_data,
            recursive=True,
            callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
        )

    You can also customize the progress bar by passing a subclass of `tqdm`.

    .. code-block:: python

        class TqdmFormat(tqdm):
            '''Provides a `total_time` format parameter'''
            @property
            def format_dict(self):
                d = super().format_dict
                total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
                d.update(total_time=self.format_interval(total_time) + " in total")
                return d

    >>> with TqdmCallback(
            tqdm_kwargs={
                "desc": "desc",
                "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
            },
            tqdm_cls=TqdmFormat,
        ) as callback:
            fs.upload(".", path2distant_data, recursive=True, callback=callback)
    """

    def __init__(self, tqdm_kwargs=None, *args, **kwargs):
        # Set this before anything that can raise: ``__del__`` calls
        # ``close()``, and on a half-built instance a missing ``tqdm``
        # attribute would be answered by ``Callback.__getattr__`` with a
        # no-op method instead of ``None``, making the finalizer fail.
        self.tqdm = None

        try:
            from tqdm import tqdm

        except ImportError as exce:
            raise ImportError(
                "Using TqdmCallback requires tqdm to be installed"
            ) from exce

        self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
        self._tqdm_kwargs = tqdm_kwargs or {}
        super().__init__(*args, **kwargs)

    def call(self, *args, **kwargs):
        """Create the progress bar on first use, then sync it to ``size``/``value``.

        All arguments are ignored.
        """
        if self.tqdm is None:
            self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
        self.tqdm.total = self.size
        # The bar's ``update`` takes an increment, so pass the difference
        # between our counter and the bar's current count ``n``.
        self.tqdm.update(self.value - self.tqdm.n)

    def close(self):
        """Close the progress bar, if one was created, and forget it."""
        if self.tqdm is not None:
            self.tqdm.close()
            self.tqdm = None

    def __del__(self):
        return self.close()

322 

323 

# Single shared do-nothing callback; both names refer to the same instance.
# ``Callback.as_callback(None)`` returns it, and ``Callback.branched`` falls
# back to it when ``branch`` does not supply a child callback.
_DEFAULT_CALLBACK = NoOpCallback()
DEFAULT_CALLBACK = _DEFAULT_CALLBACK