Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/callbacks.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from collections.abc import Callable
4from contextlib import contextmanager
5from typing import ClassVar
7__all__ = ["Callback", "add_callbacks"]
10class Callback:
11 """Base class for using the callback mechanism
13 Create a callback with functions of the following signatures:
15 >>> def start(dsk):
16 ... pass
17 >>> def start_state(dsk, state):
18 ... pass
19 >>> def pretask(key, dsk, state):
20 ... pass
21 >>> def posttask(key, result, dsk, state, worker_id):
22 ... pass
23 >>> def finish(dsk, state, failed):
24 ... pass
26 You may then construct a callback object with any number of them
28 >>> cb = Callback(pretask=pretask, finish=finish)
30 And use it either as a context manager over a compute/get call
32 >>> with cb: # doctest: +SKIP
33 ... x.compute()
35 Or globally with the ``register`` method
37 >>> cb.register()
38 >>> cb.unregister()
40 Alternatively subclass the ``Callback`` class with your own methods.
42 >>> class PrintKeys(Callback):
43 ... def _pretask(self, key, dask, state):
44 ... print("Computing: {0}!".format(repr(key)))
46 >>> with PrintKeys(): # doctest: +SKIP
47 ... x.compute()
48 """
50 active: ClassVar[set[tuple[Callable | None, ...]]] = set()
52 def __init__(
53 self, start=None, start_state=None, pretask=None, posttask=None, finish=None
54 ):
55 if start:
56 self._start = start
57 if start_state:
58 self._start_state = start_state
59 if pretask:
60 self._pretask = pretask
61 if posttask:
62 self._posttask = posttask
63 if finish:
64 self._finish = finish
66 @property
67 def _callback(self) -> tuple[Callable | None, ...]:
68 fields = ["_start", "_start_state", "_pretask", "_posttask", "_finish"]
69 return tuple(getattr(self, i, None) for i in fields)
71 def __enter__(self):
72 self._cm = add_callbacks(self)
73 self._cm.__enter__()
74 return self
76 def __exit__(self, *args):
77 self._cm.__exit__(*args)
79 def register(self) -> None:
80 Callback.active.add(self._callback)
82 def unregister(self) -> None:
83 Callback.active.remove(self._callback)
86def unpack_callbacks(cbs):
87 """Take an iterable of callbacks, return a list of each callback."""
88 if cbs:
89 return [[i for i in f if i] for f in zip(*cbs)]
90 else:
91 return [(), (), (), (), ()]
94@contextmanager
95def local_callbacks(callbacks=None):
96 """Allows callbacks to work with nested schedulers.
98 Callbacks will only be used by the first started scheduler they encounter.
99 This means that only the outermost scheduler will use global callbacks."""
100 global_callbacks = callbacks is None
101 if global_callbacks:
102 callbacks, Callback.active = Callback.active, set()
103 try:
104 yield callbacks or ()
105 finally:
106 if global_callbacks:
107 Callback.active = callbacks
110def normalize_callback(cb):
111 """Normalizes a callback to a tuple"""
112 if isinstance(cb, Callback):
113 return cb._callback
114 elif isinstance(cb, tuple):
115 return cb
116 else:
117 raise TypeError("Callbacks must be either `Callback` or `tuple`")
120class add_callbacks:
121 """Context manager for callbacks.
123 Takes several callbacks and applies them only in the enclosed context.
124 Callbacks can either be represented as a ``Callback`` object, or as a tuple
125 of length 4.
127 Examples
128 --------
129 >>> def pretask(key, dsk, state):
130 ... print("Now running {0}").format(key)
131 >>> callbacks = (None, pretask, None, None)
132 >>> with add_callbacks(callbacks): # doctest: +SKIP
133 ... res.compute()
134 """
136 def __init__(self, *callbacks):
137 self.callbacks = [normalize_callback(c) for c in callbacks]
138 Callback.active.update(self.callbacks)
140 def __enter__(self):
141 return
143 def __exit__(self, type, value, traceback):
144 for c in self.callbacks:
145 Callback.active.discard(c)