1"""Views of remote engines."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5import builtins
6import concurrent.futures
7import inspect
8import secrets
9import threading
10import time
11import warnings
12from collections import deque
13from contextlib import contextmanager
14
15from decorator import decorator
16from IPython import get_ipython
17from traitlets import Any, Bool, CFloat, Dict, HasTraits, Instance, Integer, List, Set
18
19import ipyparallel as ipp
20from ipyparallel import util
21from ipyparallel.controller.dependency import Dependency, dependent
22
23from .. import serialize
24from ..serialize import PrePickled
25from . import map as Map
26from .asyncresult import AsyncMapResult, AsyncResult
27from .remotefunction import ParallelFunction, getname, parallel, remote
28
29# -----------------------------------------------------------------------------
30# Decorators
31# -----------------------------------------------------------------------------
32
33
@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Every msg_id that the wrapped call appended to ``self.client.history``
    is added to ``self.history`` and ``self.outstanding``. The bookkeeping
    runs in a ``finally`` block, so it also happens when the call raises.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # Slice from the previous length instead of using a negative count:
        # when the call submitted nothing, ``history[-0:]`` is the *whole*
        # client history rather than an empty list.
        msg_ids = self.client.history[n_previous:]
        self.history.extend(msg_ids)
        self.outstanding.update(msg_ids)
    return ret
46
47
@decorator
def sync_results(f, self, *args, **kwargs):
    """Run ``f`` and afterwards sync the view's results state with the client.

    The ``_in_sync_results`` flag marks that a decorated call is already in
    progress: nested decorated calls simply delegate, and only the
    outermost call finishes with ``self._sync_results()``.
    """
    if not self._in_sync_results:
        self._in_sync_results = True
        try:
            result = f(self, *args, **kwargs)
        finally:
            # always clear the flag, even if the call raised
            self._in_sync_results = False
        self._sync_results()
        return result

    # already inside a decorated call: no extra sync
    return f(self, *args, **kwargs)
60
61
62# -----------------------------------------------------------------------------
63# Classes
64# -----------------------------------------------------------------------------
65
66
67class View(HasTraits):
    """Base View class for more convenient apply(f,*args,**kwargs) syntax via attributes.
69
70 Don't use this class, use subclasses.
71
72 Methods
73 -------
74
75 spin
76 flushes incoming results and registration state changes
77 control methods spin, and requesting `ids` also ensures up to date
78
79 wait
80 wait on one or more msg_ids
81
82 execution methods
83 apply
84 legacy: execute, run
85
86 data movement
87 push, pull, scatter, gather
88
89 query methods
90 get_result, queue_status, purge_results, result_status
91
92 control methods
93 abort, shutdown
94
95 """
96
97 # flags
98 block = Bool(False)
99 track = Bool(False)
100 targets = Any()
101
102 history = List()
103 outstanding = Set()
104 results = Dict()
105 client = Instance('ipyparallel.Client', allow_none=True)
106
107 _socket = Any()
108 _flag_names = List(['targets', 'block', 'track'])
109 _in_sync_results = Bool(False)
110 _targets = Any()
111 _idents = Any()
112
    def __init__(self, client=None, socket=None, **flags):
        """Create a view on ``client``.

        Parameters
        ----------
        client : ipyparallel.Client
            The client this view works through. ``results`` and ``block``
            are initialised from ``client.results`` and ``client.block``,
            so despite the ``None`` default a client must be given.
        socket
            Stored on the view as ``_socket``.
        **flags
            Initial flag values, applied with `set_flags`.
        """
        super().__init__(client=client, _socket=socket)
        self.results = client.results
        self.block = client.block
        self.executor = ViewExecutor(self)

        self.set_flags(**flags)

        # checked last, once the instance has been set up
        assert self.__class__ is not View, "Don't use base View objects, use subclasses"
122
123 def __repr__(self):
124 strtargets = str(self.targets)
125 if len(strtargets) > 16:
126 strtargets = strtargets[:12] + '...]'
127 return f"<{self.__class__.__name__} {strtargets}>"
128
129 def __len__(self):
130 if isinstance(self.targets, list):
131 return len(self.targets)
132 elif isinstance(self.targets, int):
133 return 1
134 else:
135 return len(self.client)
136
137 def set_flags(self, **kwargs):
138 """set my attribute flags by keyword.
139
140 Views determine behavior with a few attributes (`block`, `track`, etc.).
141 These attributes can be set all at once by name with this method.
142
143 Parameters
144 ----------
145 block : bool
146 whether to wait for results
147 track : bool
148 whether to create a MessageTracker to allow the user to
149 safely edit after arrays and buffers during non-copying
150 sends.
151 """
152 for name, value in kwargs.items():
153 if name not in self._flag_names:
154 raise KeyError(f"Invalid name: {name!r}")
155 else:
156 setattr(self, name, value)
157
158 @contextmanager
159 def temp_flags(self, **kwargs):
160 """temporarily set flags, for use in `with` statements.
161
162 See set_flags for permanent setting of flags
163
164 Examples
165 --------
166 >>> view.track=False
167 ...
168 >>> with view.temp_flags(track=True):
169 ... ar = view.apply(dostuff, my_big_array)
170 ... ar.tracker.wait() # wait for send to finish
171 >>> view.track
172 False
173
174 """
175 # preflight: save flags, and set temporaries
176 saved_flags = {}
177 for f in self._flag_names:
178 saved_flags[f] = getattr(self, f)
179 self.set_flags(**kwargs)
180 # yield to the with-statement block
181 try:
182 yield
183 finally:
184 # postflight: restore saved flags
185 self.set_flags(**saved_flags)
186
187 # ----------------------------------------------------------------
188 # apply
189 # ----------------------------------------------------------------
190
191 def _sync_results(self):
192 """to be called by @sync_results decorator
193
194 after submitting any tasks.
195 """
196 delta = self.outstanding.difference(self.client.outstanding)
197 completed = self.outstanding.intersection(delta)
198 self.outstanding = self.outstanding.difference(completed)
199
    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """Submit ``f(*args, **kwargs)`` for remote execution.

        `apply`, `apply_async` and `apply_sync` all funnel into this
        method. The base class provides no implementation.

        Raises
        ------
        NotImplementedError
            always; subclasses must override this method.
        """
        raise NotImplementedError("Implement in subclasses")
205
    # NOTE(review): the unusual ``__ipp_f`` parameter name presumably keeps
    # user keyword arguments such as ``f=...`` from colliding with it.
    def apply(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.
        The positional and keyword arguments are handed to
        `_really_apply` as a tuple and a dict.

        Returns :class:`~ipyparallel.client.asyncresult.AsyncResult`
        instance if ``self.block`` is False, otherwise the return value of
        ``f(*args, **kwargs)``.
        """
        return self._really_apply(__ipp_f, args, kwargs)
