Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/client/asyncresult.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

617 statements  

1"""AsyncResult objects for the client""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import concurrent.futures 

6import sys 

7import threading 

8import time 

9import warnings 

10from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION, Future 

11from contextlib import contextmanager 

12from datetime import datetime 

13from functools import lru_cache, partial 

14from itertools import chain, repeat 

15from threading import Event 

16 

17import zmq 

18from decorator import decorator 

19from IPython import get_ipython 

20from IPython.display import display, display_pretty, publish_display_data 

21 

22from ipyparallel import error 

23from ipyparallel.util import _parse_date, compare_datetimes, progress, utcnow 

24 

25from .futures import MessageFuture, multi_future 

26 

27 

def _raw_text(s):
    """Display `s` as raw (unformatted) text via IPython's display machinery."""
    display_pretty(s, raw=True)

30 

31 

# sentinel for cache lookups, distinct from None (None is a valid result value)
_default = object()

# global empty tracker that's always done:
finished_tracker = zmq.MessageTracker()

36 

37 

@decorator
def check_ready(f, self, *args, **kwargs):
    """Check ready state prior to calling the method.

    Polls once (non-blocking) and raises TimeoutError if the
    wrapped AsyncResult has not resolved yet.
    """
    self.wait(0)
    if not self._ready:
        raise TimeoutError("result not ready")
    return f(self, *args, **kwargs)

45 

46 

# shared cache of Metadata keys, populated lazily by AsyncResult.__dir__
_metadata_keys = []
# threading.TIMEOUT_MAX new in 3.2
_FOREVER = getattr(threading, 'TIMEOUT_MAX', int(1e6))

50 

51 

class AsyncResult(Future):
    """Class for representing results of non-blocking calls.

    Extends the interfaces of :py:class:`multiprocessing.pool.AsyncResult`
    and :py:class:`concurrent.futures.Future`.
    """

    # class-level defaults; real values are assigned in __init__
    msg_ids = None  # list of message ids covered by this result
    _targets = None  # engine ids the messages were sent to
    _tracker = None  # zmq.MessageTracker for outgoing sends
    _single_result = False  # True when wrapping a single (non-list) message
    owner = False  # if True, pop cached results/metadata from the client when done
    _last_display_prefix = ""  # _display_stream state: last prefix printed
    _stream_trailing_newline = True  # _display_stream state: did last chunk end with \n
    _chunk_sizes = None  # msg_id -> granular task count (used by __len__/progress)

67 

    def __init__(
        self,
        client,
        children,
        fname='unknown',
        targets=None,
        owner=False,
        return_exceptions=False,
        chunk_sizes=None,
    ):
        """Construct an AsyncResult.

        Parameters
        ----------
        client : Client
            The client that submitted the request(s).
        children : MessageFuture | str | list of either
            The message futures (or bare msg_ids) wrapped by this result.
            A non-list value marks this as a single-result AsyncResult.
        fname : str
            Name of the requested function, for display purposes.
        targets : list of int, optional
            Engine ids the messages were sent to.
        owner : bool
            If True, pop cached results/metadata from the client when done.
        return_exceptions : bool
            If True, `get()` returns exceptions instead of raising them.
        chunk_sizes : dict, optional
            Map of msg_id -> number of granular tasks (used by len/progress).
        """
        super().__init__()
        if not isinstance(children, list):
            children = [children]
            self._single_result = True
        else:
            self._single_result = False

        self._return_exceptions = return_exceptions
        self._chunk_sizes = chunk_sizes or {}

        # children are either bare msg_ids (str) or MessageFutures
        if isinstance(children[0], str):
            self.msg_ids = children
            self._children = []
        else:
            self._children = children
            self.msg_ids = [f.msg_id for f in children]

        self._client = client
        self._fname = fname
        self._targets = targets
        self.owner = owner

        self._ready = False
        self._ready_event = Event()
        self._output_ready = False
        self._output_event = Event()
        self._sent_event = Event()
        self._success = None
        if self._children:
            self._metadata = [f.output.metadata for f in self._children]
        else:
            self._metadata = [self._client.metadata[id] for id in self.msg_ids]
        self._init_futures()

111 

112 def _init_futures(self): 

113 """Build futures for results and output; hook up callbacks""" 

114 if not self._children: 

115 for msg_id in self.msg_ids: 

116 future = self._client._futures.get(msg_id, None) 

117 if not future: 

118 result = self._client.results.get(msg_id, _default) 

119 # result resides in local cache, construct already-resolved Future 

120 if result is not _default: 

121 future = MessageFuture(msg_id) 

122 future.output = Future() 

123 future.output.metadata = self.client.metadata[msg_id] 

124 future.set_result(result) 

125 future.output.set_result(None) 

126 if not future: 

127 raise KeyError(f"No Future or result for msg_id: {msg_id}") 

128 self._children.append(future) 

129 

130 self._result_future = multi_future(self._children) 

131 

132 self._sent_future = multi_future([f.tracker for f in self._children]) 

133 self._sent_future.add_done_callback(self._handle_sent) 

134 

135 self._output_future = multi_future( 

136 [self._result_future] + [f.output for f in self._children] 

137 ) 

138 # on completion of my constituents, trigger my own resolution 

139 self._result_future.add_done_callback(self._resolve_result) 

140 self._output_future.add_done_callback(self._resolve_output) 

141 self.add_done_callback(self._finalize_result) 

142 

    def _iopub_streaming_output_callback(self, eid, msg_future, msg):
        """Callback for iopub messages registered during AsyncResult.stream_output()"""
        msg_type = msg['header']['msg_type']
        ip = get_ipython()
        if ip is not None:
            # inside a kernel we can use display ids for in-place updates
            in_kernel = getattr(ip, 'kernel', None) is not None
        else:
            in_kernel = False

        if msg_type == 'stream':
            msg_content = msg['content']
            stream_name = msg_content['name']

            if in_kernel:
                # one display per (parent message, stream); update in place
                # with the full accumulated stream text on subsequent chunks
                parent_msg_id = msg.get('parent_header', {}).get('msg_id', '')
                display_id = f"{parent_msg_id}-{stream_name}"
                md = msg_future.output.metadata
                full_stream = md[stream_name]
                if display_id in self._already_streamed:
                    update = True
                else:
                    self._already_streamed[display_id] = True
                    update = False
                publish_display_data(
                    {
                        "text/plain": f"[{stream_name}:{eid}] " + full_stream,
                    },
                    transient={"display_id": display_id},
                    update=update,
                )
                return
            else:
                # terminal: write the chunk to the matching local stream
                stream = getattr(sys, stream_name, sys.stdout)
                self._display_stream(
                    msg_content['text'],
                    f'[{stream_name}:{eid}] ',
                    file=stream,
                )
        elif msg_type == "error":
            content = msg['content']
            if 'engine_info' not in content:
                # fill in engine info from our metadata if the message lacks it
                content['engine_info'] = {
                    "engine_id": msg_future.output.metadata.engine_id,
                    "engine_uuid": msg_future.output.metadata.engine_uuid,
                    # always execute?
                    "method": msg_future.header["msg_type"].partition("_")[0],
                }

            err = self._client._unwrap_exception(msg['content'])
            self._streamed_errors += 1
            if self._streamed_errors <= error.CompositeError.tb_limit:
                print("\n".join(err.render_traceback()), file=sys.stderr)
            else:
                # single-line error after we hit the limit
                print(err, file=sys.stderr)
        elif msg_type == "execute_result":
            # mock ExecuteReply from execute_result on iopub
            from .client import ExecuteReply

            er = ExecuteReply(
                msg_id=msg_future.msg_id,
                content=msg['content'],
                metadata=msg_future.output.metadata,
            )
            display(er)

        if ip is None:
            # display_data republishing requires IPython
            return

        if msg_type == 'display_data':
            msg_content = msg['content']
            _raw_text(f'[output:{eid}]')
            self._republish_displaypub(msg_content, eid)

216 

    @contextmanager
    def stream_output(self):
        """Stream output for this result as it arrives.

        Returns a context manager, during which output is streamed.
        Already-captured output is replayed immediately on entry as
        synthetic iopub messages; callbacks are removed on exit.
        """

        # Keep a handle on the futures so we can remove the callback later
        future_callbacks = {}
        self._already_streamed = {}
        self._stream_trailing_newline = True
        self._last_display_prefix = ""
        self._streamed_errors = 0

        for eid, msg_future in zip(self._targets, self._children):
            iopub_callback = partial(
                self._iopub_streaming_output_callback, eid, msg_future
            )
            future_callbacks[msg_future] = iopub_callback
            md = msg_future.output.metadata

            msg_future.iopub_callbacks.append(iopub_callback)
            # FIXME: there's still a race here
            # registering before publishing means possible duplicates,
            # while after means lost output

            # publish already-captured output immediately
            # (synthetic messages mimic the iopub wire format)
            for name in ("stdout", "stderr"):
                text = md[name]
                if text:
                    iopub_callback(
                        {
                            "header": {"msg_type": "stream"},
                            "content": {"name": name, "text": text},
                        }
                    )
            for output in md["outputs"]:
                iopub_callback(
                    {
                        "header": {"msg_type": "display_data"},
                        "content": output,
                    }
                )
            if md["execute_result"]:
                iopub_callback(
                    {
                        "header": {"msg_type": "execute_result"},
                        "content": md["execute_result"],
                    }
                )

        try:
            yield
        finally:
            # clear stream cache
            self._already_streamed = {}

            # Remove the callbacks
            for msg_future, iopub_callback in future_callbacks.items():
                msg_future.iopub_callbacks.remove(iopub_callback)

277 

278 def __repr__(self): 

279 if self._ready: 

280 if self._success: 

281 state = "finished" 

282 else: 

283 state = "failed" 

284 else: 

285 state = "pending" 

286 return f"<{self.__class__.__name__}({self._fname}): {state}>" 

287 

288 def __dir__(self): 

289 keys = dir(self.__class__) 

290 if not _metadata_keys: 

291 from .client import Metadata 

292 

293 _metadata_keys.extend(Metadata().keys()) 

294 keys.extend(_metadata_keys) 

295 return keys 

296 

297 def _reconstruct_result(self, res): 

298 """Reconstruct our result from actual result list (always a list) 

299 

300 Override me in subclasses for turning a list of results 

301 into the expected form. 

302 """ 

303 if self._single_result: 

304 return res[0] 

305 else: 

306 return res 

307 

    def get(self, timeout=None, return_exceptions=None, return_when=None):
        """Return the result when it arrives.

        Arguments:

        timeout : int [default None]
            If `timeout` is not ``None`` and the result does not arrive within
            `timeout` seconds then ``TimeoutError`` is raised. If the
            remote call raised an exception then that exception will be reraised
            by get() inside a `RemoteError`.
        return_exceptions : bool [default: the value given at construction]
            If True, return Exceptions instead of raising them.
        return_when : None, ALL_COMPLETED, or FIRST_EXCEPTION
            FIRST_COMPLETED is not supported, and treated the same as ALL_COMPLETED.
            See :py:func:`concurrent.futures.wait` for documentation.

            When return_when=FIRST_EXCEPTION, will raise immediately on the first exception,
            rather than waiting for all results to finish before reporting errors.

        .. versionchanged:: 8.0
            Added `return_when` argument.
        """
        if return_when == FIRST_COMPLETED:
            # FIRST_COMPLETED unsupported, same as ALL_COMPLETED
            warnings.warn(
                "Ignoring unsupported AsyncResult.get(return_when=FIRST_COMPLETED)",
                UserWarning,
                stacklevel=2,
            )
            return_when = None
        elif return_when == ALL_COMPLETED:
            # None avoids call to .split() and is a tiny bit more efficient
            return_when = None

        if not self.ready():
            wait_result = self.wait(timeout, return_when=return_when)

        if return_exceptions is None:
            # default to attribute, if AsyncResult was created with return_exceptions=True
            return_exceptions = self._return_exceptions

        if self._ready:
            if self._success:
                return self.result()
            else:
                e = self.exception()
                if return_exceptions:
                    # hand back the raw per-task results, exceptions included
                    return self._reconstruct_result(self._raw_results)
                else:
                    raise e
        else:
            if return_when == FIRST_EXCEPTION:
                # this should only occur if there was an exception
                # any other situation should have triggered the ready branch above

                done, pending = wait_result
                for ar in done:
                    if not ar._success:
                        return ar.get(return_exceptions=return_exceptions)
            raise TimeoutError("Result not ready.")

368 

369 def _check_ready(self): 

370 if not self.ready(): 

371 raise TimeoutError("Result not ready.") 

372 

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            # non-blocking poll; wait(0) also syncs output state
            self.wait(0)

        return self._ready

379 

    def wait_for_output(self, timeout=-1):
        """Wait for our output to be complete.

        AsyncResult.wait only waits for the result,
        which may arrive before output is complete.

        Returns True if output is ready, False on timeout.
        """
        if self._output_ready:
            return True
        # negative timeout means wait forever
        if timeout and timeout < 0:
            timeout = None
        return self._output_event.wait(timeout)

391 

392 def _resolve_output(self, f=None): 

393 """Callback that fires when outputs are ready""" 

394 if self.owner: 

395 [self._client.metadata.pop(mid, None) for mid in self.msg_ids] 

396 self._output_ready = True 

397 self._output_event.set() 

398 

    @classmethod
    def join(cls, *async_results):
        """Join multiple AsyncResults into one

        Inverse of .split(),
        used for rejoining split results in wait.

        .. versionadded:: 8.0
        """
        if not async_results:
            raise ValueError("Must specify at least one AsyncResult to join")
        first = async_results[0]
        if len(async_results) == 1:
            # only one AsyncResult, nothing to join
            return first

        # NOTE(review): assumes every input has _children and _targets
        # populated (true for results produced by .split())
        return cls(
            client=first._client,
            fname=first._fname,
            return_exceptions=first._return_exceptions,
            children=list(chain(*(ar._children for ar in async_results))),
            targets=list(chain(*(ar._targets for ar in async_results))),
            owner=False,
        )

423 

    @lru_cache
    def split(self):
        """Split an AsyncResult

        An AsyncResult object that represents multiple messages
        can be split to wait for individual results
        This can be passed to `concurrent.futures.wait` and friends
        to get partial results.

        .. versionadded:: 8.0
        """
        # NOTE(review): lru_cache on a method keeps `self` alive for the
        # cache's lifetime; accepted here so repeated calls return the
        # same child AsyncResult objects.
        if len(self._children) == 1:
            # nothing to do if we're already representing a single message
            return (self,)
        # children now share cache entries, so we must not purge them
        self.owner = False

        if self._targets is None:
            _targets = repeat(None)
        else:
            _targets = self._targets

        # AsyncMapResult children wrap a [list] to preserve map semantics
        flatten = not isinstance(self, AsyncMapResult)
        return tuple(
            AsyncResult(
                client=self._client,
                children=msg_future if flatten else [msg_future],
                targets=[engine_id],
                fname=self._fname,
                owner=False,
                return_exceptions=self._return_exceptions,
            )
            for engine_id, msg_future in zip(_targets, self._children)
        )

457 

    def wait(self, timeout=-1, return_when=None):
        """Wait until the result is available or until `timeout` seconds pass.

        Arguments:

        timeout (int):
            The timeout in seconds. `-1` or None indicate an infinite timeout.
        return_when (enum):
            None, ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION.
            Passed to :py:func:`concurrent.futures.wait`.
            If specified and not None, the `(done, pending)` pair described
            below is returned instead of a bool.

        Returns:
            ready (bool):
                For backward-compatibility.
                If `return_when` is None or unspecified,
                returns True if all tasks are done, False otherwise

            (done, pending):
                If `return_when` is any of the constants for :py:func:`concurrent.futures.wait`,
                will return two sets of AsyncResult objects
                representing the completed and still-pending subsets of results,
                matching the return value of `wait` itself.

        .. versionchanged:: 8.0
            Added `return_when`.
        """
        # normalize negative timeout to "wait forever"
        if timeout and timeout < 0:
            timeout = None
        if return_when is None:
            if self._ready:
                return True
            self._ready_event.wait(timeout)
            # opportunistically sync output state that arrived with the result
            self.wait_for_output(0)
            return self._ready
        else:
            # wait on the per-message child AsyncResults so partial
            # completion is observable
            futures = self.split()
            done, pending = concurrent.futures.wait(
                futures, timeout=timeout, return_when=return_when
            )
            if done:
                self.wait_for_output(0)

            return done, pending

512 

    def _resolve_result(self, f=None):
        """Callback fired when all constituent result futures are done.

        Collects the raw results, applies exception collection, and
        resolves this Future with either a result or an exception.
        """
        if self.done():
            # already resolved; nothing to do
            return
        if f:
            results = f.result()
        else:
            results = list(map(self._client.results.get, self.msg_ids))

        # store raw results
        self._raw_results = results

        try:
            if self._single_result:
                r = results[0]
                if isinstance(r, Exception):
                    raise r
            else:
                results = self._collect_exceptions(results)
        except Exception as e:
            self._success = False
            self.set_exception(e)
        else:
            self._success = True
            self.set_result(self._reconstruct_result(results))

537 

538 def _collect_exceptions(self, results): 

539 """Wrap Exceptions in a CompositeError 

540 

541 if self._return_exceptions is True, this is a no-op 

542 """ 

543 if self._return_exceptions: 

544 return results 

545 else: 

546 return error.collect_exceptions(results, self._fname) 

547 

548 def _finalize_result(self, f): 

549 if self.owner: 

550 [self._client.results.pop(mid, None) for mid in self.msg_ids] 

551 self._ready = True 

552 self._ready_event.set() 

553 

554 def successful(self): 

555 """Return whether the call completed without raising an exception. 

556 

557 Will raise ``RuntimeError`` if the result is not ready. 

558 """ 

559 if not self.ready(): 

560 raise RuntimeError("Cannot check successful() if not done.") 

561 return self._success 

562 

563 # ---------------------------------------------------------------- 

564 # Extra methods not in mp.pool.AsyncResult 

565 # ---------------------------------------------------------------- 

566 

567 def get_dict(self, timeout=-1): 

