1from functools import wraps
2
3
4class Callback:
5 """
6 Base class and interface for callback mechanism
7
8 This class can be used directly for monitoring file transfers by
9 providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
10 below), or subclassed for more specialised behaviour.
11
12 Parameters
13 ----------
14 size: int (optional)
15 Nominal quantity for the value that corresponds to a complete
16 transfer, e.g., total number of tiles or total number of
17 bytes
18 value: int (0)
19 Starting internal counter value
20 hooks: dict or None
21 A dict of named functions to be called on each update. The signature
22 of these must be ``f(size, value, **kwargs)``
23 """
24
25 def __init__(self, size=None, value=0, hooks=None, **kwargs):
26 self.size = size
27 self.value = value
28 self.hooks = hooks or {}
29 self.kw = kwargs
30
31 def __enter__(self):
32 return self
33
34 def __exit__(self, *exc_args):
35 self.close()
36
37 def close(self):
38 """Close callback."""
39
40 def branched(self, path_1, path_2, **kwargs):
41 """
42 Return callback for child transfers
43
44 If this callback is operating at a higher level, e.g., put, which may
45 trigger transfers that can also be monitored. The function returns a callback
46 that has to be passed to the child method, e.g., put_file,
47 as `callback=` argument.
48
49 The implementation uses `callback.branch` for compatibility.
50 When implementing callbacks, it is recommended to override this function instead
51 of `branch` and avoid calling `super().branched(...)`.
52
53 Prefer using this function over `branch`.
54
55 Parameters
56 ----------
57 path_1: str
58 Child's source path
59 path_2: str
60 Child's destination path
61 **kwargs:
62 Arbitrary keyword arguments
63
64 Returns
65 -------
66 callback: Callback
67 A callback instance to be passed to the child method
68 """
69 self.branch(path_1, path_2, kwargs)
70 # mutate kwargs so that we can force the caller to pass "callback=" explicitly
71 return kwargs.pop("callback", DEFAULT_CALLBACK)
72
73 def branch_coro(self, fn):
74 """
75 Wraps a coroutine, and pass a new child callback to it.
76 """
77
78 @wraps(fn)
79 async def func(path1, path2: str, **kwargs):
80 with self.branched(path1, path2, **kwargs) as child:
81 return await fn(path1, path2, callback=child, **kwargs)
82
83 return func
84
85 def set_size(self, size):
86 """
87 Set the internal maximum size attribute
88
89 Usually called if not initially set at instantiation. Note that this
90 triggers a ``call()``.
91
92 Parameters
93 ----------
94 size: int
95 """
96 self.size = size
97 self.call()
98
99 def absolute_update(self, value):
100 """
101 Set the internal value state
102
103 Triggers ``call()``
104
105 Parameters
106 ----------
107 value: int
108 """
109 self.value = value
110 self.call()
111
112 def relative_update(self, inc=1):
113 """
114 Delta increment the internal counter
115
116 Triggers ``call()``
117
118 Parameters
119 ----------
120 inc: int
121 """
122 self.value += inc
123 self.call()
124
125 def call(self, hook_name=None, **kwargs):
126 """
127 Execute hook(s) with current state
128
129 Each function is passed the internal size and current value
130
131 Parameters
132 ----------
133 hook_name: str or None
134 If given, execute on this hook
135 kwargs: passed on to (all) hook(s)
136 """
137 if not self.hooks:
138 return
139 kw = self.kw.copy()
140 kw.update(kwargs)
141 if hook_name:
142 if hook_name not in self.hooks:
143 return
144 return self.hooks[hook_name](self.size, self.value, **kw)
145 for hook in self.hooks.values() or []:
146 hook(self.size, self.value, **kw)
147
148 def wrap(self, iterable):
149 """
150 Wrap an iterable to call ``relative_update`` on each iterations
151
152 Parameters
153 ----------
154 iterable: Iterable
155 The iterable that is being wrapped
156 """
157 for item in iterable:
158 self.relative_update()
159 yield item
160
161 def branch(self, path_1, path_2, kwargs):
162 """
163 Set callbacks for child transfers
164
165 If this callback is operating at a higher level, e.g., put, which may
166 trigger transfers that can also be monitored. The passed kwargs are
167 to be *mutated* to add ``callback=``, if this class supports branching
168 to children.
169
170 Parameters
171 ----------
172 path_1: str
173 Child's source path
174 path_2: str
175 Child's destination path
176 kwargs: dict
177 arguments passed to child method, e.g., put_file.
178
179 Returns
180 -------
181
182 """
183 return None
184
185 def no_op(self, *_, **__):
186 pass
187
188 def __getattr__(self, item):
189 """
190 If undefined methods are called on this class, nothing happens
191 """
192 return self.no_op
193
194 @classmethod
195 def as_callback(cls, maybe_callback=None):
196 """Transform callback=... into Callback instance
197
198 For the special value of ``None``, return the global instance of
199 ``NoOpCallback``. This is an alternative to including
200 ``callback=DEFAULT_CALLBACK`` directly in a method signature.
201 """
202 if maybe_callback is None:
203 return DEFAULT_CALLBACK
204 return maybe_callback
205
206
207class NoOpCallback(Callback):
208 """
209 This implementation of Callback does exactly nothing
210 """
211
212 def call(self, *args, **kwargs):
213 return None
214
215
216class DotPrinterCallback(Callback):
217 """
218 Simple example Callback implementation
219
220 Almost identical to Callback with a hook that prints a char; here we
221 demonstrate how the outer layer may print "#" and the inner layer "."
222 """
223
224 def __init__(self, chr_to_print="#", **kwargs):
225 self.chr = chr_to_print
226 super().__init__(**kwargs)
227
228 def branch(self, path_1, path_2, kwargs):
229 """Mutate kwargs to add new instance with different print char"""
230 kwargs["callback"] = DotPrinterCallback(".")
231
232 def call(self, **kwargs):
233 """Just outputs a character"""
234 print(self.chr, end="")
235
236
237class TqdmCallback(Callback):
238 """
239 A callback to display a progress bar using tqdm
240
241 Parameters
242 ----------
243 tqdm_kwargs : dict, (optional)
244 Any argument accepted by the tqdm constructor.
245 See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
246 Will be forwarded to `tqdm_cls`.
247 tqdm_cls: (optional)
248 subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.
249
250 Examples
251 --------
252 >>> import fsspec
253 >>> from fsspec.callbacks import TqdmCallback
254 >>> fs = fsspec.filesystem("memory")
255 >>> path2distant_data = "/your-path"
256 >>> fs.upload(
257 ".",
258 path2distant_data,
259 recursive=True,
260 callback=TqdmCallback(),
261 )
262
263 You can forward args to tqdm using the ``tqdm_kwargs`` parameter.
264
265 >>> fs.upload(
266 ".",
267 path2distant_data,
268 recursive=True,
269 callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
270 )
271
272 You can also customize the progress bar by passing a subclass of `tqdm`.
273
274 .. code-block:: python
275
276 class TqdmFormat(tqdm):
277 '''Provides a `total_time` format parameter'''
278 @property
279 def format_dict(self):
280 d = super().format_dict
281 total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
282 d.update(total_time=self.format_interval(total_time) + " in total")
283 return d
284
285 >>> with TqdmCallback(
286 tqdm_kwargs={
287 "desc": "desc",
288 "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
289 },
290 tqdm_cls=TqdmFormat,
291 ) as callback:
292 fs.upload(".", path2distant_data, recursive=True, callback=callback)
293 """
294
295 def __init__(self, tqdm_kwargs=None, *args, **kwargs):
296 try:
297 from tqdm import tqdm
298
299 except ImportError as exce:
300 raise ImportError(
301 "Using TqdmCallback requires tqdm to be installed"
302 ) from exce
303
304 self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
305 self._tqdm_kwargs = tqdm_kwargs or {}
306 self.tqdm = None
307 super().__init__(*args, **kwargs)
308
309 def call(self, *args, **kwargs):
310 if self.tqdm is None:
311 self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
312 self.tqdm.total = self.size
313 self.tqdm.update(self.value - self.tqdm.n)
314
315 def close(self):
316 if self.tqdm is not None:
317 self.tqdm.close()
318 self.tqdm = None
319
320 def __del__(self):
321 return self.close()
322
323
324DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback()