1"""AsyncResult objects for the client"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5import concurrent.futures
6import sys
7import threading
8import time
9import warnings
10from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION, Future
11from contextlib import contextmanager
12from datetime import datetime
13from functools import lru_cache, partial
14from itertools import chain, repeat
15from threading import Event
16
17import zmq
18from decorator import decorator
19from IPython import get_ipython
20from IPython.display import display, display_pretty, publish_display_data
21
22from ipyparallel import error
23from ipyparallel.util import _parse_date, compare_datetimes, progress, utcnow
24
25from .futures import MessageFuture, multi_future
26
27
def _raw_text(s):
    """Display *s* verbatim (raw=True skips repr-quoting) via IPython."""
    display_pretty(s, raw=True)
30
31
# sentinel distinguishing "no cached result" from a legitimate result of None
_default = object()

# global empty tracker that's always done:
finished_tracker = zmq.MessageTracker()
36
37
@decorator
def check_ready(f, self, *args, **kwargs):
    """Check ready state prior to calling the method.

    A zero-timeout wait refreshes state; raises TimeoutError
    if the result has not arrived yet.
    """
    self.wait(0)
    if not self._ready:
        raise TimeoutError("result not ready")
    return f(self, *args, **kwargs)
45
46
# populated lazily in AsyncResult.__dir__ from Metadata's keys
_metadata_keys = []
# threading.TIMEOUT_MAX new in 3.2
_FOREVER = getattr(threading, 'TIMEOUT_MAX', int(1e6))
50
51
class AsyncResult(Future):
    """Class for representing results of non-blocking calls.

    Extends the interfaces of :py:class:`multiprocessing.pool.AsyncResult`
    and :py:class:`concurrent.futures.Future`.
    """

    msg_ids = None  # list of msg_ids this result represents
    _targets = None  # engine ids the requests were sent to (may be None)
    _tracker = None  # zmq.MessageTracker for outgoing sends
    _single_result = False  # True when constructed from a single (non-list) child
    owner = False  # if True, pop our entries from client caches when resolved
    _last_display_prefix = ""  # last [stream:eid] prefix printed by _display_stream
    _stream_trailing_newline = True  # whether the last streamed text ended with \n
    _chunk_sizes = None  # msg_id -> number of granular tasks in that message
67
    def __init__(
        self,
        client,
        children,
        fname='unknown',
        targets=None,
        owner=False,
        return_exceptions=False,
        chunk_sizes=None,
    ):
        """Create an AsyncResult.

        Parameters
        ----------
        client : Client
            The parallel client that submitted the request(s).
        children : MessageFuture, msg_id str, or list of either
            A bare (non-list) argument marks this as a single-result object.
        fname : str
            Remote function name, used in reprs and error messages.
        targets : list, optional
            Engine ids the messages were sent to.
        owner : bool
            If True, pop this result's entries from the client's
            results/metadata caches once it resolves.
        return_exceptions : bool
            If True, exceptions are returned as results instead of raised.
        chunk_sizes : dict, optional
            Mapping of msg_id -> number of granular tasks in that message.
        """
        super().__init__()
        if not isinstance(children, list):
            children = [children]
            self._single_result = True
        else:
            self._single_result = False

        self._return_exceptions = return_exceptions
        self._chunk_sizes = chunk_sizes or {}

        # children may arrive as bare msg_ids or as MessageFutures
        if isinstance(children[0], str):
            self.msg_ids = children
            self._children = []
        else:
            self._children = children
            self.msg_ids = [f.msg_id for f in children]

        self._client = client
        self._fname = fname
        self._targets = targets
        self.owner = owner

        self._ready = False
        self._ready_event = Event()
        self._output_ready = False
        self._output_event = Event()
        self._sent_event = Event()
        self._success = None
        if self._children:
            self._metadata = [f.output.metadata for f in self._children]
        else:
            self._metadata = [self._client.metadata[id] for id in self.msg_ids]
        self._init_futures()
111
112 def _init_futures(self):
113 """Build futures for results and output; hook up callbacks"""
114 if not self._children:
115 for msg_id in self.msg_ids:
116 future = self._client._futures.get(msg_id, None)
117 if not future:
118 result = self._client.results.get(msg_id, _default)
119 # result resides in local cache, construct already-resolved Future
120 if result is not _default:
121 future = MessageFuture(msg_id)
122 future.output = Future()
123 future.output.metadata = self.client.metadata[msg_id]
124 future.set_result(result)
125 future.output.set_result(None)
126 if not future:
127 raise KeyError(f"No Future or result for msg_id: {msg_id}")
128 self._children.append(future)
129
130 self._result_future = multi_future(self._children)
131
132 self._sent_future = multi_future([f.tracker for f in self._children])
133 self._sent_future.add_done_callback(self._handle_sent)
134
135 self._output_future = multi_future(
136 [self._result_future] + [f.output for f in self._children]
137 )
138 # on completion of my constituents, trigger my own resolution
139 self._result_future.add_done_callback(self._resolve_result)
140 self._output_future.add_done_callback(self._resolve_output)
141 self.add_done_callback(self._finalize_result)
142
    def _iopub_streaming_output_callback(self, eid, msg_future, msg):
        """Callback for iopub messages registered during AsyncResult.stream_output()

        Dispatches on msg_type: 'stream' is re-printed (or re-published with a
        display_id when running inside a kernel), 'error' tracebacks are printed
        up to a limit, 'execute_result' and 'display_data' are re-displayed.
        """
        msg_type = msg['header']['msg_type']
        ip = get_ipython()
        if ip is not None:
            in_kernel = getattr(ip, 'kernel', None) is not None
        else:
            in_kernel = False

        if msg_type == 'stream':
            msg_content = msg['content']
            stream_name = msg_content['name']

            if in_kernel:
                # in a kernel, republish the *whole* accumulated stream under a
                # stable display_id so later chunks update the same output area
                parent_msg_id = msg.get('parent_header', {}).get('msg_id', '')
                display_id = f"{parent_msg_id}-{stream_name}"
                md = msg_future.output.metadata
                full_stream = md[stream_name]
                if display_id in self._already_streamed:
                    update = True
                else:
                    self._already_streamed[display_id] = True
                    update = False
                publish_display_data(
                    {
                        "text/plain": f"[{stream_name}:{eid}] " + full_stream,
                    },
                    transient={"display_id": display_id},
                    update=update,
                )
                return
            else:
                # terminal: print just the new chunk with a [name:eid] prefix
                stream = getattr(sys, stream_name, sys.stdout)
                self._display_stream(
                    msg_content['text'],
                    f'[{stream_name}:{eid}] ',
                    file=stream,
                )
        elif msg_type == "error":
            content = msg['content']
            if 'engine_info' not in content:
                # fill in engine info so the rendered traceback identifies the engine
                content['engine_info'] = {
                    "engine_id": msg_future.output.metadata.engine_id,
                    "engine_uuid": msg_future.output.metadata.engine_uuid,
                    # always execute?
                    "method": msg_future.header["msg_type"].partition("_")[0],
                }

            err = self._client._unwrap_exception(msg['content'])
            self._streamed_errors += 1
            if self._streamed_errors <= error.CompositeError.tb_limit:
                print("\n".join(err.render_traceback()), file=sys.stderr)
            else:
                # single-line error after we hit the limit
                print(err, file=sys.stderr)
        elif msg_type == "execute_result":
            # mock ExecuteReply from execute_result on iopub
            from .client import ExecuteReply

            er = ExecuteReply(
                msg_id=msg_future.msg_id,
                content=msg['content'],
                metadata=msg_future.output.metadata,
            )
            display(er)

        # everything below requires IPython's display publisher
        if ip is None:
            return

        if msg_type == 'display_data':
            msg_content = msg['content']
            _raw_text(f'[output:{eid}]')
            self._republish_displaypub(msg_content, eid)
216
    @contextmanager
    def stream_output(self):
        """Stream output for this result as it arrives.

        Returns a context manager, during which output is streamed.
        Registers an iopub callback per child future, first replaying
        any output already captured in metadata, then forwarding new
        messages as they arrive. Callbacks are removed on exit.
        """

        # Keep a handle on the futures so we can remove the callback later
        future_callbacks = {}
        # reset per-stream state used by the callback
        self._already_streamed = {}
        self._stream_trailing_newline = True
        self._last_display_prefix = ""
        self._streamed_errors = 0

        for eid, msg_future in zip(self._targets, self._children):
            iopub_callback = partial(
                self._iopub_streaming_output_callback, eid, msg_future
            )
            future_callbacks[msg_future] = iopub_callback
            md = msg_future.output.metadata

            msg_future.iopub_callbacks.append(iopub_callback)
            # FIXME: there's still a race here
            # registering before publishing means possible duplicates,
            # while after means lost output

            # publish already-captured output immediately
            for name in ("stdout", "stderr"):
                text = md[name]
                if text:
                    iopub_callback(
                        {
                            "header": {"msg_type": "stream"},
                            "content": {"name": name, "text": text},
                        }
                    )
            for output in md["outputs"]:
                iopub_callback(
                    {
                        "header": {"msg_type": "display_data"},
                        "content": output,
                    }
                )
            if md["execute_result"]:
                iopub_callback(
                    {
                        "header": {"msg_type": "execute_result"},
                        "content": md["execute_result"],
                    }
                )

        try:
            yield
        finally:
            # clear stream cache
            self._already_streamed = {}

            # Remove the callbacks
            for msg_future, iopub_callback in future_callbacks.items():
                msg_future.iopub_callbacks.remove(iopub_callback)
277
278 def __repr__(self):
279 if self._ready:
280 if self._success:
281 state = "finished"
282 else:
283 state = "failed"
284 else:
285 state = "pending"
286 return f"<{self.__class__.__name__}({self._fname}): {state}>"
287
    def __dir__(self):
        """Include metadata keys, which are reachable via attribute access."""
        keys = dir(self.__class__)
        if not _metadata_keys:
            # populate the module-level cache on first use
            from .client import Metadata

            _metadata_keys.extend(Metadata().keys())
        keys.extend(_metadata_keys)
        return keys
296
297 def _reconstruct_result(self, res):
298 """Reconstruct our result from actual result list (always a list)
299
300 Override me in subclasses for turning a list of results
301 into the expected form.
302 """
303 if self._single_result:
304 return res[0]
305 else:
306 return res
307
    def get(self, timeout=None, return_exceptions=None, return_when=None):
        """Return the result when it arrives.

        Arguments:

        timeout : int [default None]
            If `timeout` is not ``None`` and the result does not arrive within
            `timeout` seconds then ``TimeoutError`` is raised. If the
            remote call raised an exception then that exception will be reraised
            by get() inside a `RemoteError`.
        return_exceptions : bool [default False]
            If True, return Exceptions instead of raising them.
        return_when : None, ALL_COMPLETED, or FIRST_EXCEPTION
            FIRST_COMPLETED is not supported, and treated the same as ALL_COMPLETED.
            See :py:func:`concurrent.futures.wait` for documentation.

            When return_when=FIRST_EXCEPTION, will raise immediately on the first exception,
            rather than waiting for all results to finish before reporting errors.

        .. versionchanged:: 8.0
            Added `return_when` argument.
        """
        if return_when == FIRST_COMPLETED:
            # FIRST_COMPLETED unsupported, same as ALL_COMPLETED
            warnings.warn(
                "Ignoring unsupported AsyncResult.get(return_when=FIRST_COMPLETED)",
                UserWarning,
                stacklevel=2,
            )
            return_when = None
        elif return_when == ALL_COMPLETED:
            # None avoids call to .split() and is a tiny bit more efficient
            return_when = None

        if not self.ready():
            wait_result = self.wait(timeout, return_when=return_when)

        if return_exceptions is None:
            # default to attribute, if AsyncResult was created with return_exceptions=True
            return_exceptions = self._return_exceptions

        if self._ready:
            if self._success:
                return self.result()
            else:
                e = self.exception()
                if return_exceptions:
                    # _raw_results was stored by _resolve_result
                    return self._reconstruct_result(self._raw_results)
                else:
                    raise e
        else:
            if return_when == FIRST_EXCEPTION:
                # this should only occur if there was an exception
                # any other situation should have triggered the ready branch above

                done, pending = wait_result
                for ar in done:
                    if not ar._success:
                        return ar.get(return_exceptions=return_exceptions)
            raise TimeoutError("Result not ready.")
368
369 def _check_ready(self):
370 if not self.ready():
371 raise TimeoutError("Result not ready.")
372
373 def ready(self):
374 """Return whether the call has completed."""
375 if not self._ready:
376 self.wait(0)
377
378 return self._ready
379
380 def wait_for_output(self, timeout=-1):
381 """Wait for our output to be complete.
382
383 AsyncResult.wait only waits for the result,
384 which may arrive before output is complete.
385 """
386 if self._output_ready:
387 return True
388 if timeout and timeout < 0:
389 timeout = None
390 return self._output_event.wait(timeout)
391
392 def _resolve_output(self, f=None):
393 """Callback that fires when outputs are ready"""
394 if self.owner:
395 [self._client.metadata.pop(mid, None) for mid in self.msg_ids]
396 self._output_ready = True
397 self._output_event.set()
398
399 @classmethod
400 def join(cls, *async_results):
401 """Join multiple AsyncResults into one
402
403 Inverse of .split(),
404 used for rejoining split results in wait.
405
406 .. versionadded:: 8.0
407 """
408 if not async_results:
409 raise ValueError("Must specify at least one AsyncResult to join")
410 first = async_results[0]
411 if len(async_results) == 1:
412 # only one AsyncResult, nothing to join
413 return first
414
415 return cls(
416 client=first._client,
417 fname=first._fname,
418 return_exceptions=first._return_exceptions,
419 children=list(chain(*(ar._children for ar in async_results))),
420 targets=list(chain(*(ar._targets for ar in async_results))),
421 owner=False,
422 )
423
    # NOTE(review): lru_cache on a method keeps `self` alive in the cache
    # (ruff B019); here it also ensures repeated calls return the same subsets.
    @lru_cache
    def split(self):
        """Split an AsyncResult

        An AsyncResult object that represents multiple messages
        can be split to wait for individual results
        This can be passed to `concurrent.futures.wait` and friends
        to get partial results.

        .. versionadded:: 8.0
        """
        if len(self._children) == 1:
            # nothing to do if we're already representing a single message
            return (self,)
        # the subsets must not have caches cleared out from under them,
        # so this result gives up ownership
        self.owner = False

        if self._targets is None:
            _targets = repeat(None)
        else:
            _targets = self._targets

        # wrap children in a list for AsyncMapResult so the subset
        # is not treated as a single result (see __init__)
        flatten = not isinstance(self, AsyncMapResult)
        return tuple(
            AsyncResult(
                client=self._client,
                children=msg_future if flatten else [msg_future],
                targets=[engine_id],
                fname=self._fname,
                owner=False,
                return_exceptions=self._return_exceptions,
            )
            for engine_id, msg_future in zip(_targets, self._children)
        )
457
    def wait(self, timeout=-1, return_when=None):
        """Wait until the result is available or until `timeout` seconds pass.

        Arguments:

        timeout (int):
            The timeout in seconds. `-1` or None indicate an infinite timeout.
        return_when (enum):
            None, ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION.
            Passed to :py:func:`concurrent.futures.wait`.
            If specified and not-None,

        Returns:
            ready (bool):
                For backward-compatibility.
                If `return_when` is None or unspecified,
                returns True if all tasks are done, False otherwise

            (done, pending):
                If `return_when` is any of the constants for :py:func:`concurrent.futures.wait`,
                will return two sets of AsyncResult objects
                representing the completed and still-pending subsets of results,
                matching the return value of `wait` itself.

        .. versionchanged:: 8.0
            Added `return_when`.
        """
        # negative timeout means wait forever
        if timeout and timeout < 0:
            timeout = None
        if return_when is None:
            # simple case: wait for everything
            if self._ready:
                return True
            self._ready_event.wait(timeout)
            # opportunistically refresh output state without blocking
            self.wait_for_output(0)
            return self._ready
        else:
            # partial-completion wait: delegate to concurrent.futures.wait
            # over our single-message subsets
            futures = self.split()
            done, pending = concurrent.futures.wait(
                futures, timeout=timeout, return_when=return_when
            )
            if done:
                self.wait_for_output(0)

            return done, pending
512
    def _resolve_result(self, f=None):
        """Callback: resolve this Future from its constituent results.

        Fires when the multi-future over all children completes.
        """
        if self.done():
            # already resolved; nothing to do
            return
        if f:
            results = f.result()
        else:
            results = list(map(self._client.results.get, self.msg_ids))

        # store raw results
        self._raw_results = results

        try:
            if self._single_result:
                r = results[0]
                if isinstance(r, Exception):
                    raise r
            else:
                # may raise CompositeError unless return_exceptions is set
                results = self._collect_exceptions(results)
        except Exception as e:
            self._success = False
            self.set_exception(e)
        else:
            self._success = True
            self.set_result(self._reconstruct_result(results))
537
538 def _collect_exceptions(self, results):
539 """Wrap Exceptions in a CompositeError
540
541 if self._return_exceptions is True, this is a no-op
542 """
543 if self._return_exceptions:
544 return results
545 else:
546 return error.collect_exceptions(results, self._fname)
547
548 def _finalize_result(self, f):
549 if self.owner:
550 [self._client.results.pop(mid, None) for mid in self.msg_ids]
551 self._ready = True
552 self._ready_event.set()
553
554 def successful(self):
555 """Return whether the call completed without raising an exception.
556
557 Will raise ``RuntimeError`` if the result is not ready.
558 """
559 if not self.ready():
560 raise RuntimeError("Cannot check successful() if not done.")
561 return self._success
562
563 # ----------------------------------------------------------------
564 # Extra methods not in mp.pool.AsyncResult
565 # ----------------------------------------------------------------
566
567 def get_dict(self, timeout=-1):
568 """Get the results as a dict, keyed by engine_id.
569
570 timeout behavior is described in `get()`.
571 """
572
573 results = self.get(timeout)
574 if self._single_result:
575 results = [results]
576 engine_ids = [md['engine_id'] for md in self._metadata]
577
578 rdict = {}
579 for engine_id, result in zip(engine_ids, results):
580 if engine_id in rdict:
581 n_jobs = engine_ids.count(engine_id)
582 raise ValueError(
583 f"Cannot build dict, {n_jobs} jobs ran on engine #{engine_id}"
584 )
585 else:
586 rdict[engine_id] = result
587
588 return rdict
589
590 @property
591 def r(self):
592 """result property wrapper for `get(timeout=-1)`."""
593 return self.get()
594
    # metadata fields that may arrive as ISO timestamp strings
    _DATE_FIELDS = [
        "submitted",
        "started",
        "completed",
        "received",
    ]

    def _parse_metadata_dates(self):
        """Ensure metadata date fields are parsed on access

        Rather than parsing timestamps from str->dt on receipt,
        parse on access for compatibility.
        """
        for md in self._metadata:
            for key in self._DATE_FIELDS:
                # only convert values still held as strings (idempotent)
                if isinstance(md.get(key, None), str):
                    md[key] = _parse_date(md[key])
612
613 @property
614 def metadata(self):
615 """property for accessing execution metadata."""
616 self._parse_metadata_dates()
617 if self._single_result:
618 return self._metadata[0]
619 else:
620 return self._metadata
621
622 @property
623 def result_dict(self):
624 """result property as a dict."""
625 return self.get_dict()
626
    # NOTE(review): defining __dict__ as a method is highly unusual — it shadows
    # the normal instance-attribute dict protocol; kept as-is for compatibility.
    def __dict__(self):
        # zero timeout: raises TimeoutError if results are not ready
        return self.get_dict(0)
629
630 def abort(self):
631 """
632 Abort my tasks, if possible.
633
634 Only tasks that have not started yet can be aborted.
635
636 Raises RuntimeError if already done.
637 """
638 if self.ready():
639 raise RuntimeError("Can't abort, I am already done!")
640 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
641
642 def _handle_sent(self, f):
643 """Resolve sent Future, build MessageTracker"""
644 trackers = f.result()
645 trackers = [t for t in trackers if t is not None]
646 self._tracker = zmq.MessageTracker(*trackers)
647 self._sent_event.set()
648
649 @property
650 def sent(self):
651 """check whether my messages have been sent."""
652 return self._sent_event.is_set() and self._tracker.done
653
    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.

        Waits in two stages: first for the send call itself (which creates
        the MessageTracker), then on the tracker with the remaining budget.
        """
        if not self._sent_event.is_set():
            if timeout and timeout < 0:
                # Event doesn't like timeout < 0
                timeout = None
            elif timeout == 0:
                raise TimeoutError("Still waiting to be sent")
            # wait for Future to indicate send having been called,
            # which means MessageTracker is ready.
            tic = time.perf_counter()
            if not self._sent_event.wait(timeout):
                raise TimeoutError("Still waiting to be sent")
            if timeout:
                # deduct the time already spent from the remaining budget
                timeout = max(0, timeout - (time.perf_counter() - tic))
        try:
            if timeout is None:
                # MessageTracker doesn't like timeout=None
                timeout = -1
            return self._tracker.wait(timeout)
        except zmq.NotDone:
            raise TimeoutError("Still waiting to be sent")
681
682 # -------------------------------------
683 # dict-access
684 # -------------------------------------
685
686 def __getitem__(self, key):
687 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str."""
688 if isinstance(key, int):
689 self._check_ready()
690 return self._collect_exceptions([self.result()[key]])[0]
691 elif isinstance(key, slice):
692 self._check_ready()
693 return self._collect_exceptions(self.result()[key])
694 elif isinstance(key, str):
695 # metadata proxy *does not* require that results are done
696 self.wait(0)
697 self.wait_for_output(0)
698 self._parse_metadata_dates()
699 values = [md[key] for md in self._metadata]
700 if self._single_result:
701 return values[0]
702 else:
703 return values
704 else:
705 raise TypeError(
706 f"Invalid key type {type(key)!r}, must be 'int','slice', or 'str'"
707 )
708
709 def __getattr__(self, key):
710 """getattr maps to getitem for convenient attr access to metadata."""
711 try:
712 return self.__getitem__(key)
713 except (TimeoutError, KeyError):
714 raise AttributeError(
715 f"{self.__class__.__name__!r} object has no attribute {key!r}"
716 )
717
    @staticmethod
    def _wait_for_child(child, evt, timeout=_FOREVER):
        """Wait for a child to be done"""
        if child.done():
            return
        # reuse the caller's Event rather than allocating one per child
        evt.clear()
        child.add_done_callback(lambda f: evt.set())
        evt.wait(timeout)
726
    # asynchronous iterator:
    def __iter__(self):
        """Yield results as they arrive, in submission order."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except TimeoutError:
            # wait for each result individually
            evt = Event()
            for child in self._children:
                self._wait_for_child(child, evt=evt)
                result = child.result()
                # raises (wrapped) on errors unless return_exceptions is set
                self._collect_exceptions([result])
                yield result
        else:
            # already done
            yield from rlist
744
    # NOTE(review): lru_cache on methods keeps instances alive in the cache
    # (ruff B019); kept as-is for compatibility.
    @lru_cache
    def __len__(self):
        # length counts granular tasks, not messages
        return self._count_chunks(*self.msg_ids)

    @lru_cache
    def _count_chunks(self, *msg_ids):
        """Count the granular tasks"""
        # setdefault mutates _chunk_sizes: unknown msg_ids count as one task
        return sum(self._chunk_sizes.setdefault(msg_id, 1) for msg_id in msg_ids)
753
754 # -------------------------------------
755 # Sugar methods and attributes
756 # -------------------------------------
757
758 def timedelta(self, start, end, start_key=min, end_key=max):
759 """compute the difference between two sets of timestamps
760
761 The default behavior is to use the earliest of the first
762 and the latest of the second list, but this can be changed
763 by passing a different
764
765 Parameters
766 ----------
767 start : one or more datetime objects (e.g. ar.submitted)
768 end : one or more datetime objects (e.g. ar.received)
769 start_key : callable
770 Function to call on `start` to extract the relevant
771 entry [default: min]
772 end_key : callable
773 Function to call on `end` to extract the relevant
774 entry [default: max]
775
776 Returns
777 -------
778 dt : float
779 The time elapsed (in seconds) between the two selected timestamps.
780 """
781 if not isinstance(start, datetime):
782 # handle single_result AsyncResults, where ar.stamp is single object,
783 # not a list
784 start = start_key(start)
785 if not isinstance(end, datetime):
786 # handle single_result AsyncResults, where ar.stamp is single object,
787 # not a list
788 end = end_key(end)
789 return compare_datetimes(end, start).total_seconds()
790
791 @property
792 def progress(self):
793 """the number of tasks which have been completed at this point.
794
795 Fractional progress would be given by 1.0 * ar.progress / len(ar)
796 """
797 self.wait(0)
798 finished_msg_ids = set(self.msg_ids).intersection(self._client.outstanding)
799 finished_count = self._count_chunks(*finished_msg_ids)
800 return len(self) - finished_count
801
802 @property
803 def elapsed(self):
804 """elapsed time since initial submission"""
805 if self.ready():
806 return self.wall_time
807
808 now = submitted = utcnow()
809 self._parse_metadata_dates()
810 for md in self._metadata:
811 stamp = md["submitted"]
812 if stamp and stamp < submitted:
813 submitted = stamp
814 return compare_datetimes(now, submitted).total_seconds()
815
816 @property
817 @check_ready
818 def serial_time(self):
819 """serial computation time of a parallel calculation
820
821 Computed as the sum of (completed-started) of each task
822 """
823 t = 0
824 self._parse_metadata_dates()
825 for md in self._metadata:
826 t += compare_datetimes(md['completed'], md['started']).total_seconds()
827 return t
828
829 @property
830 @check_ready
831 def wall_time(self):
832 """actual computation time of a parallel calculation
833
834 Computed as the time between the latest `received` stamp
835 and the earliest `submitted`.
836
837 For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
838 """
839 return self.timedelta(self.submitted, self.received)
840
    def wait_interactive(
        self, interval=0.1, timeout=-1, widget=None, return_when=ALL_COMPLETED
    ):
        """interactive wait, printing progress at regular intervals.

        Parameters
        ----------
        interval : float
            Interval on which to update progress display.
        timeout : float
            Time (in seconds) to wait before raising a TimeoutError.
            -1 (default) means no timeout.
        widget : bool
            default: True if in an IPython kernel (notebook), False otherwise.
            Override default context-detection behavior for whether a widget-based progress bar
            should be used.
        return_when : concurrent.futures.ALL_COMPLETED | FIRST_EXCEPTION | FIRST_COMPLETED
        """
        if timeout and timeout < 0:
            timeout = None
        if return_when == ALL_COMPLETED:
            # None selects the cheaper bool-returning wait path
            return_when = None
        N = len(self)
        tic = time.perf_counter()
        progress_bar = progress(widget=widget, total=N, unit='tasks', desc=self._fname)

        finished = self.ready()
        while not finished and (
            timeout is None or time.perf_counter() - tic <= timeout
        ):
            # poll in `interval`-sized waits so the bar can refresh
            wait_result = self.wait(interval, return_when=return_when)
            progress_bar.update(self.progress - progress_bar.n)
            if return_when is None:
                finished = wait_result
            else:
                done, pending = wait_result
                if return_when == FIRST_COMPLETED:
                    finished = bool(done)
                elif return_when == FIRST_EXCEPTION:
                    # stop early if anything failed
                    finished = (not pending) or any(not ar._success for ar in done)
                else:
                    raise ValueError(f"Unrecognized return_when={return_when!r}")

        # final refresh before closing the bar
        progress_bar.update(self.progress - progress_bar.n)
        progress_bar.close()
