1"""Dependency utilities"""
2
3from types import ModuleType
4
5from ipyparallel.client.asyncresult import AsyncResult
6from ipyparallel.error import UnmetDependency
7from ipyparallel.serialize import can
8from ipyparallel.util import interactive
9
10
class depend:
    """Decorator that attaches an engine-side dependency check to a task.

    ``@depend(df, *args, **kwargs)`` records a dependency function ``df``
    together with the arguments it should be called with, in the same
    spirit as passing a function and its arguments to `apply`.  Applying
    the resulting decorator to a task function returns a `dependent`
    that bundles the task with that dependency.

    Examples
    --------
    ::

        @depend(df, a, b, c=5)
        def f(m, n, p):
            ...

        view.apply(f, 1, 2, 3)

    will call df(a,b,c=5) on the engine, and if it returns False or
    raises an UnmetDependency error, then the task will not be run
    and another engine will be tried.
    """

    def __init__(self, _wrapped_f, *args, **kwargs):
        # Nothing is evaluated here: the dependency function and its call
        # arguments are only stored until the decorator is applied.
        self.f, self.args, self.kwargs = _wrapped_f, args, kwargs

    def __call__(self, f):
        """Return a `dependent` pairing task *f* with the stored dependency."""
        dep_func, dep_args, dep_kwargs = self.f, self.args, self.kwargs
        return dependent(f, dep_func, *dep_args, **dep_kwargs)
39
40
class dependent:
    """Callable that pairs a task function with a dependency function.

    Implemented as a plain object instead of a closure-based decorator,
    because such closures are not picklable.

    Attributes
    ----------
    f : callable
        The wrapped task function, invoked by calling this object.
    df : callable
        The dependency function, invoked by `check_dependency`.
    dargs, dkwargs : tuple, dict
        Positional and keyword arguments passed to ``df``.
    """

    def __init__(self, _wrapped_f, _wrapped_df, *dargs, **dkwargs):
        self.f = _wrapped_f
        self.df = _wrapped_df
        self.dargs = dargs
        self.dkwargs = dkwargs
        # Mirror the wrapped object's name; objects without a __name__
        # are reported as 'f'.
        self.__name__ = getattr(_wrapped_f, '__name__', 'f')

    def check_dependency(self):
        """Run the dependency function; raise UnmetDependency if it returns False.

        Only the literal ``False`` counts as unmet -- other falsy results
        such as ``None`` or ``0`` pass.
        """
        outcome = self.df(*self.dargs, **self.dkwargs)
        if outcome is False:
            raise UnmetDependency()

    def __call__(self, *args, **kwargs):
        """Call the wrapped task function and return its result."""
        task = self.f
        return task(*args, **kwargs)
61
62
@interactive
def _require(*modules, **mapping):
    """Dependency check used by the `require` decorator.

    Imports every module named in *modules* into the namespace returned by
    ``globals()`` and binds each uncanned *mapping* value there under its
    keyword name.

    Returns True once everything has been set up.  Raises UnmetDependency
    (carrying the module name) when an import fails with ImportError.
    """
    # Imported inside the function body rather than taken from module scope.
    # NOTE(review): presumably because `interactive` makes this function run
    # against a different global namespace on the engine -- confirm.
    from ipyparallel.error import UnmetDependency
    from ipyparallel.serialize import uncan

    namespace = globals()

    for name in modules:
        try:
            exec(f'import {name}', namespace)
        except ImportError:
            raise UnmetDependency(name)

    # Uncan one entry at a time so each uncan call sees the names already
    # bound by the entries before it.
    for key, canned in mapping.items():
        namespace[key] = uncan(canned, namespace)

    return True
79
80
def require(*objects, **mapping):
    """Decorator making modules and local objects available on the engine.

    Modules, given either as module objects or by name, are imported before
    the decorated function is called.

    Anything that is not a module is pushed as part of the task:
    objects with a ``__name__`` (such as functions) may be passed
    positionally and are pushed under that name, while other objects
    must be passed by keyword argument.

    Raises
    ------
    TypeError
        If a positional object is neither a module, a string, nor an
        object with a ``__name__``.

    Examples
    --------
    ::

        In [1]: @ipp.require('numpy')
           ...: def norm(a):
           ...:     return numpy.linalg.norm(a,2)

    ::

        In [2]: def foo(x):
           ...:     return x*x
        In [3]: @ipp.require(foo)
           ...: def bar(a):
           ...:     return foo(1-a)
    """
    names = []
    for entry in objects:
        # A module object is handled exactly like its name given as a string.
        obj = entry.__name__ if isinstance(entry, ModuleType) else entry

        if isinstance(obj, str):
            names.append(obj)
            continue

        if not hasattr(obj, '__name__'):
            raise TypeError(
                "Objects other than modules and functions "
                f"must be passed by kwarg, but got: {type(obj)}"
            )

        # Named objects are pushed under their own __name__, overriding any
        # keyword argument with the same name.
        mapping[obj.__name__] = obj

    # Can every pushed object before handing it to the dependency.
    canned = {key: can(value) for key, value in mapping.items()}
    return depend(_require, *names, **canned)