216
    def apply_async(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.

        Same as `apply`, but always passes ``block=False`` to
        `_really_apply`, regardless of ``self.block``.

        Returns :class:`~ipyparallel.client.asyncresult.AsyncResult` instance.
        """
        return self._really_apply(__ipp_f, args, kwargs, block=False)
223
    def apply_sync(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
        returning the result.

        Same as `apply`, but always passes ``block=True`` to
        `_really_apply`, regardless of ``self.block``.
        """
        return self._really_apply(__ipp_f, args, kwargs, block=True)
229
230 # ----------------------------------------------------------------
231 # wrappers for client and control methods
232 # ----------------------------------------------------------------
    @sync_results
    def spin(self):
        """Spin the client, then sync this view.

        Calls ``self.client.spin()``; the `sync_results` decorator then
        runs `_sync_results` on this view.
        """
        self.client.spin()
237
238 @sync_results
239 def wait(self, jobs=None, timeout=-1):
240 """waits on one or more `jobs`, for up to `timeout` seconds.
241
242 Parameters
243 ----------
244 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
245 ints are indices to self.history
246 strs are msg_ids
247 default: wait on all outstanding messages
248 timeout : float
249 a time in seconds, after which to give up.
250 default is -1, which means no timeout
251
252 Returns
253 -------
254 True : when all msg_ids are done
255 False : timeout reached, some msg_ids still outstanding
256 """
257 if jobs is None:
258 jobs = self.history
259 return self.client.wait(jobs, timeout)
260
261 def abort(self, jobs=None, targets=None, block=None):
262 """Abort jobs on my engines.
263
264 Note: only jobs that have not started yet can be aborted.
265 To halt a running job,
266 you must interrupt the engine(s) via the Cluster API.
267
268 Parameters
269 ----------
270 jobs : None, str, list of strs, optional
271 if None: abort all jobs.
272 else: abort specific msg_id(s).
273 """
274 block = block if block is not None else self.block
275 targets = targets if targets is not None else self.targets
276 jobs = jobs if jobs is not None else list(self.outstanding)
277
278 return self.client.abort(jobs=jobs, targets=targets, block=block)
279
280 def queue_status(self, targets=None, verbose=False):
281 """Fetch the Queue status of my engines"""
282 targets = targets if targets is not None else self.targets
283 return self.client.queue_status(targets=targets, verbose=verbose)
284
    def purge_results(self, jobs=[], targets=[]):
        """Instruct the controller to forget specific results.

        Parameters
        ----------
        jobs : default ``[]``
            forwarded unchanged to ``client.purge_results``.
        targets : default ``[]``
            forwarded to ``client.purge_results``; ``None`` or ``'all'``
            is replaced by this view's targets first.
        """
        # NOTE(review): the list defaults are shared between calls. This
        # method never mutates them; confirm the client does not either.
        if targets is None or targets == 'all':
            targets = self.targets
        return self.client.purge_results(jobs=jobs, targets=targets)
290
291 def shutdown(self, targets=None, restart=False, hub=False, block=None):
292 """Terminates one or more engine processes, optionally including the hub."""
293 block = self.block if block is None else block
294 if targets is None or targets == 'all':
295 targets = self.targets
296 return self.client.shutdown(
297 targets=targets, restart=restart, hub=hub, block=block
298 )
299
300 def get_result(self, indices_or_msg_ids=None, block=None, owner=False):
301 """return one or more results, specified by history index or msg_id.
302
303 See :meth:`ipyparallel.client.client.Client.get_result` for details.
304 """
305
306 if indices_or_msg_ids is None:
307 indices_or_msg_ids = -1
308 if isinstance(indices_or_msg_ids, int):
309 indices_or_msg_ids = self.history[indices_or_msg_ids]
310 elif isinstance(indices_or_msg_ids, (list, tuple, set)):
311 indices_or_msg_ids = list(indices_or_msg_ids)
312 for i, index in enumerate(indices_or_msg_ids):
313 if isinstance(index, int):
314 indices_or_msg_ids[i] = self.history[index]
315 return self.client.get_result(indices_or_msg_ids, block=block, owner=owner)
316
317 # -------------------------------------------------------------------
318 # Map
319 # -------------------------------------------------------------------
320
    @sync_results
    def map(self, f, *sequences, **kwargs):
        """Parallel map of ``f`` over ``sequences``; override in subclasses.

        Raises
        ------
        NotImplementedError
            always, in the base class.
        """
        raise NotImplementedError()
325
326 def map_async(self, f, *sequences, **kwargs):
327 """Parallel version of builtin :func:`python:map`, using this view's engines.
328
329 This is equivalent to ``map(...block=False)``.
330
331 See `self.map` for details.
332 """
333 if 'block' in kwargs:
334 raise TypeError("map_async doesn't take a `block` keyword argument.")
335 kwargs['block'] = False
336 return self.map(f, *sequences, **kwargs)
337
338 def map_sync(self, f, *sequences, **kwargs):
339 """Parallel version of builtin :func:`python:map`, using this view's engines.
340
341 This is equivalent to ``map(...block=True)``.
342
343 See `self.map` for details.
344 """
345 if 'block' in kwargs:
346 raise TypeError("map_sync doesn't take a `block` keyword argument.")
347 kwargs['block'] = True
348 return self.map(f, *sequences, **kwargs)
349
    def imap(self, f, *sequences, **kwargs):
        """Iterator form of the parallel map.

        Returns ``iter()`` over the result of `map_async`, so a ``block``
        keyword is rejected here as well.

        See `self.map` for details.

        """

        return iter(self.map_async(f, *sequences, **kwargs))
358
359 # -------------------------------------------------------------------
360 # Decorators
361 # -------------------------------------------------------------------
362
363 def remote(self, block=None, **flags):
364 """Decorator for making a RemoteFunction"""
365 block = self.block if block is None else block
366 return remote(self, block=block, **flags)
367
368 def parallel(self, dist='b', block=None, **flags):
369 """Decorator for making a ParallelFunction"""
370 block = self.block if block is None else block
371 return parallel(self, dist=dist, block=block, **flags)
372
373
374class DirectView(View):
375 """Direct Multiplexer View of one or more engines.
376
377 These are created via indexed access to a client:
378
379 >>> dv_1 = client[1]
380 >>> dv_all = client[:]
381 >>> dv_even = client[::2]
382 >>> dv_some = client[1:3]
383
384 This object provides dictionary access to engine namespaces:
385
386 # push a=5:
387 >>> dv['a'] = 5
388 # pull 'foo':
389 >>> dv['foo']
390
391 """
392
    def __init__(self, client=None, socket=None, targets=None, **flags):
        """Create a DirectView.

        ``targets`` is handed to the base class together with the other
        flags; see the base class for ``client`` and ``socket``.
        """
        super().__init__(client=client, socket=socket, targets=targets, **flags)
395
    @property
    def importer(self):
        """sync_imports(local=True) as a property.

        Each access returns a new context manager.

        See sync_imports for details.

        """
        return self.sync_imports(True)
404
405 @contextmanager
406 def sync_imports(self, local=True, quiet=False):
407 """Context Manager for performing simultaneous local and remote imports.
408
409 'import x as y' will *not* work. The 'as y' part will simply be ignored.
410
411 If `local=True`, then the package will also be imported locally.
412
413 If `quiet=True`, no output will be produced when attempting remote
414 imports.
415
416 Note that remote-only (`local=False`) imports have not been implemented.
417
418 >>> with view.sync_imports():
419 ... from numpy import recarray
420 importing recarray from numpy on engine(s)
421
422 """
423
424 local_import = builtins.__import__
425 modules = set()
426 results = []
427
428 # get the calling frame
429 # that's two steps up due to `@contextmanager`
430 context_frame = inspect.getouterframes(inspect.currentframe())[2].frame
431
432 @util.interactive
433 def remote_import(name, fromlist, level):
434 """the function to be passed to apply, that actually performs the import
435 on the engine, and loads up the user namespace.
436 """
437 import sys
438
439 user_ns = globals()
440 mod = __import__(name, fromlist=fromlist, level=level)
441 if fromlist:
442 for key in fromlist:
443 user_ns[key] = getattr(mod, key)
444 else:
445 user_ns[name] = sys.modules[name]
446
447 def view_import(name, globals={}, locals={}, fromlist=[], level=0):
448 """the drop-in replacement for __import__, that optionally imports
449 locally as well.
450 """
451 # don't override nested imports
452 save_import = builtins.__import__
453 builtins.__import__ = local_import
454
455 import_frame = inspect.getouterframes(inspect.currentframe())[1].frame
456 if import_frame is not context_frame:
457 # only forward imports from the context frame,
458 # not secondary imports
459 # TODO: does this ever happen, or is the above `__import__` enough?
460 return local_import(name, globals, locals, fromlist, level)
461
462 if local:
463 mod = local_import(name, globals, locals, fromlist, level)
464 else:
465 raise NotImplementedError("remote-only imports not yet implemented")
466
467 key = name + ':' + ','.join(fromlist or [])
468 if level <= 0 and key not in modules:
469 modules.add(key)
470 if not quiet:
471 if fromlist:
472 print(
473 "importing {} from {} on engine(s)".format(
474 ','.join(fromlist), name
475 )
476 )
477 else:
478 print(f"importing {name} on engine(s)")
479 results.append(self.apply_async(remote_import, name, fromlist, level))
480 # restore override
481 builtins.__import__ = save_import
482
483 return mod
484
485 # override __import__
486 builtins.__import__ = view_import
487 try:
488 # enter the block
489 yield
490 except ImportError:
491 if local:
492 raise
493 else:
494 # ignore import errors if not doing local imports
495 pass
496 finally:
497 # always restore __import__
498 builtins.__import__ = local_import
499
500 for r in results:
501 # raise possible remote ImportErrors here
502 r.get()
503
    def use_dill(self):
        """Expand serialization support with dill

        adds support for closures, etc.

        This calls ipyparallel.serialize.use_dill() here and on each engine.

        Returns whatever ``self.apply`` returns for the remote call.
        """
        serialize.use_dill()
        return self.apply(serialize.use_dill)
513
    def use_cloudpickle(self):
        """Expand serialization support with cloudpickle.

        This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.

        Returns whatever ``self.apply`` returns for the remote call.
        """
        serialize.use_cloudpickle()
        return self.apply(serialize.use_cloudpickle)
521
    def use_pickle(self):
        """Restore the default pickle-based serialization.

        This reverts changes to serialization caused by `use_dill` or
        `use_cloudpickle`, by calling ipyparallel.serialize.use_pickle()
        here and on each engine.

        Returns whatever ``self.apply`` returns for the remote call.
        """
        serialize.use_pickle()
        return self.apply(serialize.use_pickle)
529
530 @sync_results
531 @save_ids
532 def _really_apply(
533 self, f, args=None, kwargs=None, targets=None, block=None, track=None
534 ):
535 """calls f(*args, **kwargs) on remote engines, returning the result.
536
537 This method sets all of `apply`'s flags via this View's attributes.
538
539 Parameters
540 ----------
541 f : callable
542 args : list [default: empty]
543 kwargs : dict [default: empty]
544 targets : target list [default: self.targets]
545 where to run
546 block : bool [default: self.block]
547 whether to block
548 track : bool [default: self.track]
549 whether to ask zmq to track the message, for safe non-copying sends
550
551 Returns
552 -------
553 if self.block is False:
554 returns AsyncResult
555 else:
556 returns actual result of f(*args, **kwargs) on the engine(s)
557 This will be a list of self.targets is also a list (even length 1), or
558 the single result if self.targets is an integer engine id
559 """
560 args = [] if args is None else args
561 kwargs = {} if kwargs is None else kwargs
562 block = self.block if block is None else block
563 track = self.track if track is None else track
564 targets = self.targets if targets is None else targets
565
566 _idents, _targets = self.client._build_targets(targets)
567 futures = []
568
569 pf = PrePickled(f)
570 pargs = [PrePickled(arg) for arg in args]
571 pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}
572
573 for ident in _idents:
574 future = self.client.send_apply_request(
575 self._socket, pf, pargs, pkwargs, track=track, ident=ident
576 )
577 futures.append(future)
578 if track:
579 trackers = [_.tracker for _ in futures]
580 else:
581 trackers = []
582 if isinstance(targets, int):
583 futures = futures[0]
584 ar = AsyncResult(
585 self.client, futures, fname=getname(f), targets=_targets, owner=True
586 )
587 if block:
588 try:
589 return ar.get()
590 except KeyboardInterrupt:
591 pass
592 return ar
593
594 @sync_results
595 def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
596 """Parallel version of builtin `map`, using this View's `targets`.
597
598 There will be one task per target, so work will be chunked
599 if the sequences are longer than `targets`.
600
601 Results can be iterated as they are ready, but will become available in chunks.
602
603 .. versionadded:: 7.0
604 `return_exceptions`
605
606 Parameters
607 ----------
608 f : callable
609 function to be mapped
610 *sequences : one or more sequences of matching length
611 the sequences to be distributed and passed to `f`
612 block : bool [default self.block]
613 whether to wait for the result or not
614 track : bool [default False]
615 Track underlying zmq send to indicate when it is safe to modify memory.
616 Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
617 return_exceptions : bool [default False]
618 Return remote Exceptions in the result sequence instead of raising them.
619
620 Returns
621 -------
622 If block=False
623 An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance.
624 An object like AsyncResult, but which reassembles the sequence of results
625 into a single list. AsyncMapResults can be iterated through before all
626 results are complete.
627 else
628 A list, the result of ``map(f,*sequences)``
629 """
630
631 if block is None:
632 block = self.block
633
634 assert len(sequences) > 0, "must have some sequences to map onto!"
635 pf = ParallelFunction(
636 self, f, block=block, track=track, return_exceptions=return_exceptions
637 )
638 return pf.map(*sequences)
639
640 @sync_results
641 @save_ids
642 def execute(self, code, silent=True, targets=None, block=None):
643 """Executes `code` on `targets` in blocking or nonblocking manner.
644
645 ``execute`` is always `bound` (affects engine namespace)
646
647 Parameters
648 ----------
649 code : str
650 the code string to be executed
651 block : bool
652 whether or not to wait until done to return
653 default: self.block
654 """
655 block = self.block if block is None else block
656 targets = self.targets if targets is None else targets
657
658 _idents, _targets = self.client._build_targets(targets)
659 futures = []
660 for ident in _idents:
661 future = self.client.send_execute_request(
662 self._socket, code, silent=silent, ident=ident
663 )
664 futures.append(future)
665 if isinstance(targets, int):
666 futures = futures[0]
667 ar = AsyncResult(
668 self.client, futures, fname='execute', targets=_targets, owner=True
669 )
670 if block:
671 try:
672 ar.get()
673 ar.wait_for_output()
674 except KeyboardInterrupt:
675 pass
676 return ar
677
678 def run(self, filename, targets=None, block=None):
679 """Execute contents of `filename` on my engine(s).
680
681 This simply reads the contents of the file and calls `execute`.
682
683 Parameters
684 ----------
685 filename : str
686 The path to the file
687 targets : int/str/list of ints/strs
688 the engines on which to execute
689 default : all
690 block : bool
691 whether or not to wait until done
692 default: self.block
693
694 """
695 with open(filename) as f:
696 # add newline in case of trailing indented whitespace
697 # which will cause SyntaxError
698 code = f.read() + '\n'
699 return self.execute(code, block=block, targets=targets)
700
    def update(self, ns):
        """update remote namespace with dict `ns`

        Uses this view's current ``block`` and ``track`` flags.

        See `push` for details.
        """
        return self.push(ns, block=self.block, track=self.track)