568 """Get the results as a dict, keyed by engine_id. 

569 

570 timeout behavior is described in `get()`. 

571 """ 

572 

573 results = self.get(timeout) 

574 if self._single_result: 

575 results = [results] 

576 engine_ids = [md['engine_id'] for md in self._metadata] 

577 

578 rdict = {} 

579 for engine_id, result in zip(engine_ids, results): 

580 if engine_id in rdict: 

581 n_jobs = engine_ids.count(engine_id) 

582 raise ValueError( 

583 f"Cannot build dict, {n_jobs} jobs ran on engine #{engine_id}" 

584 ) 

585 else: 

586 rdict[engine_id] = result 

587 

588 return rdict 

589 

    @property
    def r(self):
        """result property wrapper for `get(timeout=-1)`."""
        # get() defaults to timeout=None, which also blocks indefinitely
        return self.get()

594 

    # metadata keys holding timestamps, parsed lazily by _parse_metadata_dates
    _DATE_FIELDS = [
        "submitted",
        "started",
        "completed",
        "received",
    ]

601 

602 def _parse_metadata_dates(self): 

603 """Ensure metadata date fields are parsed on access 

604 

605 Rather than parsing timestamps from str->dt on receipt, 

606 parse on access for compatibility. 

607 """ 

608 for md in self._metadata: 

609 for key in self._DATE_FIELDS: 

610 if isinstance(md.get(key, None), str): 

611 md[key] = _parse_date(md[key]) 

612 

613 @property 

614 def metadata(self): 

615 """property for accessing execution metadata.""" 

616 self._parse_metadata_dates() 

617 if self._single_result: 

618 return self._metadata[0] 

619 else: 

620 return self._metadata 

621 

    @property
    def result_dict(self):
        """result property as a dict, keyed by engine_id (blocks until done)."""
        return self.get_dict()

626 

    def __dict__(self):
        # NOTE(review): defining __dict__ as a method is unusual; this shadows
        # the instance-dict attribute protocol. Raises TimeoutError if not done.
        return self.get_dict(0)

629 

    def abort(self):
        """
        Abort my tasks, if possible.

        Only tasks that have not started yet can be aborted.

        Raises RuntimeError if already done.
        """
        if self.ready():
            raise RuntimeError("Can't abort, I am already done!")
        # delegate to the client; blocks until the abort request is confirmed
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

641 

642 def _handle_sent(self, f): 

643 """Resolve sent Future, build MessageTracker""" 

644 trackers = f.result() 

645 trackers = [t for t in trackers if t is not None] 

646 self._tracker = zmq.MessageTracker(*trackers) 

647 self._sent_event.set() 

648 

    @property
    def sent(self):
        """check whether my messages have been sent."""
        # _tracker is only assigned once _sent_event fires, so check the event first
        return self._sent_event.is_set() and self._tracker.done

653 

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        if not self._sent_event.is_set():
            if timeout and timeout < 0:
                # Event doesn't like timeout < 0
                timeout = None
            elif timeout == 0:
                raise TimeoutError("Still waiting to be sent")
            # wait for Future to indicate send having been called,
            # which means MessageTracker is ready.
            tic = time.perf_counter()
            if not self._sent_event.wait(timeout):
                raise TimeoutError("Still waiting to be sent")
            if timeout:
                # deduct time already spent waiting from the remaining budget
                timeout = max(0, timeout - (time.perf_counter() - tic))
        try:
            if timeout is None:
                # MessageTracker doesn't like timeout=None
                timeout = -1
            return self._tracker.wait(timeout)
        except zmq.NotDone:
            raise TimeoutError("Still waiting to be sent")

681 

682 # ------------------------------------- 

683 # dict-access 

684 # ------------------------------------- 

685 

    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str."""
        if isinstance(key, int):
            self._check_ready()
            return self._collect_exceptions([self.result()[key]])[0]
        elif isinstance(key, slice):
            self._check_ready()
            return self._collect_exceptions(self.result()[key])
        elif isinstance(key, str):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            self.wait_for_output(0)
            self._parse_metadata_dates()
            values = [md[key] for md in self._metadata]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError(
                f"Invalid key type {type(key)!r}, must be 'int','slice', or 'str'"
            )

708 

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata.

        Only invoked for attributes not found through normal lookup.
        """
        try:
            return self.__getitem__(key)
        except (TimeoutError, KeyError):
            raise AttributeError(
                f"{self.__class__.__name__!r} object has no attribute {key!r}"
            )

717 

    @staticmethod
    def _wait_for_child(child, evt, timeout=_FOREVER):
        """Wait for a child to be done

        `evt` is a caller-supplied Event, reused across calls by __iter__.
        """
        if child.done():
            return
        evt.clear()
        child.add_done_callback(lambda f: evt.set())
        evt.wait(timeout)

726 

    # asynchronous iterator:
    def __iter__(self):
        """Yield results as they arrive (multi-result AsyncResults only)."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            # fast path: everything already finished
            rlist = self.get(0)
        except TimeoutError:
            # wait for each result individually
            evt = Event()
            for child in self._children:
                self._wait_for_child(child, evt=evt)
                result = child.result()
                self._collect_exceptions([result])
                yield result
        else:
            # already done
            yield from rlist

744 

    @lru_cache
    def __len__(self):
        # NOTE(review): lru_cache on a method keeps `self` alive for the
        # cache's lifetime; the length is assumed stable once computed.
        return self._count_chunks(*self.msg_ids)

748 

    @lru_cache
    def _count_chunks(self, *msg_ids):
        """Count the granular tasks"""
        # setdefault: msg_ids without a recorded chunk size count as one chunk
        return sum(self._chunk_sizes.setdefault(msg_id, 1) for msg_id in msg_ids)

753 

754 # ------------------------------------- 

755 # Sugar methods and attributes 

756 # ------------------------------------- 

757 

    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different `start_key` or `end_key` function.

        Parameters
        ----------
        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [default: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------
        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return compare_datetimes(end, start).total_seconds()

790 

791 @property 

792 def progress(self): 

793 """the number of tasks which have been completed at this point. 

794 

795 Fractional progress would be given by 1.0 * ar.progress / len(ar) 

796 """ 

797 self.wait(0) 

798 finished_msg_ids = set(self.msg_ids).intersection(self._client.outstanding) 

799 finished_count = self._count_chunks(*finished_msg_ids) 

800 return len(self) - finished_count 

801 

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            # all done: elapsed time equals total wall time
            return self.wall_time

        now = submitted = utcnow()
        self._parse_metadata_dates()
        # find the earliest submission timestamp across all tasks
        for md in self._metadata:
            stamp = md["submitted"]
            if stamp and stamp < submitted:
                submitted = stamp
        return compare_datetimes(now, submitted).total_seconds()

815 

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        self._parse_metadata_dates()
        for md in self._metadata:
            t += compare_datetimes(md['completed'], md['started']).total_seconds()
        return t

828 

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
        """
        return self.timedelta(self.submitted, self.received)

840 

    def wait_interactive(
        self, interval=0.1, timeout=-1, widget=None, return_when=ALL_COMPLETED
    ):
        """interactive wait, printing progress at regular intervals.

        Parameters
        ----------
        interval : float
            Interval on which to update progress display.
        timeout : float
            Time (in seconds) to wait before raising a TimeoutError.
            -1 (default) means no timeout.
        widget : bool
            default: True if in an IPython kernel (notebook), False otherwise.
            Override default context-detection behavior for whether a widget-based progress bar
            should be used.
        return_when : concurrent.futures.ALL_COMPLETED | FIRST_EXCEPTION | FIRST_COMPLETED
        """
        if timeout and timeout < 0:
            timeout = None
        if return_when == ALL_COMPLETED:
            # None selects the simpler bool-returning wait() path
            return_when = None
        N = len(self)
        tic = time.perf_counter()
        progress_bar = progress(widget=widget, total=N, unit='tasks', desc=self._fname)

        finished = self.ready()
        while not finished and (
            timeout is None or time.perf_counter() - tic <= timeout
        ):
            # wake up every `interval` seconds to refresh the progress bar
            wait_result = self.wait(interval, return_when=return_when)
            progress_bar.update(self.progress - progress_bar.n)
            if return_when is None:
                finished = wait_result
            else:
                done, pending = wait_result
                if return_when == FIRST_COMPLETED:
                    finished = bool(done)
                elif return_when == FIRST_EXCEPTION:
                    finished = (not pending) or any(not ar._success for ar in done)
                else:
                    raise ValueError(f"Unrecognized return_when={return_when!r}")

        # final refresh before closing the bar
        progress_bar.update(self.progress - progress_bar.n)
        progress_bar.close()

886 

887 def _republish_displaypub(self, content, eid): 

888 """republish individual displaypub content dicts""" 

889 ip = get_ipython() 

890 if ip is None: 

891 # displaypub is meaningless outside IPython 

892 return 

893 md = content['metadata'] or {} 

894 md['engine'] = eid 

895 ip.display_pub.publish(data=content['data'], metadata=md) 

896 

    def _display_stream(self, text, prefix='', file=None):
        """Redisplay a stream

        Tracks the last-printed prefix and trailing-newline state across
        calls so interleaved output from multiple engines stays readable.
        """
        if not text:
            # nothing to display
            return
        if file is None:
            file = sys.stdout

        end = ""  # NOTE(review): unused local, kept for byte-compatibility
        if prefix:
            if prefix == self._last_display_prefix:
                # same prefix, no need to re-display
                prefix = ""
            else:
                self._last_display_prefix = prefix

        if prefix and not self._stream_trailing_newline:
            # prefix changed, no trailing newline; insert newline
            pre = "\n"
        else:
            pre = ""

        if prefix:
            sep = "\n"
        else:
            sep = ""

        self._stream_trailing_newline = text.endswith("\n")
        print(f"{pre}{prefix}{sep}{text}", file=file, end="")

926 

    def _display_single_result(self, result_only=False):
        """Redisplay streams, display outputs, and result for a single task."""
        if not result_only:
            self._display_stream(self.stdout)
            self._display_stream(self.stderr, file=sys.stderr)
        if get_ipython() is None:
            # displaypub is meaningless outside IPython
            return

        if not result_only:
            for output in self.outputs:
                self._republish_displaypub(output, self.engine_id)

        if self.execute_result is not None:
            display(self.get())

941 

942 @check_ready 

943 def display_outputs(self, groupby="type", result_only=False): 

944 """republish the outputs of the computation 

945 

946 Parameters 

947 ---------- 

948 groupby : str [default: type] 

949 if 'type': 

950 Group outputs by type (show all stdout, then all stderr, etc.): 

951 

952 [stdout:1] foo 

953 [stdout:2] foo 

954 [stderr:1] bar 

955 [stderr:2] bar 

956 if 'engine': 

957 Display outputs for each engine before moving on to the next: 

958 

959 [stdout:1] foo 

960 [stderr:1] bar 

961 [stdout:2] foo 

962 [stderr:2] bar 

963 

964 if 'order': 

965 Like 'type', but further collate individual displaypub 

966 outputs. This is meant for cases of each command producing 

967 several plots, and you would like to see all of the first 

968 plots together, then all of the second plots, and so on. 

969 

970 result_only: boolean [default: False] 

971 Only display the execution result and skip stdout, stderr and 

972 display-outputs. Usually used when using streaming output 

973 since these outputs would have already been displayed. 

974 """ 

975 self.wait_for_output() 

976 if self._single_result: 

977 self._display_single_result(result_only=result_only) 

978 return 

979 

980 stdouts = self.stdout 

981 stderrs = self.stderr 

982 execute_results = self.execute_result 

983 output_lists = self.outputs 

984 results = self.get(return_exceptions=True) 

985 

986 targets = self.engine_id 

987 

988 if groupby == "engine": 

989 for eid, stdout, stderr, outputs, r, execute_result in zip( 

990 targets, stdouts, stderrs, output_lists, results, execute_results 

991 ): 

992 if not result_only: 

993 self._display_stream(stdout, f'[stdout:{eid}] ') 

994 self._display_stream(stderr, f'[stderr:{eid}] ', file=sys.stderr) 

995 

996 if get_ipython() is None: 

997 # displaypub is meaningless outside IPython 

998 continue 

999 

1000 if (outputs and not result_only) or execute_result is not None: 

1001 _raw_text(f'[output:{eid}]') 

1002 

1003 if not result_only: 

1004 for output in outputs: 

1005 self._republish_displaypub(output, eid) 

1006 

1007 if execute_result is not None: 

1008 display(r) 

1009 

1010 elif groupby in ('type', 'order'): 

1011 if not result_only: 

1012 # republish stdout: 

1013 for eid, stdout in zip(targets, stdouts): 

1014 self._display_stream(stdout, f'[stdout:{eid}] ') 

1015 

1016 # republish stderr: 

1017 for eid, stderr in zip(targets, stderrs): 

1018 self._display_stream(stderr, f'[stderr:{eid}] ', file=sys.stderr) 

1019 

1020 if get_ipython() is None: 

1021 # displaypub is meaningless outside IPython 

1022 return 

1023 

1024 if not result_only: 

1025 if groupby == 'order': 

1026 output_dict = { 

1027 eid: outputs for eid, outputs in zip(targets, output_lists) 

1028 } 

1029 N = max(len(outputs) for outputs in output_lists) 

1030 for i in range(N): 

1031 for eid in targets: 

1032 outputs = output_dict[eid] 

1033 if len(outputs) >= N: 

1034 _raw_text(f'[output:{eid}]') 

1035 self._republish_displaypub(outputs[i], eid) 

1036 else: 

1037 # republish displaypub output 

1038 for eid, outputs in zip(targets, output_lists): 

1039 if outputs: 