126
127
class Dependency(set):
    """An object for representing a set of msg_id dependencies.

    Subclassed from set(): the members are the msg_ids being depended on.

    Parameters
    ----------
    dependencies : list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
        The msg_ids to depend on.  A single msg_id string or a single
        AsyncResult is accepted as well.  When a dict is given, its
        'all', 'success' and 'failure' entries take precedence over the
        keyword arguments, which act as fallbacks for missing keys.
    all : bool [default True]
        Whether the dependency should be considered met when *all* depending tasks have completed
        or only when *any* have been completed.
    success : bool [default True]
        Whether to consider successes as fulfilling dependencies.
    failure : bool [default False]
        Whether to consider failures as fulfilling dependencies.

    Raises
    ------
    TypeError
        If a dependency is neither a str nor an AsyncResult.
    ValueError
        If both `success` and `failure` are false.

    If `all=success=True` and `failure=False`, then the task will fail with an ImpossibleDependency
    as soon as the first depended-upon task fails.
    """

    # Class-level defaults, kept in sync with the __init__ defaults.
    all = True
    success = True
    failure = False

    def __init__(self, dependencies=(), all=True, success=True, failure=False):
        if isinstance(dependencies, dict):
            # Load from a dict such as the one produced by as_dict(); the
            # keyword arguments only fill in keys the dict does not have.
            all = dependencies.get('all', all)
            success = dependencies.get('success', success)
            failure = dependencies.get('failure', failure)
            dependencies = dependencies.get('dependencies', [])

        # A lone msg_id or AsyncResult is treated as a one-element collection
        # (a bare str must not be iterated character by character).
        if isinstance(dependencies, (str, AsyncResult)):
            dependencies = [dependencies]

        # Extract msg_ids from the various accepted sources.
        ids = []
        for d in dependencies:
            if isinstance(d, str):
                ids.append(d)
            elif isinstance(d, AsyncResult):
                ids.extend(d.msg_ids)
            else:
                raise TypeError(f"invalid dependency type: {type(d)!r}")

        super().__init__(ids)
        self.all = all
        if not (success or failure):
            raise ValueError("Must depend on at least one of successes or failures!")
        self.success = success
        self.failure = failure

    def check(self, completed, failed=None):
        """Check whether our dependencies have been met.

        Parameters
        ----------
        completed : set
            msg_ids of tasks that finished successfully.
        failed : set, optional
            msg_ids of tasks that failed.

        Returns
        -------
        bool
            True if there are no dependencies, or if the outcomes selected
            by `success`/`failure` cover all (``all=True``) or at least one
            (``all=False``) of the msg_ids in this set.
        """
        if not self:
            return True
        # Collect the msg_ids whose outcome counts as fulfilling.
        against = set()
        if self.success:
            against = completed
        if failed is not None and self.failure:
            against = against.union(failed)
        if self.all:
            return self.issubset(against)
        return not self.isdisjoint(against)

    def unreachable(self, completed, failed=None):
        """Return whether this dependency has become impossible.

        Parameters
        ----------
        completed : set
            msg_ids of tasks that finished successfully.
        failed : set, optional
            msg_ids of tasks that failed.

        Returns
        -------
        bool
            False if there are no dependencies.  Otherwise True when the
            outcomes that do *not* count as fulfilling hit at least one
            (``all=True``) or every one (``all=False``) of the msg_ids
            in this set.
        """
        if not self:
            return False
        # Collect the msg_ids whose outcome can never fulfil this dependency.
        against = set()
        if not self.success:
            against = completed
        if failed is not None and not self.failure:
            against = against.union(failed)
        if self.all:
            return not self.isdisjoint(against)
        return self.issubset(against)

    def as_dict(self):
        """Represent this dependency as a dict. For json compatibility."""
        return dict(
            dependencies=list(self),
            all=self.all,
            success=self.success,
            failure=self.failure,
        )
216
217
# Names exported by a star-import of this module.
__all__ = [
    'depend',
    'require',
    'dependent',
    'Dependency',
]