707
708 def push(self, ns, targets=None, block=None, track=None):
709 """update remote namespace with dict `ns`
710
711 Parameters
712 ----------
713 ns : dict
714 dict of keys with which to update engine namespace(s)
715 block : bool [default : self.block]
716 whether to wait to be notified of engine receipt
717
718 """
719
720 block = block if block is not None else self.block
721 track = track if track is not None else self.track
722 targets = targets if targets is not None else self.targets
723 # applier = self.apply_sync if block else self.apply_async
724 if not isinstance(ns, dict):
725 raise TypeError(f"Must be a dict, not {type(ns)}")
726 return self._really_apply(
727 util._push, kwargs=ns, block=block, track=track, targets=targets
728 )
729
    def get(self, key_s):
        """get object(s) by `key_s` from remote namespace

        Always blocks, regardless of ``self.block``.

        see `pull` for details.
        """
        return self.pull(key_s, block=True)
737
738 def pull(self, names, targets=None, block=None):
739 """get object(s) by `name` from remote namespace
740
741 will return one object if it is a key.
742 can also take a list of keys, in which case it will return a list of objects.
743 """
744 block = block if block is not None else self.block
745 targets = targets if targets is not None else self.targets
746 if isinstance(names, str):
747 pass
748 elif isinstance(names, (list, tuple, set)):
749 for key in names:
750 if not isinstance(key, str):
751 raise TypeError(f"keys must be str, not type {type(key)!r}")
752 else:
753 raise TypeError(f"names must be strs, not {names!r}")
754 return self._really_apply(util._pull, (names,), block=block, targets=targets)
755
756 def scatter(
757 self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None
758 ):
759 """
760 Partition a Python sequence and send the partitions to a set of engines.
761 """
762 block = block if block is not None else self.block
763 track = track if track is not None else self.track
764 targets = targets if targets is not None else self.targets
765
766 # construct integer ID list:
767 targets = self.client._build_targets(targets)[1]
768
769 mapObject = Map.dists[dist]()
770 nparts = len(targets)
771 futures = []
772 _lengths = []
773 for index, engineid in enumerate(targets):
774 partition = mapObject.getPartition(seq, index, nparts)
775 if flatten and len(partition) == 1:
776 ns = {key: partition[0]}
777 else:
778 ns = {key: partition}
779 r = self.push(ns, block=False, track=track, targets=engineid)
780 r.owner = False
781 futures.extend(r._children)
782 _lengths.append(len(partition))
783
784 r = AsyncResult(
785 self.client, futures, fname='scatter', targets=targets, owner=True
786 )
787 r._scatter_lengths = _lengths
788 if block:
789 r.wait()
790 else:
791 return r
792
793 @sync_results
794 @save_ids
795 def gather(self, key, dist='b', targets=None, block=None):
796 """
797 Gather a partitioned sequence on a set of engines as a single local seq.
798 """
799 block = block if block is not None else self.block
800 targets = targets if targets is not None else self.targets
801 mapObject = Map.dists[dist]()
802 msg_ids = []
803
804 # construct integer ID list:
805 targets = self.client._build_targets(targets)[1]
806
807 futures = []
808 for index, engineid in enumerate(targets):
809 ar = self.pull(key, block=False, targets=engineid)
810 ar.owner = False
811 futures.extend(ar._children)
812
813 r = AsyncMapResult(self.client, futures, mapObject, fname='gather')
814
815 if block:
816 try:
817 return r.get()
818 except KeyboardInterrupt:
819 pass
820 return r
821
    def __getitem__(self, key):
        """``view[key]``: blocking fetch of `key` from the engines, via `get`."""
        return self.get(key)
