1"""Remote Functions and decorators for Views."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5import warnings
6from inspect import signature
7
8from decorator import decorator
9
10from ..serialize import PrePickled
11from . import map as Map
12from .asyncresult import AsyncMapResult
13
14# -----------------------------------------------------------------------------
15# Functions and Decorators
16# -----------------------------------------------------------------------------
17
18
def remote(view, block=None, **flags):
    """Build a decorator that wraps a function in a `RemoteFunction`.

    Parameters
    ----------
    view : View instance
        The view the wrapped function will be executed through.
    block : bool [default: None]
        Forwarded to `RemoteFunction` as its ``block`` argument.
    **flags
        Remaining keyword arguments, forwarded to `RemoteFunction`.

    Usage::

        In [1]: @remote(view, block=True)
           ...: def func(a):
           ...:     pass
    """
    # Merge everything except the function itself once, up front.
    options = dict(flags, block=block)

    def wrap(f):
        return RemoteFunction(view, f, **options)

    return wrap
33
34
def parallel(view, dist='b', block=None, ordered=True, **flags):
    """Build a decorator that wraps a function in a `ParallelFunction`.

    Parameters
    ----------
    view : View instance
        The view the wrapped function will be executed through.
    dist : str [default: 'b']
        Forwarded to `ParallelFunction` as its ``dist`` argument.
    block : bool [default: None]
        Forwarded to `ParallelFunction` as its ``block`` argument.
    ordered : bool [default: True]
        Forwarded to `ParallelFunction` as its ``ordered`` argument.
    **flags
        Remaining keyword arguments, forwarded to `ParallelFunction`.

    The resulting object can be used for map::

        In [1]: @parallel(view, block=True)
           ...: def func(a):
           ...:     pass
    """
    # Merge everything except the function itself once, up front.
    options = dict(flags, dist=dist, block=block, ordered=ordered)

    def wrap(f):
        return ParallelFunction(view, f, **options)

    return wrap
51
52
def getname(f):
    """Return a display name for ``f``.

    Intended for callables that are not plain functions and therefore
    may lack ``__name__``.

    Lookup order: ``f.__name__``, then ``f.name``, then ``str(f)``.
    Any exception raised while reading an attribute just moves the
    lookup on to the next candidate.
    """
    for attr in ('__name__', 'name'):
        try:
            return getattr(f, attr)
        except Exception:
            # best effort: fall through to the next candidate
            continue

    return str(f)
71
72
@decorator
def sync_view_results(f, self, *args, **kwargs):
    """Call ``f(self, *args, **kwargs)``, then sync results on ``self.view``.

    This is a clone of view.sync_results, but for remote functions:
    the ``_in_sync_results`` flag and the ``_sync_results()`` call are
    taken from ``self.view`` rather than from ``self``.

    If the view's ``_in_sync_results`` flag is already set, ``f`` is
    called directly and no sync is triggered by this call.
    Otherwise the flag is set for the duration of the call, cleared
    afterwards, and ``view._sync_results()`` is invoked.

    Returns whatever ``f`` returns.
    """
    view = self.view
    if view._in_sync_results:
        # a sync is already pending further up the call stack
        return f(self, *args, **kwargs)
    view._in_sync_results = True
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # NOTE(review): the sync is assumed to belong to the finally
        # clause (so it also runs when f raises) — confirm.
        view._in_sync_results = False
        view._sync_results()
    return ret
89
90
91# --------------------------------------------------------------------------
92# Classes
93# --------------------------------------------------------------------------
94
95
class RemoteFunction:
    """Callable wrapper that executes an existing function through a view.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  When None, the `block`
        attribute of `view` at call time is used instead.

    **flags : remaining kwargs are passed to View.temp_flags
    """

    view = None  # the remote connection
    func = None  # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

        # Mirror the wrapped callable's metadata so the wrapper is
        # pleasant to inspect.
        self.__name__ = getname(f)

        wrapped_doc = getattr(f, '__doc__', None)
        if wrapped_doc:
            self.__doc__ = f'{self.__class__.__name__} wrapping:\n{wrapped_doc}'

        # Prefer an explicit __signature__; otherwise try to compute one.
        sig = getattr(f, '__signature__', None)
        if not sig:
            try:
                sig = signature(f)
            except Exception:
                # no signature, but that's okay
                sig = None
        if sig is not None:
            self.__signature__ = sig

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the view and return the result
        of ``view.apply``."""
        block = self.block
        if block is None:
            # unset: defer to the view's current setting
            block = self.view.block
        with self.view.temp_flags(block=block, **self.flags):
            return self.view.apply(self.func, *args, **kwargs)
142
143
def _map(f, *sequences):
    """Eagerly apply ``f`` across ``sequences`` with the builtin ``map``
    and return the results as a list."""
    return [*map(f, *sequences)]
