Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dask/callbacks.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1from __future__ import annotations 

2 

3from collections.abc import Callable 

4from contextlib import contextmanager 

5from typing import ClassVar 

6 

7__all__ = ["Callback", "add_callbacks"] 

8 

9 

10class Callback: 

11 """Base class for using the callback mechanism 

12 

13 Create a callback with functions of the following signatures: 

14 

15 >>> def start(dsk): 

16 ... pass 

17 >>> def start_state(dsk, state): 

18 ... pass 

19 >>> def pretask(key, dsk, state): 

20 ... pass 

21 >>> def posttask(key, result, dsk, state, worker_id): 

22 ... pass 

23 >>> def finish(dsk, state, failed): 

24 ... pass 

25 

26 You may then construct a callback object with any number of them 

27 

28 >>> cb = Callback(pretask=pretask, finish=finish) 

29 

30 And use it either as a context manager over a compute/get call 

31 

32 >>> with cb: # doctest: +SKIP 

33 ... x.compute() 

34 

35 Or globally with the ``register`` method 

36 

37 >>> cb.register() 

38 >>> cb.unregister() 

39 

40 Alternatively subclass the ``Callback`` class with your own methods. 

41 

42 >>> class PrintKeys(Callback): 

43 ... def _pretask(self, key, dask, state): 

44 ... print("Computing: {0}!".format(repr(key))) 

45 

46 >>> with PrintKeys(): # doctest: +SKIP 

47 ... x.compute() 

48 """ 

49 

50 active: ClassVar[set[tuple[Callable | None, ...]]] = set() 

51 

52 def __init__( 

53 self, start=None, start_state=None, pretask=None, posttask=None, finish=None 

54 ): 

55 if start: 

56 self._start = start 

57 if start_state: 

58 self._start_state = start_state 

59 if pretask: 

60 self._pretask = pretask 

61 if posttask: 

62 self._posttask = posttask 

63 if finish: 

64 self._finish = finish 

65 

66 @property 

67 def _callback(self) -> tuple[Callable | None, ...]: 

68 fields = ["_start", "_start_state", "_pretask", "_posttask", "_finish"] 

69 return tuple(getattr(self, i, None) for i in fields) 

70 

71 def __enter__(self): 

72 self._cm = add_callbacks(self) 

73 self._cm.__enter__() 

74 return self 

75 

76 def __exit__(self, *args): 

77 self._cm.__exit__(*args) 

78 

79 def register(self) -> None: 

80 Callback.active.add(self._callback) 

81 

82 def unregister(self) -> None: 

83 Callback.active.remove(self._callback) 

84 

85 

86def unpack_callbacks(cbs): 

87 """Take an iterable of callbacks, return a list of each callback.""" 

88 if cbs: 

89 return [[i for i in f if i] for f in zip(*cbs)] 

90 else: 

91 return [(), (), (), (), ()] 

92 

93 

94@contextmanager 

95def local_callbacks(callbacks=None): 

96 """Allows callbacks to work with nested schedulers. 

97 

98 Callbacks will only be used by the first started scheduler they encounter. 

99 This means that only the outermost scheduler will use global callbacks.""" 

100 global_callbacks = callbacks is None 

101 if global_callbacks: 

102 callbacks, Callback.active = Callback.active, set() 

103 try: 

104 yield callbacks or () 

105 finally: 

106 if global_callbacks: 

107 Callback.active = callbacks 

108 

109 

110def normalize_callback(cb): 

111 """Normalizes a callback to a tuple""" 

112 if isinstance(cb, Callback): 

113 return cb._callback 

114 elif isinstance(cb, tuple): 

115 return cb 

116 else: 

117 raise TypeError("Callbacks must be either `Callback` or `tuple`") 

118 

119 

120class add_callbacks: 

121 """Context manager for callbacks. 

122 

123 Takes several callbacks and applies them only in the enclosed context. 

124 Callbacks can either be represented as a ``Callback`` object, or as a tuple 

125 of length 4. 

126 

127 Examples 

128 -------- 

129 >>> def pretask(key, dsk, state): 

130 ... print("Now running {0}").format(key) 

131 >>> callbacks = (None, pretask, None, None) 

132 >>> with add_callbacks(callbacks): # doctest: +SKIP 

133 ... res.compute() 

134 """ 

135 

136 def __init__(self, *callbacks): 

137 self.callbacks = [normalize_callback(c) for c in callbacks] 

138 Callback.active.update(self.callbacks) 

139 

140 def __enter__(self): 

141 return 

142 

143 def __exit__(self, type, value, traceback): 

144 for c in self.callbacks: 

145 Callback.active.discard(c)