824
    def __setitem__(self, key, value):
        """``view[key] = value``: push a single name to the engines, via `update`."""
        self.update({key: value})
827
828 def clear(self, targets=None, block=None):
829 """Clear the remote namespaces on my engines."""
830 block = block if block is not None else self.block
831 targets = targets if targets is not None else self.targets
832 return self.client.clear(targets=targets, block=block)
833
834 # ----------------------------------------
835 # activate for %px, %autopx, etc. magics
836 # ----------------------------------------
837
838 def activate(self, suffix=''):
839 """Activate IPython magics associated with this View
840
841 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig`
842
843 Parameters
844 ----------
845 suffix : str [default: '']
846 The suffix, if any, for the magics. This allows you to have
847 multiple views associated with parallel magics at the same time.
848
849 e.g. ``rc[::2].activate(suffix='_even')`` will give you
850 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics
851 on the even engines.
852 """
853
854 from ipyparallel.client.magics import ParallelMagics
855
856 ip = get_ipython()
857 if ip is None:
858 warnings.warn(
859 "The IPython parallel magics (%px, etc.) only work within IPython."
860 )
861 return
862
863 M = ParallelMagics(ip, self, suffix)
864 ip.magics_manager.register(M)
865
866
@decorator
def _not_coalescing(method, self, *args, **kwargs):
    """Decorator for broadcast methods that can't use reply coalescing.

    ``self.is_coalescing`` is switched off for the duration of the call
    and put back to its previous value afterwards, even on error.
    """
    previous = self.is_coalescing
    self.is_coalescing = False
    try:
        return method(self, *args, **kwargs)
    finally:
        self.is_coalescing = previous