146
147
# Module-wide cache for ``PrePickled(_map)``.  It stays None until the first
# ParallelFunction call creates it, and is reused by every call after that.
_prepickled_map = None
149
150
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:

        * 'b' : use contiguous chunks in order
        * 'r' : use round-robin striping

    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a load-balanced manner
    ordered : bool [default: True]
        Whether the result should be kept in order. If False,
        results become available as they arrive, regardless of submission order.
    return_exceptions : bool [default: False]
        Stored on the instance and forwarded unchanged to AsyncMapResult.
    **flags
        remaining kwargs are passed to View.temp_flags
    """

    chunksize = None
    ordered = None
    mapObject = None

    def __init__(
        self,
        view,
        f,
        dist='b',
        block=None,
        chunksize=None,
        ordered=True,
        return_exceptions=False,
        **flags,
    ):
        super().__init__(view, f, block=block, **flags)
        self.chunksize = chunksize
        self.ordered = ordered
        self.return_exceptions = return_exceptions

        # `dist` selects the partitioning class; an unknown key raises KeyError
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    @sync_view_results
    def __call__(self, *sequences, **kwargs):
        """Partition ``sequences`` and submit one task per non-empty partition.

        Parameters
        ----------
        *sequences
            The arguments to distribute.  Anything for which ``len()``
            fails is converted to a list first.
        **kwargs
            Only the private ``__ipp_mapping`` flag (set by `map`) is
            accepted.

        Returns
        -------
        ``[]`` when the longest sequence is empty (regardless of blocking
        mode).  Otherwise an AsyncMapResult, or the value of its ``get()``
        when blocking.  If ``get()`` is interrupted by KeyboardInterrupt,
        the AsyncMapResult itself is returned.

        Raises
        ------
        TypeError
            If any other keyword argument is given.
        ValueError
            If the sequences differ in length and this is not a `map` call.
        """
        global _prepickled_map
        if _prepickled_map is None:
            # created once on first use, then shared by every call
            _prepickled_map = PrePickled(_map)
        client = self.view.client
        _mapping = kwargs.pop('__ipp_mapping', False)
        if kwargs:
            raise TypeError(f"Unexpected keyword arguments: {kwargs}")

        lens = []
        maxlen = minlen = -1
        for i, seq in enumerate(sequences):
            try:
                n = len(seq)
            except Exception:
                # no usable len() (e.g. a generator): materialize it
                seq = list(seq)
                if isinstance(sequences, tuple):
                    # can't alter a tuple
                    sequences = list(sequences)
                sequences[i] = seq
                n = len(seq)
            if n > maxlen:
                maxlen = n
            if minlen == -1 or n < minlen:
                minlen = n
            lens.append(n)

        if maxlen == 0:
            # nothing to iterate over
            return []

        # check that the length of sequences match
        if not _mapping and minlen != maxlen:
            msg = f'all sequences must have equal length, but have {lens}'
            raise ValueError(msg)

        # detected by class name rather than isinstance
        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # ceiling division: number of chunks needed to cover maxlen
                nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
            else:
                nparts = maxlen
            targets = [None] * nparts
        else:
            if self.chunksize:
                warnings.warn(
                    "`chunksize` is ignored unless load balancing", UserWarning
                )
            # multiplexed:
            targets = self.view.targets
            # 'all' is lazily evaluated at execution time, which is now:
            if targets == 'all':
                targets = client._build_targets(targets)[1]
            elif isinstance(targets, int):
                # single-engine view, targets must be iterable
                targets = [targets]
            nparts = len(targets)

        futures = []

        pf = PrePickled(self.func)

        # msg_id -> chunk size, handed to AsyncMapResult below
        chunk_sizes = {}
        chunk_size = 1

        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts, maxlen)
                args.append(part)

            if sum(len(arg) for arg in args) == 0:
                # every partition is empty for this target: nothing to submit
                continue

            if _mapping:
                # must be measured before the partitions are wrapped below
                chunk_size = min(len(arg) for arg in args)

            args = [PrePickled(arg) for arg in args]

            if _mapping:
                # run _map(func, *partitions) remotely: one call per element
                f = _prepickled_map
                args = [pf] + args
            else:
                # call func once on the whole partition(s)
                f = pf

            view = self.view if balanced else client[t]
            # submission itself never blocks; blocking is handled at the end
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)
                # NOTE(review): presumably leaves ownership of the results
                # to the AsyncMapResult built below — confirm.
                ar.owner = False

            msg_id = ar.msg_ids[0]
            chunk_sizes[msg_id] = chunk_size
            futures.extend(ar._children)

        r = AsyncMapResult(
            client,
            futures,
            self.mapObject,
            fname=getname(self.func),
            ordered=self.ordered,
            return_exceptions=self.return_exceptions,
            chunk_sizes=chunk_sizes,
        )

        # block=None means "defer to the view", as documented on the class
        # and as RemoteFunction.__call__ does.
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                # let the caller keep waiting on the pending result
                return r
        return r

    def map(self, *sequences):
        """call a function on each element of one or more sequence(s) remotely.
        This should behave very much like the builtin map, but return an AsyncMapResult
        when not blocking.

        That means it can take generators (will be cast to lists locally),
        and sequences of different lengths are accepted (no ValueError).

        NOTE(review): this docstring used to say shorter sequences are padded
        with None, but each chunk is evaluated with the builtin ``map``
        (see `_map`), which stops at its shortest argument — confirm the
        intended semantics.
        """
        return self(*sequences, __ipp_mapping=True)
330
331
# Public names of this module (what ``from <module> import *`` exports).
__all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']