1040 _raw_text(f'[output:{eid}]') 

1041 for output in outputs: 

1042 self._republish_displaypub(output, eid) 

1043 

1044 # finally, add execute_result: 

1045 for eid, r, execute_result in zip(targets, results, execute_results): 

1046 if execute_result is not None: 

1047 display(r) 

1048 

1049 else: 

1050 raise ValueError( 

1051 f"groupby must be one of 'type', 'engine', 'collate', not {groupby!r}" 

1052 ) 

1053 

1054 

class AsyncMapResult(AsyncResult):
    """Result object for non-blocking map calls.

    ``get()`` reassembles the per-engine partitions back into a single
    object via the map object supplied at submission time.

    Instances are iterable at any point, blocking on results as they
    become available.  With ``ordered=False`` results are yielded on a
    first-come-first-served basis; otherwise they are yielded in the
    order the tasks were submitted.
    """

    def __init__(
        self,
        client,
        children,
        mapObject,
        fname='',
        ordered=True,
        return_exceptions=False,
        chunk_sizes=None,
    ):
        # stash map-specific state before delegating to the base class
        self._mapObject = mapObject
        self.ordered = ordered
        super().__init__(
            client,
            children,
            fname=fname,
            return_exceptions=return_exceptions,
            chunk_sizes=chunk_sizes,
        )
        # a map always produces a collection, never a single value
        self._single_result = False

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        if self._return_exceptions and any(
            isinstance(chunk, Exception) for chunk in res
        ):
            # With return_exceptions enabled, a failed partition is an
            # Exception instance rather than a list, so the original
            # partitioning cannot be rebuilt -- produce a flat list with
            # exceptions interleaved in place of their partitions.
            flattened = []
            for chunk in res:
                if isinstance(chunk, Exception):
                    flattened.append(chunk)
                else:
                    flattened.extend(chunk)
            return flattened
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        """Iterate over results, respecting ``self.ordered``."""
        if self.ordered:
            return self._ordered_iter()
        return self._unordered_iter()

    def _yield_child_results(self, child):
        """Yield the individual results contained in one child future.

        Shared helper for the ordered and unordered iterators.
        """
        results = child.result()
        if not isinstance(results, list):
            results = [results]
        self._collect_exceptions(results)
        yield from results

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            ready = self.get(0)
        except TimeoutError:
            # not finished yet: block on each child, in submission order
            evt = Event()
            for child in self._children:
                self._wait_for_child(child, evt=evt)
                yield from self._yield_child_results(child)
        else:
            # everything already arrived
            yield from ready

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            ready = self.get(0)
        except TimeoutError:
            remaining = self._children
            while remaining:
                done, remaining = concurrent.futures.wait(
                    remaining, return_when=FIRST_COMPLETED
                )
                for child in done:
                    yield from self._yield_child_results(child)
        else:
            # everything already arrived
            yield from ready

1151 

1152 

class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _init_futures(self):
        """disable Future-based resolution of Hub results"""
        pass

    def wait(self, timeout=-1, return_when=None):
        """Wait for the result to complete.

        First waits on any locally-outstanding messages, then polls the
        Hub (every 0.1s) for the remainder until they arrive or the
        timeout expires.

        Parameters
        ----------
        timeout : float or None
            Maximum number of seconds to wait.  Negative values (the
            default) and None mean wait forever; 0 polls once.
        return_when :
            Unused; accepted for signature compatibility with
            :meth:`AsyncResult.wait`.

        Returns
        -------
        bool
            Whether the result is ready.
        """
        start = time.perf_counter()
        # normalize negative timeouts to "wait forever"
        # (0 is preserved: it means a single non-blocking poll)
        if timeout and timeout < 0:
            timeout = None
        if self._ready:
            return True
        # first, wait for messages still outstanding locally
        local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [m for m in self.msg_ids if m not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                # poll the Hub until nothing is pending or the deadline passes
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (
                    timeout is None or time.perf_counter() < start + timeout
                ):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            self._output_ready = True
            try:
                results = list(map(self._client.results.get, self.msg_ids))
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception) and not self._return_exceptions:
                        raise r
                else:
                    results = self._collect_exceptions(results)
                self._success = True
                self.set_result(self._reconstruct_result(results))
            except Exception as e:
                self._success = False
                self.set_exception(e)
            finally:
                if self.owner:
                    # drop cached metadata/results from the client when we
                    # own them (plain loops; the original used list
                    # comprehensions purely for their side effects)
                    for mid in self.msg_ids:
                        self._client.metadata.pop(mid)
                    for mid in self.msg_ids:
                        self._client.results.pop(mid)

        return self._ready

1210 

1211 

# Public API of this module.
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']