876
877
878class BroadcastView(DirectView):
879 is_coalescing = Bool(False)
880
881 def _init_metadata(self, target_tuples):
882 """initialize request metadata"""
883 return dict(
884 targets=target_tuples,
885 is_broadcast=True,
886 is_coalescing=self.is_coalescing,
887 )
888
    def _make_async_result(self, message_future, s_idents, **kwargs):
        """Build the AsyncResult for one broadcast request.

        Parameters
        ----------
        message_future
            the future of the single broadcast message; its ``msg_id`` and
            ``header`` are used here.
        s_idents : list of str
            identities of the engines the broadcast addresses.
        **kwargs
            passed on to the AsyncResult constructor.

        Without coalescing, one future per engine is created under the id
        ``<msg_id>_<ident>``. With coalescing, the original message future
        is used directly.
        """
        original_msg_id = message_future.msg_id
        if not self.is_coalescing:
            futures = []
            for ident in s_idents:
                # per-engine reply id derived from the broadcast msg_id
                msg_and_target_id = f'{original_msg_id}_{ident}'
                future = self.client.create_message_futures(
                    msg_and_target_id,
                    message_future.header,
                    async_result=True,
                    track=True,
                )
                # register the per-engine id as outstanding on the client,
                # on the engine's own set, and on this view
                self.client.outstanding.add(msg_and_target_id)
                self.client._outstanding_dict[ident].add(msg_and_target_id)
                self.outstanding.add(msg_and_target_id)
                futures.append(future[0])
            # the per-engine ids replace the broadcast id on this view
            if original_msg_id in self.outstanding:
                self.outstanding.remove(original_msg_id)
        else:
            # one coalesced reply: track the original id for every engine
            self.client.outstanding.add(original_msg_id)
            for ident in s_idents:
                self.client._outstanding_dict[ident].add(original_msg_id)
            futures = message_future

        ar = AsyncResult(self.client, futures, owner=True, **kwargs)

        if self.is_coalescing:
            # if coalescing, discard outstanding-tracking when we are done
            def _rm_outstanding(_):
                for ident in s_idents:
                    if ident in self.client._outstanding_dict:
                        self.client._outstanding_dict[ident].discard(original_msg_id)

            ar.add_done_callback(_rm_outstanding)

        return ar
925
    @sync_results
    @save_ids
    def _really_apply(
        self, f, args=None, kwargs=None, block=None, track=None, targets=None
    ):
        """Submit ``f(*args, **kwargs)`` to the targets as one broadcast request.

        Unset ``block``, ``track`` and ``targets`` fall back to this view's
        attributes. A single apply request is sent, with the targets
        listed in its metadata.

        Returns the AsyncResult when not blocking, otherwise its result.
        If the blocking wait is interrupted with KeyboardInterrupt, the
        AsyncResult is returned instead.
        """
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        targets = self.targets if targets is None else targets
        idents, _targets = self.client._build_targets(targets)

        # serialize the function and arguments once
        pf = PrePickled(f)
        pargs = [PrePickled(arg) for arg in args]
        pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}

        # metadata pairs each engine identity (as str) with its integer id
        s_idents = [ident.decode("utf8") for ident in idents]
        target_tuples = list(zip(s_idents, _targets))

        metadata = self._init_metadata(target_tuples)

        ar = None

        def make_asyncresult(message_future):
            # hook given to the client; stores the AsyncResult in `ar`
            nonlocal ar
            ar = self._make_async_result(
                message_future, s_idents, fname=getname(f), targets=_targets
            )

        # NOTE(review): assumes the client calls the hook before
        # send_apply_request returns; otherwise `ar` is still None below.
        self.client.send_apply_request(
            self._socket,
            pf,
            pargs,
            pkwargs,
            track=track,
            metadata=metadata,
            message_future_hook=make_asyncresult,
        )

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                pass
        return ar
971
972 @sync_results
973 @save_ids
974 @_not_coalescing
975 def execute(self, code, silent=True, targets=None, block=None):
976 """Executes `code` on `targets` in blocking or nonblocking manner.
977
978 ``execute`` is always `bound` (affects engine namespace)
979
980 Parameters
981 ----------
982 code : str
983 the code string to be executed
984 block : bool
985 whether or not to wait until done to return
986 default: self.block
987 """
988 block = self.block if block is None else block
989 targets = self.targets if targets is None else targets
990
991 _idents, _targets = self.client._build_targets(targets)
992 s_idents = [ident.decode("utf8") for ident in _idents]
993 target_tuples = list(zip(s_idents, _targets))
994
995 metadata = self._init_metadata(target_tuples)
996
997 ar = None
998
999 def make_asyncresult(message_future):
1000 nonlocal ar
1001 ar = self._make_async_result(
1002 message_future, s_idents, fname='execute', targets=_targets
1003 )
1004
1005 message_future = self.client.send_execute_request(
1006 self._socket,
1007 code,
1008 silent=silent,
1009 metadata=metadata,
1010 message_future_hook=make_asyncresult,
1011 )
1012 if block:
1013 try:
1014 ar.get()
1015 ar.wait_for_output()
1016 except KeyboardInterrupt:
1017 pass
1018 return ar
1019
1020 @staticmethod
1021 def _broadcast_map(f, *sequence_names):
1022 """Function passed to apply
1023
1024 Equivalent, but account for the fact that scatter
1025 occurs in a separate step.
1026
1027 Does these things:
1028 - resolve sequence names to sequences in the user namespace
1029 - collect list(map(f, *squences))
1030 - cleanup temporary sequence variables from scatter
1031 """
1032 sequences = []
1033 ip = get_ipython()
1034 for seq_name in sequence_names:
1035 sequences.append(ip.user_ns.pop(seq_name))
1036 return list(map(f, *sequences))
1037
1038 @_not_coalescing
1039 def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
1040 """Parallel version of builtin `map`, using this View's `targets`.
1041
1042 There will be one task per engine, so work will be chunked
1043 if the sequences are longer than `targets`.
1044
1045 Results can be iterated as they are ready, but will become available in chunks.
1046
1047 .. note::
1048
1049 BroadcastView does not yet have a fully native map implementation.
1050 In particular, the scatter step is still one message per engine,
1051 identical to DirectView,
1052 and typically slower due to the more complex scheduler.
1053
1054 It is more efficient to partition inputs via other means (e.g. SPMD based on rank & size)
1055 and use `apply` to submit all tasks in one broadcast.
1056
1057 .. versionadded:: 8.8
1058
1059 Parameters
1060 ----------
1061 f : callable
1062 function to be mapped
1063 *sequences : one or more sequences of matching length
1064 the sequences to be distributed and passed to `f`
1065 block : bool [default self.block]
1066 whether to wait for the result or not
1067 track : bool [default False]
1068 Track underlying zmq send to indicate when it is safe to modify memory.
1069 Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
1070 return_exceptions : bool [default False]
1071 Return remote Exceptions in the result sequence instead of raising them.
1072
1073 Returns
1074 -------
1075 If block=False
1076 An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance.
1077 An object like AsyncResult, but which reassembles the sequence of results
1078 into a single list. AsyncMapResults can be iterated through before all
1079 results are complete.
1080 else
1081 A list, the result of ``map(f,*sequences)``
1082 """
1083 if block is None:
1084 block = self.block
1085 if track is None:
1086 track = self.track
1087
1088 # unique identifier, since we're living in the interactive namespace
1089 map_key = secrets.token_hex(5)
1090 dist = 'b'
1091 map_object = Map.dists[dist]()
1092
1093 seq_names = []
1094 for i, seq in enumerate(sequences):
1095 seq_name = f"_seq_{map_key}_{i}"
1096 seq_names.append(seq_name)
1097 try:
1098 len(seq)
1099 except Exception:
1100 # cast length-less sequences (e.g. Range) to list
1101 seq = list(seq)
1102
1103 ar = self.scatter(seq_name, seq, dist=dist, block=False, track=track)
1104 scatter_chunk_sizes = ar._scatter_lengths
1105
1106 # submit the map tasks as an actual broadcast
1107 ar = self.apply(self._broadcast_map, f, *seq_names)
1108 ar.owner = False
1109 # re-wrap messages in an AsyncMapResult to get map API
1110 # this is where the 'gather' reconstruction happens
1111 amr = ipp.AsyncMapResult(
1112 self.client,
1113 ar._children,
1114 map_object,
1115 fname=getname(f),
1116 return_exceptions=return_exceptions,
1117 chunk_sizes={
1118 future.msg_id: chunk_size
1119 for future, chunk_size in zip(ar._children, scatter_chunk_sizes)
1120 },
1121 )
1122
1123 if block:
1124 return amr.get()
1125 else:
1126 return amr
1127
    # scatter/gather cannot be coalescing yet:
    # reuse DirectView's implementations, each wrapped in `_not_coalescing`
    scatter = _not_coalescing(DirectView.scatter)
    gather = _not_coalescing(DirectView.gather)
