Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/controller/dependency.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1"""Dependency utilities""" 

2 

3from types import ModuleType 

4 

5from ipyparallel.client.asyncresult import AsyncResult 

6from ipyparallel.error import UnmetDependency 

7from ipyparallel.serialize import can 

8from ipyparallel.util import interactive 

9 

10 

11class depend: 

12 """Dependency decorator, for use with tasks. 

13 

14 `@depend` lets you define a function for engine dependencies 

15 just like you use `apply` for tasks. 

16 

17 

18 Examples 

19 -------- 

20 :: 

21 

22 @depend(df, a,b, c=5) 

23 def f(m,n,p) 

24 

25 view.apply(f, 1,2,3) 

26 

27 will call df(a,b,c=5) on the engine, and if it returns False or 

28 raises an UnmetDependency error, then the task will not be run 

29 and another engine will be tried. 

30 """ 

31 

32 def __init__(self, _wrapped_f, *args, **kwargs): 

33 self.f = _wrapped_f 

34 self.args = args 

35 self.kwargs = kwargs 

36 

37 def __call__(self, f): 

38 return dependent(f, self.f, *self.args, **self.kwargs) 

39 

40 

41class dependent: 

42 """A function that depends on another function. 

43 This is an object to prevent the closure used 

44 in traditional decorators, which are not picklable. 

45 """ 

46 

47 def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs): 

48 self.f = _wrapped_f 

49 name = getattr(_wrapped_f, '__name__', 'f') 

50 self.__name__ = name 

51 self.df = _wrapped_df 

52 self.dargs = dargs 

53 self.dkwargs = dkwargs 

54 

55 def check_dependency(self): 

56 if self.df(*self.dargs, **self.dkwargs) is False: 

57 raise UnmetDependency() 

58 

59 def __call__(self, *args, **kwargs): 

60 return self.f(*args, **kwargs) 

61 

62 

63@interactive 

64def _require(*modules, **mapping): 

65 """Helper for @require decorator.""" 

66 from ipyparallel.error import UnmetDependency 

67 from ipyparallel.serialize import uncan 

68 

69 user_ns = globals() 

70 for name in modules: 

71 try: 

72 exec(f'import {name}', user_ns) 

73 except ImportError: 

74 raise UnmetDependency(name) 

75 

76 for name, cobj in mapping.items(): 

77 user_ns[name] = uncan(cobj, user_ns) 

78 return True 

79 

80 

81def require(*objects, **mapping): 

82 """Simple decorator for requiring local objects and modules to be available 

83 when the decorated function is called on the engine. 

84 

85 Modules specified by name or passed directly will be imported 

86 prior to calling the decorated function. 

87 

88 Objects other than modules will be pushed as a part of the task. 

89 Functions can be passed positionally, 

90 and will be pushed to the engine with their __name__. 

91 Other objects can be passed by keyword arg. 

92 

93 Examples 

94 -------- 

95 :: 

96 

97 In [1]: @ipp.require('numpy') 

98 ...: def norm(a): 

99 ...: return numpy.linalg.norm(a,2) 

100 

101 :: 

102 

103 In [2]: foo = lambda x: x*x 

104 In [3]: @ipp.require(foo) 

105 ...: def bar(a): 

106 ...: return foo(1-a) 

107 """ 

108 names = [] 

109 for obj in objects: 

110 if isinstance(obj, ModuleType): 

111 obj = obj.__name__ 

112 

113 if isinstance(obj, str): 

114 names.append(obj) 

115 elif hasattr(obj, '__name__'): 

116 mapping[obj.__name__] = obj 

117 else: 

118 raise TypeError( 

119 "Objects other than modules and functions " 

120 f"must be passed by kwarg, but got: {type(obj)}" 

121 ) 

122 

123 for name, obj in mapping.items(): 

124 mapping[name] = can(obj) 

125 return depend(_require, *names, **mapping) 

126 

127 

128class Dependency(set): 

129 """An object for representing a set of msg_id dependencies. 

130 

131 Subclassed from set(). 

132 

133 Parameters 

134 ---------- 

135 dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict() 

136 The msg_ids to depend on 

137 all : bool [default True] 

138 Whether the dependency should be considered met when *all* depending tasks have completed 

139 or only when *any* have been completed. 

140 success : bool [default True] 

141 Whether to consider successes as fulfilling dependencies. 

142 failure : bool [default False] 

143 Whether to consider failures as fulfilling dependencies. 

144 

145 If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency 

146 as soon as the first depended-upon task fails. 

147 """ 

148 

149 all = True 

150 success = True 

151 failure = True 

152 

153 def __init__(self, dependencies=[], all=True, success=True, failure=False): 

154 if isinstance(dependencies, dict): 

155 # load from dict 

156 all = dependencies.get('all', True) 

157 success = dependencies.get('success', success) 

158 failure = dependencies.get('failure', failure) 

159 dependencies = dependencies.get('dependencies', []) 

160 ids = [] 

161 

162 # extract ids from various sources: 

163 if isinstance(dependencies, (str, AsyncResult)): 

164 dependencies = [dependencies] 

165 for d in dependencies: 

166 if isinstance(d, str): 

167 ids.append(d) 

168 elif isinstance(d, AsyncResult): 

169 ids.extend(d.msg_ids) 

170 else: 

171 raise TypeError(f"invalid dependency type: {type(d)!r}") 

172 

173 set.__init__(self, ids) 

174 self.all = all 

175 if not (success or failure): 

176 raise ValueError("Must depend on at least one of successes or failures!") 

177 self.success = success 

178 self.failure = failure 

179 

180 def check(self, completed, failed=None): 

181 """check whether our dependencies have been met.""" 

182 if len(self) == 0: 

183 return True 

184 against = set() 

185 if self.success: 

186 against = completed 

187 if failed is not None and self.failure: 

188 against = against.union(failed) 

189 if self.all: 

190 return self.issubset(against) 

191 else: 

192 return not self.isdisjoint(against) 

193 

194 def unreachable(self, completed, failed=None): 

195 """return whether this dependency has become impossible.""" 

196 if len(self) == 0: 

197 return False 

198 against = set() 

199 if not self.success: 

200 against = completed 

201 if failed is not None and not self.failure: 

202 against = against.union(failed) 

203 if self.all: 

204 return not self.isdisjoint(against) 

205 else: 

206 return self.issubset(against) 

207 

208 def as_dict(self): 

209 """Represent this dependency as a dict. For json compatibility.""" 

210 return dict( 

211 dependencies=list(self), 

212 all=self.all, 

213 success=self.success, 

214 failure=self.failure, 

215 ) 

216 

217 

218__all__ = ['depend', 'require', 'dependent', 'Dependency']