886
887 def _republish_displaypub(self, content, eid):
888 """republish individual displaypub content dicts"""
889 ip = get_ipython()
890 if ip is None:
891 # displaypub is meaningless outside IPython
892 return
893 md = content['metadata'] or {}
894 md['engine'] = eid
895 ip.display_pub.publish(data=content['data'], metadata=md)
896
897 def _display_stream(self, text, prefix='', file=None):
898 """Redisplay a stream"""
899 if not text:
900 # nothing to display
901 return
902 if file is None:
903 file = sys.stdout
904
905 end = ""
906 if prefix:
907 if prefix == self._last_display_prefix:
908 # same prefix, no need to re-display
909 prefix = ""
910 else:
911 self._last_display_prefix = prefix
912
913 if prefix and not self._stream_trailing_newline:
914 # prefix changed, no trailing newline; insert newline
915 pre = "\n"
916 else:
917 pre = ""
918
919 if prefix:
920 sep = "\n"
921 else:
922 sep = ""
923
924 self._stream_trailing_newline = text.endswith("\n")
925 print(f"{pre}{prefix}{sep}{text}", file=file, end="")
926
927 def _display_single_result(self, result_only=False):
928 if not result_only:
929 self._display_stream(self.stdout)
930 self._display_stream(self.stderr, file=sys.stderr)
931 if get_ipython() is None:
932 # displaypub is meaningless outside IPython
933 return
934
935 if not result_only:
936 for output in self.outputs:
937 self._republish_displaypub(output, self.engine_id)
938
939 if self.execute_result is not None:
940 display(self.get())
941
942 @check_ready
943 def display_outputs(self, groupby="type", result_only=False):
944 """republish the outputs of the computation
945
946 Parameters
947 ----------
948 groupby : str [default: type]
949 if 'type':
950 Group outputs by type (show all stdout, then all stderr, etc.):
951
952 [stdout:1] foo
953 [stdout:2] foo
954 [stderr:1] bar
955 [stderr:2] bar
956 if 'engine':
957 Display outputs for each engine before moving on to the next:
958
959 [stdout:1] foo
960 [stderr:1] bar
961 [stdout:2] foo
962 [stderr:2] bar
963
964 if 'order':
965 Like 'type', but further collate individual displaypub
966 outputs. This is meant for cases of each command producing
967 several plots, and you would like to see all of the first
968 plots together, then all of the second plots, and so on.
969
970 result_only: boolean [default: False]
971 Only display the execution result and skip stdout, stderr and
972 display-outputs. Usually used when using streaming output
973 since these outputs would have already been displayed.
974 """
975 self.wait_for_output()
976 if self._single_result:
977 self._display_single_result(result_only=result_only)
978 return
979
980 stdouts = self.stdout
981 stderrs = self.stderr
982 execute_results = self.execute_result
983 output_lists = self.outputs
984 results = self.get(return_exceptions=True)
985
986 targets = self.engine_id
987
988 if groupby == "engine":
989 for eid, stdout, stderr, outputs, r, execute_result in zip(
990 targets, stdouts, stderrs, output_lists, results, execute_results
991 ):
992 if not result_only:
993 self._display_stream(stdout, f'[stdout:{eid}] ')
994 self._display_stream(stderr, f'[stderr:{eid}] ', file=sys.stderr)
995
996 if get_ipython() is None:
997 # displaypub is meaningless outside IPython
998 continue
999
1000 if (outputs and not result_only) or execute_result is not None:
1001 _raw_text(f'[output:{eid}]')
1002
1003 if not result_only:
1004 for output in outputs:
1005 self._republish_displaypub(output, eid)
1006
1007 if execute_result is not None:
1008 display(r)
1009
1010 elif groupby in ('type', 'order'):
1011 if not result_only:
1012 # republish stdout:
1013 for eid, stdout in zip(targets, stdouts):
1014 self._display_stream(stdout, f'[stdout:{eid}] ')
1015
1016 # republish stderr:
1017 for eid, stderr in zip(targets, stderrs):
1018 self._display_stream(stderr, f'[stderr:{eid}] ', file=sys.stderr)
1019
1020 if get_ipython() is None:
1021 # displaypub is meaningless outside IPython
1022 return
1023
1024 if not result_only:
1025 if groupby == 'order':
1026 output_dict = {
1027 eid: outputs for eid, outputs in zip(targets, output_lists)
1028 }
1029 N = max(len(outputs) for outputs in output_lists)
1030 for i in range(N):
1031 for eid in targets:
1032 outputs = output_dict[eid]
1033 if len(outputs) >= N:
1034 _raw_text(f'[output:{eid}]')
1035 self._republish_displaypub(outputs[i], eid)
1036 else:
1037 # republish displaypub output
1038 for eid, outputs in zip(targets, output_lists):
1039 if outputs:
1040 _raw_text(f'[output:{eid}]')
1041 for output in outputs:
1042 self._republish_displaypub(output, eid)
1043
1044 # finally, add execute_result:
1045 for eid, r, execute_result in zip(targets, results, execute_results):
1046 if execute_result is not None:
1047 display(r)
1048
1049 else:
1050 raise ValueError(
1051 f"groupby must be one of 'type', 'engine', 'collate', not {groupby!r}"
1052 )
1053
1054
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking maps.

    AsyncMapResult.get() will properly reconstruct gathers into single object.

    AsyncMapResult is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.
    """

    def __init__(
        self,
        client,
        children,
        mapObject,
        fname='',
        ordered=True,
        return_exceptions=False,
        chunk_sizes=None,
    ):
        # the Map object knows how to re-join partitioned results
        self._mapObject = mapObject
        self.ordered = ordered
        AsyncResult.__init__(
            self,
            client,
            children,
            fname=fname,
            return_exceptions=return_exceptions,
            chunk_sizes=chunk_sizes,
        )
        # a map is never a single result, even with only one message
        self._single_result = False

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        if self._return_exceptions:
            if any(isinstance(r, Exception) for r in res):
                # running with _return_exceptions,
                # cannot reconstruct original
                # use simple chain iterable
                flattened = []
                for r in res:
                    if isinstance(r, Exception):
                        flattened.append(r)
                    else:
                        flattened.extend(r)
                return flattened
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        # dispatch on submission-order vs arrival-order iteration
        it = self._ordered_iter if self.ordered else self._unordered_iter
        yield from it()

    def _yield_child_results(self, child):
        """Yield results from a child

        for use in iterator methods
        """
        rlist = child.result()
        if not isinstance(rlist, list):
            rlist = [rlist]
        # raises (wrapped) on errors unless return_exceptions is set
        self._collect_exceptions(rlist)
        yield from rlist

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except TimeoutError:
            # wait for each result individually
            evt = Event()
            for child in self._children:
                self._wait_for_child(child, evt=evt)
                yield from self._yield_child_results(child)
        else:
            # already done
            yield from rlist

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except TimeoutError:
            pending = self._children
            while pending:
                # block until at least one child finishes
                done, pending = concurrent.futures.wait(
                    pending, return_when=FIRST_COMPLETED
                )
                for child in done:
                    yield from self._yield_child_results(child)
        else:
            # already done
            yield from rlist
1151
1152
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _init_futures(self):
        """disable Future-based resolution of Hub results"""
        pass

    def wait(self, timeout=-1, return_when=None):
        """wait for result to complete."""
        start = time.perf_counter()
        # negative timeout means wait forever
        if timeout and timeout < 0:
            timeout = None
        if self._ready:
            return True
        # first wait on messages still outstanding with this local client
        local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            # anything not in the local result cache must be fetched from the Hub
            remote_ids = [m for m in self.msg_ids if m not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                # poll the Hub at 0.1s intervals until done or timeout
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (
                    timeout is None or time.perf_counter() < start + timeout
                ):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            # Hub results arrive with output already complete
            self._output_ready = True
            try:
                results = list(map(self._client.results.get, self.msg_ids))
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception) and not self._return_exceptions:
                        raise r
                else:
                    results = self._collect_exceptions(results)
                self._success = True
                self.set_result(self._reconstruct_result(results))
            except Exception as e:
                self._success = False
                self.set_exception(e)
            finally:
                if self.owner:
                    # NOTE(review): pop() without a default will raise KeyError
                    # if an entry is already gone — confirm this is intended
                    [self._client.metadata.pop(mid) for mid in self.msg_ids]
                    [self._client.results.pop(mid) for mid in self.msg_ids]

        return self._ready
1210
1211
# explicit public API of this module
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']