1131
1132
class LazyMapIterator:
    """Iterator over the results of a lazy map (imap).

    Wraps the underlying result iterator and adds a `.cancel()` method
    that stops new inputs from being consumed.

    .. versionadded:: 8.0
    """

    def __init__(self, gen, signal_done):
        # gen: the iterator that actually produces the results
        # signal_done: zero-argument callable invoked by cancel()
        self._signal_done = signal_done
        self._gen = gen

    def __next__(self):
        # advance the wrapped iterator by one item
        return next(self._gen)

    def __iter__(self):
        # iteration is handed over to the wrapped iterator itself
        return self._gen

    def cancel(self):
        """Stop taking further items from the map's input.

        Handy when the input is infinite (or merely large) and the
        result (or error) you were after has already arrived.
        """
        self._signal_done()
1158
1159
class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `view` method:

    >>> v = client.load_balanced_view()

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.load_balanced_view([1,3])

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # Per-view defaults for the scheduling flags; `_really_apply` uses these
    # whenever the corresponding argument is left as None.
    # location-based dependency (see `set_flags`)
    follow = Any()
    # time-based dependency (see `set_flags`)
    after = Any()
    # seconds the scheduler waits for dependencies to be met
    timeout = CFloat()
    # number of times a task is retried on failure
    retries = Integer(0)

    # copied from the client in __init__; compared against 'pure' in `_really_apply`
    _task_scheme = Any()
    # names of the flags of this view
    # NOTE(review): presumably consumed by View.set_flags - confirm in the base class
    _flag_names = List(
        ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']
    )
    # ids of `imap` calls that have not yet exhausted their input;
    # ViewExecutor.shutdown polls this until it is empty
    _outstanding_maps = Set()

    def __init__(self, client=None, socket=None, **flags):
        """Set up the view and record the client's task scheme.

        `client`, `socket` and any extra `flags` are forwarded unchanged
        to the parent constructor.
        """
        super().__init__(client=client, socket=socket, **flags)
        self._task_scheme = client._task_scheme
1189
1190 def _validate_dependency(self, dep):
1191 """validate a dependency.
1192
1193 For use in `set_flags`.
1194 """
1195 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
1196 return True
1197 elif isinstance(dep, (list, set, tuple)):
1198 for d in dep:
1199 if not isinstance(d, (str, AsyncResult)):
1200 return False
1201 elif isinstance(dep, dict):
1202 if set(dep.keys()) != set(Dependency().as_dict().keys()):
1203 return False
1204 if not isinstance(dep['msg_ids'], list):
1205 return False
1206 for d in dep['msg_ids']:
1207 if not isinstance(d, str):
1208 return False
1209 else:
1210 return False
1211
1212 return True
1213
1214 def _render_dependency(self, dep):
1215 """helper for building jsonable dependencies from various input forms."""
1216 if isinstance(dep, Dependency):
1217 return dep.as_dict()
1218 elif isinstance(dep, AsyncResult):
1219 return dep.msg_ids
1220 elif dep is None:
1221 return []
1222 else:
1223 # pass to Dependency constructor
1224 return list(Dependency(dep))
1225
1226 def set_flags(self, **kwargs):
1227 """set my attribute flags by keyword.
1228
1229 A View is a wrapper for the Client's apply method, but with attributes
1230 that specify keyword arguments, those attributes can be set by keyword
1231 argument with this method.
1232
1233 Parameters
1234 ----------
1235 block : bool
1236 whether to wait for results
1237 track : bool
1238 whether to create a MessageTracker to allow the user to
1239 safely edit after arrays and buffers during non-copying
1240 sends.
1241 after : Dependency or collection of msg_ids
1242 Only for load-balanced execution (targets=None)
1243 Specify a list of msg_ids as a time-based dependency.
1244 This job will only be run *after* the dependencies
1245 have been met.
1246 follow : Dependency or collection of msg_ids
1247 Only for load-balanced execution (targets=None)
1248 Specify a list of msg_ids as a location-based dependency.
1249 This job will only be run on an engine where this dependency
1250 is met.
1251 timeout : float/int or None
1252 Only for load-balanced execution (targets=None)
1253 Specify an amount of time (in seconds) for the scheduler to
1254 wait for dependencies to be met before failing with a
1255 DependencyTimeout.
1256 retries : int
1257 Number of times a task will be retried on failure.
1258 """
1259
1260 super().set_flags(**kwargs)
1261 for name in ('follow', 'after'):
1262 if name in kwargs:
1263 value = kwargs[name]
1264 if self._validate_dependency(value):
1265 setattr(self, name, value)
1266 else:
1267 raise ValueError(f"Invalid dependency: {value!r}")
1268 if 'timeout' in kwargs:
1269 t = kwargs['timeout']
1270 if not isinstance(t, (int, float, type(None))):
1271 raise TypeError(f"Invalid type for timeout: {type(t)!r}")
1272 if t is not None:
1273 if t < 0:
1274 raise ValueError(f"Invalid timeout: {t}")
1275
1276 self.timeout = t
1277
1278 @sync_results
1279 @save_ids
1280 def _really_apply(
1281 self,
1282 f,
1283 args=None,
1284 kwargs=None,
1285 block=None,
1286 track=None,
1287 after=None,
1288 follow=None,
1289 timeout=None,
1290 targets=None,
1291 retries=None,
1292 ):
1293 """calls f(*args, **kwargs) on a remote engine, returning the result.
1294
1295 This method temporarily sets all of `apply`'s flags for a single call.
1296
1297 Parameters
1298 ----------
1299 f : callable
1300 args : list [default: empty]
1301 kwargs : dict [default: empty]
1302 block : bool [default: self.block]
1303 whether to block
1304 track : bool [default: self.track]
1305 whether to ask zmq to track the message, for safe non-copying sends
1306 !!!!!! TODO : THE REST HERE !!!!
1307
1308 Returns
1309 -------
1310 if self.block is False:
1311 returns AsyncResult
1312 else:
1313 returns actual result of f(*args, **kwargs) on the engine(s)
1314 This will be a list of self.targets is also a list (even length 1), or
1315 the single result if self.targets is an integer engine id
1316 """
1317
1318 # validate whether we can run
1319 if self._socket.closed():
1320 msg = "Task farming is disabled"
1321 if self._task_scheme == 'pure':
1322 msg += " because the pure ZMQ scheduler cannot handle"
1323 msg += " disappearing engines."
1324 raise RuntimeError(msg)
1325
1326 if self._task_scheme == 'pure':
1327 # pure zmq scheme doesn't support extra features
1328 msg = "Pure ZMQ scheduler doesn't support the following flags:"
1329 "follow, after, retries, targets, timeout"
1330 if follow or after or retries or targets or timeout:
1331 # hard fail on Scheduler flags
1332 raise RuntimeError(msg)
1333 if isinstance(f, dependent):
1334 # soft warn on functional dependencies
1335 warnings.warn(msg, RuntimeWarning)
1336
1337 # build args
1338 args = [] if args is None else args
1339 kwargs = {} if kwargs is None else kwargs
1340 block = self.block if block is None else block
1341 track = self.track if track is None else track
1342 after = self.after if after is None else after
1343 retries = self.retries if retries is None else retries
1344 follow = self.follow if follow is None else follow
1345 timeout = self.timeout if timeout is None else timeout
1346 targets = self.targets if targets is None else targets
1347
1348 if not isinstance(retries, int):
1349 raise TypeError(f'retries must be int, not {type(retries)!r}')
1350
1351 if targets is None:
1352 idents = []
1353 else:
1354 idents = self.client._build_targets(targets)[0]
1355 # ensure *not* bytes
1356 idents = [ident.decode() for ident in idents]
1357
1358 after = self._render_dependency(after)
1359 follow = self._render_dependency(follow)
1360 metadata = dict(
1361 after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
1362 )
1363
1364 future = self.client.send_apply_request(
1365 self._socket, f, args, kwargs, track=track, metadata=metadata
1366 )
1367
1368 ar = AsyncResult(
1369 self.client,
1370 future,
1371 fname=getname(f),
1372 targets=None,
1373 owner=True,
1374 )
1375 if block:
1376 try:
1377 return ar.get()
1378 except KeyboardInterrupt:
1379 pass
1380 return ar
1381
1382 @sync_results
1383 @save_ids
1384 def map(
1385 self,
1386 f,
1387 *sequences,
1388 block=None,
1389 chunksize=1,
1390 ordered=True,
1391 return_exceptions=False,
1392 ):
1393 """Parallel version of builtin `map`, load-balanced by this View.
1394
1395 Each `chunksize` elements will be a separate task, and will be
1396 load-balanced. This lets individual elements be available for iteration
1397 as soon as they arrive.
1398
1399 .. versionadded:: 7.0
1400 `return_exceptions`
1401
1402 Parameters
1403 ----------
1404 f : callable
1405 function to be mapped
1406 *sequences : one or more sequences of matching length
1407 the sequences to be distributed and passed to `f`
1408 block : bool [default self.block]
1409 whether to wait for the result or not
1410 chunksize : int [default 1]
1411 how many elements should be in each task.
1412 ordered : bool [default True]
1413 Whether the results should be gathered as they arrive, or enforce
1414 the order of submission.
1415
1416 Only applies when iterating through AsyncMapResult as results arrive.
1417 Has no effect when block=True.
1418
1419 return_exceptions: bool [default False]
1420 Return Exceptions instead of raising on the first exception.
1421
1422 Returns
1423 -------
1424 if block=False
1425 An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance.
1426 An object like AsyncResult, but which reassembles the sequence of results
1427 into a single list. AsyncMapResults can be iterated through before all
1428 results are complete.
1429 else
1430 A list, the result of ``map(f,*sequences)``
1431 """
1432
1433 # default
1434 if block is None:
1435 block = self.block
1436
1437 assert len(sequences) > 0, "must have some sequences to map onto!"
1438
1439 pf = ParallelFunction(
1440 self,
1441 f,
1442 block=block,
1443 chunksize=chunksize,
1444 ordered=ordered,
1445 return_exceptions=return_exceptions,
1446 )
1447 return pf.map(*sequences)
1448
1449 def imap(
1450 self,
1451 f,
1452 *sequences,
1453 ordered=True,
1454 max_outstanding='auto',
1455 return_exceptions=False,
1456 ):
1457 """Parallel version of lazily-evaluated `imap`, load-balanced by this View.
1458
1459 `ordered`, and `max_outstanding` can be specified by keyword only.
1460
1461 Unlike other map functions in IPython Parallel,
1462 this one does not consume the full iterable before submitting work,
1463 returning a single 'AsyncMapResult' representing the full computation.
1464
1465 Instead, it consumes iterables as they come, submitting up to `max_outstanding`
1466 tasks to the cluster before waiting on results (default: one task per engine).
1467 This allows it to work with infinite generators,
1468 and avoid potentially expensive read-ahead for large streams of inputs
1469 that may not fit in memory all at once.
1470
1471 .. versionadded:: 7.0
1472
1473 Parameters
1474 ----------
1475 f : callable
1476 function to be mapped
1477 *sequences : one or more sequences of matching length
1478 the sequences to be distributed and passed to `f`
1479 ordered : bool [default True]
1480 Whether the results should be yielded on a first-come-first-yield basis,
1481 or preserve the order of submission.
1482
1483 max_outstanding : int [default len(engines)]
1484 The maximum number of tasks to be outstanding.
1485
1486 max_outstanding=0 will greedily consume the whole generator
1487 (map_async may be more efficient).
1488
1489 A limit of 1 should be strictly worse than running a local map,
1490 as there will be no parallelism.
1491
1492 Use this to tune how greedily input generator should be consumed.
1493
1494 return_exceptions : bool [default False]
1495 Return Exceptions instead of raising them.
1496
1497 Returns
1498 -------
1499
1500 lazily-evaluated generator, yielding results of `f` on each item of sequences.
1501 Yield-order depends on `ordered` argument.
1502 """
1503
1504 assert len(sequences) > 0, "must have some sequences to map onto!"
1505
1506 if max_outstanding == 'auto':
1507 max_outstanding = len(self)
1508
1509 pf = PrePickled(f)
1510
1511 map_id = secrets.token_bytes(16)
1512
1513 # record that a map is outstanding, mainly for Executor.shutdown
1514 self._outstanding_maps.add(map_id)
1515
1516 def signal_done():
1517 nonlocal iterator_done
1518 iterator_done = True
1519 self._outstanding_maps.discard(map_id)
1520
1521 outstanding_lock = threading.Lock()
1522
1523 if ordered:
1524 outstanding = deque()
1525 add_outstanding = outstanding.append
1526 else:
1527 outstanding = set()
1528 add_outstanding = outstanding.add
1529
1530 def wait_for_ready():
1531 while not outstanding and not iterator_done:
1532 # no outstanding futures, need to wait for something to wait for
1533 time.sleep(0.1)
1534 if not outstanding:
1535 # nothing to wait for, iterator_done is True
1536 return []
1537
1538 if ordered:
1539 with outstanding_lock:
1540 return [outstanding.popleft()]
1541 else:
1542 # unordered, yield whatever finishes first, as soon as it's ready
1543 # repeat with timeout because the consumer thread may be adding to `outstanding`
1544 with outstanding_lock:
1545 to_wait = outstanding.copy()
1546 done, _ = concurrent.futures.wait(
1547 to_wait,
1548 return_when=concurrent.futures.FIRST_COMPLETED,
1549 timeout=0.5,
1550 )
1551 if done:
1552 with outstanding_lock:
1553 for f in done:
1554 outstanding.remove(f)
1555 return done
1556
1557 arg_iterator = iter(zip(*sequences))
1558 iterator_done = False
1559
1560 # consume inputs in _another_ thread,
1561 # to avoid blocking the IO thread with a possibly blocking generator
1562 # only need one thread for this, though.
1563 consumer_pool = concurrent.futures.ThreadPoolExecutor(1)
1564
1565 def consume_callback(f):
1566 if not iterator_done:
1567 consumer_pool.submit(consume_next)
1568
1569 def consume_next():
1570 """Consume the next call from the argument iterator
1571
1572 If max_outstanding, schedules consumption when the result finishes.
1573 If running with no limit, schedules another consumption immediately.
1574 """
1575 nonlocal iterator_done
1576 if iterator_done:
1577 return
1578
1579 try:
1580 args = next(arg_iterator)
1581 ar = self.apply_async(pf, *args)
1582 except StopIteration:
1583 signal_done()
1584 return
1585 except Exception as e:
1586 # exception consuming iterator, propagate
1587 ar = concurrent.futures.Future()
1588 # mock get so it gets re-raised when awaited
1589 ar.get = lambda *args: ar.result()
1590 ar.set_exception(e)
1591 with outstanding_lock:
1592 add_outstanding(ar)
1593 signal_done()
1594 return
1595
1596 with outstanding_lock:
1597 add_outstanding(ar)
1598 if max_outstanding:
1599 ar.add_done_callback(consume_callback)
1600 else:
1601 consumer_pool.submit(consume_next)
1602
1603 # kick it off
1604 # only need one if not using max_outstanding,
1605 # as each eventloop tick will submit a new item
1606 # otherwise, start one consumer for each slot, which will chain
1607 kickoff_count = 1 if max_outstanding == 0 else max_outstanding
1608 submit_futures = []
1609 for i in range(kickoff_count):
1610 submit_futures.append(consumer_pool.submit(consume_next))
1611
1612 # await the first one, just in case it raises
1613 try:
1614 submit_futures[0].result()
1615 except Exception:
1616 # make sure we clean up
1617 signal_done()
1618 raise
1619 del submit_futures
1620
1621 # wrap result-yielding in another call
1622 # because if this function is itself a generator
1623 # the first submission won't happen until the first result is requested
1624 def iter_results():
1625 nonlocal outstanding
1626 with consumer_pool:
1627 while not iterator_done:
1628 # yield results as they become ready
1629 for ready_ar in wait_for_ready():
1630 yield ready_ar.get(return_exceptions=return_exceptions)
1631
1632 # yield any remaining results
1633 if ordered:
1634 for ar in outstanding:
1635 yield ar.get(return_exceptions=return_exceptions)
1636 else:
1637 while outstanding:
1638 done, outstanding = concurrent.futures.wait(
1639 outstanding, return_when=concurrent.futures.FIRST_COMPLETED
1640 )
1641 for ar in done:
1642 yield ar.get(return_exceptions=return_exceptions)
1643
1644 return LazyMapIterator(iter_results(), signal_done)
1645
1646 def register_joblib_backend(self, name='ipyparallel', make_default=False):
1647 """Register this View as a joblib parallel backend
1648
1649 To make this the default backend, set make_default=True.
1650
1651 Use with::
1652
1653 p = Parallel(backend='ipyparallel')
1654 ...
1655
1656 See joblib docs for details
1657
1658 Requires joblib >= 0.10
1659
1660 .. versionadded:: 5.1
1661 """
1662 from joblib.parallel import register_parallel_backend
1663
1664 from ._joblib import IPythonParallelBackend
1665
1666 register_parallel_backend(
1667 name,
1668 lambda **kwargs: IPythonParallelBackend(view=self, **kwargs),
1669 make_default=make_default,
1670 )
1671
1672
class ViewExecutor(concurrent.futures.Executor):
    """A PEP-3148 Executor API for Views

    Access as view.executor
    """

    def __init__(self, view):
        """Wrap `view`; the worker count is taken from ``len(view)``."""
        self.view = view
        self._max_workers = len(self.view)

    def submit(self, fn, *args, **kwargs):
        """Same as View.apply_async"""
        return self.view.apply_async(fn, *args, **kwargs)

    def map(self, func, *iterables, **kwargs):
        """Return the lazy iterator produced by View.imap

        `timeout` is not supported: if given, a warning is emitted and the
        argument is dropped. All other keyword arguments are passed to `imap`.
        """
        if 'timeout' in kwargs:
            warnings.warn("timeout unsupported in ViewExecutor.map")
            kwargs.pop('timeout')
        return self.view.imap(func, *iterables, **kwargs)

    def shutdown(self, wait=True):
        """ViewExecutor does *not* shutdown engines

        results are awaited if wait=True, but engines are *not* shutdown.
        """
        if not wait:
            return
        # wait for *submission* of outstanding maps,
        # otherwise view.wait won't know what to wait for.
        # Not every view tracks outstanding maps, hence the None default.
        outstanding_maps = getattr(self.view, "_outstanding_maps", None)
        while outstanding_maps:
            time.sleep(0.1)
        self.view.wait()
1707
1708
# names exported by a star-import of this module
__all__ = ['LoadBalancedView', 'DirectView', 'ViewExecutor', 'BroadcastView']