Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/client/view.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

690 statements  

1"""Views of remote engines.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import builtins 

6import concurrent.futures 

7import inspect 

8import secrets 

9import threading 

10import time 

11import warnings 

12from collections import deque 

13from contextlib import contextmanager 

14 

15from decorator import decorator 

16from IPython import get_ipython 

17from traitlets import Any, Bool, CFloat, Dict, HasTraits, Instance, Integer, List, Set 

18 

19import ipyparallel as ipp 

20from ipyparallel import util 

21from ipyparallel.controller.dependency import Dependency, dependent 

22 

23from .. import serialize 

24from ..serialize import PrePickled 

25from . import map as Map 

26from .asyncresult import AsyncMapResult, AsyncResult 

27from .remotefunction import ParallelFunction, getname, parallel, remote 

28 

29# ----------------------------------------------------------------------------- 

30# Decorators 

31# ----------------------------------------------------------------------------- 

32 

33 

@decorator
def save_ids(f, self, *args, **kwargs):
    """Keep our history and outstanding attributes up to date after a method call.

    Records any msg_ids the wrapped call appended to ``self.client.history``
    into this view's own ``history`` and ``outstanding`` sets, even when the
    call raises.
    """
    n_previous = len(self.client.history)
    try:
        ret = f(self, *args, **kwargs)
    finally:
        nmsgs = len(self.client.history) - n_previous
        # Guard nmsgs == 0: `history[-0:]` is the *entire* history, which
        # would wrongly claim every previously-submitted msg_id for this view.
        if nmsgs > 0:
            msg_ids = self.client.history[-nmsgs:]
            self.history.extend(msg_ids)
            self.outstanding.update(msg_ids)
    return ret

46 

47 

@decorator
def sync_results(f, self, *args, **kwargs):
    """sync relevant results from self.client to our results attribute."""
    # Only the outermost invocation syncs afterwards; nested calls made while
    # a sync-wrapped method is already running just execute the method.
    if not self._in_sync_results:
        self._in_sync_results = True
        try:
            result = f(self, *args, **kwargs)
        finally:
            self._in_sync_results = False
            self._sync_results()
        return result
    return f(self, *args, **kwargs)

60 

61 

62# ----------------------------------------------------------------------------- 

63# Classes 

64# ----------------------------------------------------------------------------- 

65 

66 

class View(HasTraits):
    """Base View class for more convenient apply(f, *args, **kwargs) syntax via attributes.

    Don't use this class, use subclasses.

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        get_result, queue_status, purge_results, result_status

    control methods
        abort, shutdown

    """

    # flags
    block = Bool(False)  # wait for results by default?
    track = Bool(False)  # request zmq send-tracking by default?
    targets = Any()  # engine id(s) this view addresses

    history = List()  # msg_ids submitted through this view
    outstanding = Set()  # msg_ids submitted here and not yet complete
    results = Dict()  # shared with self.client.results (see __init__)
    client = Instance('ipyparallel.Client', allow_none=True)

    _socket = Any()  # zmq socket requests are sent on
    _flag_names = List(['targets', 'block', 'track'])  # names set_flags/temp_flags accept
    _in_sync_results = Bool(False)  # re-entrancy guard used by @sync_results
    _targets = Any()
    _idents = Any()

    def __init__(self, client=None, socket=None, **flags):
        # Share the client's results cache and blocking default, then apply
        # any per-view flag overrides (targets/block/track).
        super().__init__(client=client, _socket=socket)
        self.results = client.results
        self.block = client.block
        # concurrent.futures-style Executor facade over this view
        self.executor = ViewExecutor(self)

        self.set_flags(**flags)

        assert self.__class__ is not View, "Don't use base View objects, use subclasses"

122 

123 def __repr__(self): 

124 strtargets = str(self.targets) 

125 if len(strtargets) > 16: 

126 strtargets = strtargets[:12] + '...]' 

127 return f"<{self.__class__.__name__} {strtargets}>" 

128 

129 def __len__(self): 

130 if isinstance(self.targets, list): 

131 return len(self.targets) 

132 elif isinstance(self.targets, int): 

133 return 1 

134 else: 

135 return len(self.client) 

136 

137 def set_flags(self, **kwargs): 

138 """set my attribute flags by keyword. 

139 

140 Views determine behavior with a few attributes (`block`, `track`, etc.). 

141 These attributes can be set all at once by name with this method. 

142 

143 Parameters 

144 ---------- 

145 block : bool 

146 whether to wait for results 

147 track : bool 

148 whether to create a MessageTracker to allow the user to 

149 safely edit after arrays and buffers during non-copying 

150 sends. 

151 """ 

152 for name, value in kwargs.items(): 

153 if name not in self._flag_names: 

154 raise KeyError(f"Invalid name: {name!r}") 

155 else: 

156 setattr(self, name, value) 

157 

158 @contextmanager 

159 def temp_flags(self, **kwargs): 

160 """temporarily set flags, for use in `with` statements. 

161 

162 See set_flags for permanent setting of flags 

163 

164 Examples 

165 -------- 

166 >>> view.track=False 

167 ... 

168 >>> with view.temp_flags(track=True): 

169 ... ar = view.apply(dostuff, my_big_array) 

170 ... ar.tracker.wait() # wait for send to finish 

171 >>> view.track 

172 False 

173 

174 """ 

175 # preflight: save flags, and set temporaries 

176 saved_flags = {} 

177 for f in self._flag_names: 

178 saved_flags[f] = getattr(self, f) 

179 self.set_flags(**kwargs) 

180 # yield to the with-statement block 

181 try: 

182 yield 

183 finally: 

184 # postflight: restore saved flags 

185 self.set_flags(**saved_flags) 

186 

187 # ---------------------------------------------------------------- 

188 # apply 

189 # ---------------------------------------------------------------- 

190 

191 def _sync_results(self): 

192 """to be called by @sync_results decorator 

193 

194 after submitting any tasks. 

195 """ 

196 delta = self.outstanding.difference(self.client.outstanding) 

197 completed = self.outstanding.intersection(delta) 

198 self.outstanding = self.outstanding.difference(completed) 

199 

    @sync_results
    @save_ids
    def _really_apply(self, f, args, kwargs, block=None, **options):
        """wrapper for client.send_apply_request

        Abstract: subclasses implement the actual submission
        (e.g. DirectView, BroadcastView).
        """
        raise NotImplementedError("Implement in subclasses")

205 

    def apply(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines, returning the result.

        This method sets all apply flags via this View's attributes.

        Returns :class:`~ipyparallel.client.asyncresult.AsyncResult`
        instance if ``self.block`` is False, otherwise the return value of
        ``f(*args, **kwargs)``.
        """
        # block defaults to self.block inside _really_apply
        return self._really_apply(__ipp_f, args, kwargs)

    def apply_async(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a nonblocking manner.

        Returns :class:`~ipyparallel.client.asyncresult.AsyncResult` instance.
        """
        return self._really_apply(__ipp_f, args, kwargs, block=False)

    def apply_sync(self, __ipp_f, *args, **kwargs):
        """calls ``f(*args, **kwargs)`` on remote engines in a blocking manner,
        returning the result.
        """
        return self._really_apply(__ipp_f, args, kwargs, block=True)

229 

230 # ---------------------------------------------------------------- 

231 # wrappers for client and control methods 

232 # ---------------------------------------------------------------- 

    @sync_results
    def spin(self):
        """spin the client, and sync

        Flushes incoming results; @sync_results then prunes our
        outstanding set afterwards.
        """
        self.client.spin()

237 

238 @sync_results 

239 def wait(self, jobs=None, timeout=-1): 

240 """waits on one or more `jobs`, for up to `timeout` seconds. 

241 

242 Parameters 

243 ---------- 

244 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects 

245 ints are indices to self.history 

246 strs are msg_ids 

247 default: wait on all outstanding messages 

248 timeout : float 

249 a time in seconds, after which to give up. 

250 default is -1, which means no timeout 

251 

252 Returns 

253 ------- 

254 True : when all msg_ids are done 

255 False : timeout reached, some msg_ids still outstanding 

256 """ 

257 if jobs is None: 

258 jobs = self.history 

259 return self.client.wait(jobs, timeout) 

260 

261 def abort(self, jobs=None, targets=None, block=None): 

262 """Abort jobs on my engines. 

263 

264 Note: only jobs that have not started yet can be aborted. 

265 To halt a running job, 

266 you must interrupt the engine(s) via the Cluster API. 

267 

268 Parameters 

269 ---------- 

270 jobs : None, str, list of strs, optional 

271 if None: abort all jobs. 

272 else: abort specific msg_id(s). 

273 """ 

274 block = block if block is not None else self.block 

275 targets = targets if targets is not None else self.targets 

276 jobs = jobs if jobs is not None else list(self.outstanding) 

277 

278 return self.client.abort(jobs=jobs, targets=targets, block=block) 

279 

280 def queue_status(self, targets=None, verbose=False): 

281 """Fetch the Queue status of my engines""" 

282 targets = targets if targets is not None else self.targets 

283 return self.client.queue_status(targets=targets, verbose=verbose) 

284 

285 def purge_results(self, jobs=[], targets=[]): 

286 """Instruct the controller to forget specific results.""" 

287 if targets is None or targets == 'all': 

288 targets = self.targets 

289 return self.client.purge_results(jobs=jobs, targets=targets) 

290 

291 def shutdown(self, targets=None, restart=False, hub=False, block=None): 

292 """Terminates one or more engine processes, optionally including the hub.""" 

293 block = self.block if block is None else block 

294 if targets is None or targets == 'all': 

295 targets = self.targets 

296 return self.client.shutdown( 

297 targets=targets, restart=restart, hub=hub, block=block 

298 ) 

299 

300 def get_result(self, indices_or_msg_ids=None, block=None, owner=False): 

301 """return one or more results, specified by history index or msg_id. 

302 

303 See :meth:`ipyparallel.client.client.Client.get_result` for details. 

304 """ 

305 

306 if indices_or_msg_ids is None: 

307 indices_or_msg_ids = -1 

308 if isinstance(indices_or_msg_ids, int): 

309 indices_or_msg_ids = self.history[indices_or_msg_ids] 

310 elif isinstance(indices_or_msg_ids, (list, tuple, set)): 

311 indices_or_msg_ids = list(indices_or_msg_ids) 

312 for i, index in enumerate(indices_or_msg_ids): 

313 if isinstance(index, int): 

314 indices_or_msg_ids[i] = self.history[index] 

315 return self.client.get_result(indices_or_msg_ids, block=block, owner=owner) 

316 

317 # ------------------------------------------------------------------- 

318 # Map 

319 # ------------------------------------------------------------------- 

320 

    @sync_results
    def map(self, f, *sequences, **kwargs):
        """override in subclasses

        Abstract: each View subclass defines its own distribution strategy.
        """
        raise NotImplementedError()

325 

326 def map_async(self, f, *sequences, **kwargs): 

327 """Parallel version of builtin :func:`python:map`, using this view's engines. 

328 

329 This is equivalent to ``map(...block=False)``. 

330 

331 See `self.map` for details. 

332 """ 

333 if 'block' in kwargs: 

334 raise TypeError("map_async doesn't take a `block` keyword argument.") 

335 kwargs['block'] = False 

336 return self.map(f, *sequences, **kwargs) 

337 

338 def map_sync(self, f, *sequences, **kwargs): 

339 """Parallel version of builtin :func:`python:map`, using this view's engines. 

340 

341 This is equivalent to ``map(...block=True)``. 

342 

343 See `self.map` for details. 

344 """ 

345 if 'block' in kwargs: 

346 raise TypeError("map_sync doesn't take a `block` keyword argument.") 

347 kwargs['block'] = True 

348 return self.map(f, *sequences, **kwargs) 

349 

350 def imap(self, f, *sequences, **kwargs): 

351 """Parallel version of :func:`itertools.imap`. 

352 

353 See `self.map` for details. 

354 

355 """ 

356 

357 return iter(self.map_async(f, *sequences, **kwargs)) 

358 

359 # ------------------------------------------------------------------- 

360 # Decorators 

361 # ------------------------------------------------------------------- 

362 

363 def remote(self, block=None, **flags): 

364 """Decorator for making a RemoteFunction""" 

365 block = self.block if block is None else block 

366 return remote(self, block=block, **flags) 

367 

368 def parallel(self, dist='b', block=None, **flags): 

369 """Decorator for making a ParallelFunction""" 

370 block = self.block if block is None else block 

371 return parallel(self, dist=dist, block=block, **flags) 

372 

373 

class DirectView(View):
    """Direct Multiplexer View of one or more engines.

    These are created via indexed access to a client:

    >>> dv_1 = client[1]
    >>> dv_all = client[:]
    >>> dv_even = client[::2]
    >>> dv_some = client[1:3]

    This object provides dictionary access to engine namespaces:

    # push a=5:
    >>> dv['a'] = 5
    # pull 'foo':
    >>> dv['foo']

    """

    def __init__(self, client=None, socket=None, targets=None, **flags):
        # `targets` is forwarded through **flags so View.set_flags validates it
        super().__init__(client=client, socket=socket, targets=targets, **flags)

395 

    @property
    def importer(self):
        """sync_imports(local=True) as a property.

        See sync_imports for details.

        """
        # returns a fresh context manager each access
        return self.sync_imports(True)

404 

    @contextmanager
    def sync_imports(self, local=True, quiet=False):
        """Context Manager for performing simultaneous local and remote imports.

        'import x as y' will *not* work. The 'as y' part will simply be ignored.

        If `local=True`, then the package will also be imported locally.

        If `quiet=True`, no output will be produced when attempting remote
        imports.

        Note that remote-only (`local=False`) imports have not been implemented.

        >>> with view.sync_imports():
        ...    from numpy import recarray
        importing recarray from numpy on engine(s)

        """

        # the real __import__, restored when the context exits
        local_import = builtins.__import__
        modules = set()  # keys of imports already forwarded (dedupe)
        results = []  # AsyncResults of the remote imports

        # get the calling frame
        # that's two steps up due to `@contextmanager`
        context_frame = inspect.getouterframes(inspect.currentframe())[2].frame

        @util.interactive
        def remote_import(name, fromlist, level):
            """the function to be passed to apply, that actually performs the import
            on the engine, and loads up the user namespace.
            """
            import sys

            user_ns = globals()
            mod = __import__(name, fromlist=fromlist, level=level)
            if fromlist:
                # bind each requested name, like `from mod import a, b`
                for key in fromlist:
                    user_ns[key] = getattr(mod, key)
            else:
                # bind the top-level module, like `import mod`
                user_ns[name] = sys.modules[name]

        def view_import(name, globals={}, locals={}, fromlist=[], level=0):
            """the drop-in replacement for __import__, that optionally imports
            locally as well.
            """
            # don't override nested imports
            save_import = builtins.__import__
            builtins.__import__ = local_import

            import_frame = inspect.getouterframes(inspect.currentframe())[1].frame
            if import_frame is not context_frame:
                # only forward imports from the context frame,
                # not secondary imports
                # TODO: does this ever happen, or is the above `__import__` enough?
                # NOTE(review): this early return leaves builtins.__import__ as
                # local_import rather than restoring save_import — confirm intended.
                return local_import(name, globals, locals, fromlist, level)

            if local:
                mod = local_import(name, globals, locals, fromlist, level)
            else:
                raise NotImplementedError("remote-only imports not yet implemented")

            # dedupe key: module name plus the names imported from it
            key = name + ':' + ','.join(fromlist or [])
            if level <= 0 and key not in modules:
                modules.add(key)
                if not quiet:
                    if fromlist:
                        print(
                            "importing {} from {} on engine(s)".format(
                                ','.join(fromlist), name
                            )
                        )
                    else:
                        print(f"importing {name} on engine(s)")
                # fire off the same import on the engines, non-blocking
                results.append(self.apply_async(remote_import, name, fromlist, level))
            # restore override
            builtins.__import__ = save_import

            return mod

        # override __import__
        builtins.__import__ = view_import
        try:
            # enter the block
            yield
        except ImportError:
            if local:
                raise
            else:
                # ignore import errors if not doing local imports
                pass
        finally:
            # always restore __import__
            builtins.__import__ = local_import

        for r in results:
            # raise possible remote ImportErrors here
            r.get()

503 

    def use_dill(self):
        """Expand serialization support with dill

        adds support for closures, etc.

        This calls ipyparallel.serialize.use_dill() here and on each engine.
        """
        serialize.use_dill()
        return self.apply(serialize.use_dill)

    def use_cloudpickle(self):
        """Expand serialization support with cloudpickle.

        This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.
        """
        serialize.use_cloudpickle()
        return self.apply(serialize.use_cloudpickle)

    def use_pickle(self):
        """Restore the default pickle-based serialization, here and on each engine.

        This reverts changes to serialization caused by `use_dill|.cloudpickle`.
        """
        serialize.use_pickle()
        return self.apply(serialize.use_pickle)

529 

530 @sync_results 

531 @save_ids 

532 def _really_apply( 

533 self, f, args=None, kwargs=None, targets=None, block=None, track=None 

534 ): 

535 """calls f(*args, **kwargs) on remote engines, returning the result. 

536 

537 This method sets all of `apply`'s flags via this View's attributes. 

538 

539 Parameters 

540 ---------- 

541 f : callable 

542 args : list [default: empty] 

543 kwargs : dict [default: empty] 

544 targets : target list [default: self.targets] 

545 where to run 

546 block : bool [default: self.block] 

547 whether to block 

548 track : bool [default: self.track] 

549 whether to ask zmq to track the message, for safe non-copying sends 

550 

551 Returns 

552 ------- 

553 if self.block is False: 

554 returns AsyncResult 

555 else: 

556 returns actual result of f(*args, **kwargs) on the engine(s) 

557 This will be a list of self.targets is also a list (even length 1), or 

558 the single result if self.targets is an integer engine id 

559 """ 

560 args = [] if args is None else args 

561 kwargs = {} if kwargs is None else kwargs 

562 block = self.block if block is None else block 

563 track = self.track if track is None else track 

564 targets = self.targets if targets is None else targets 

565 

566 _idents, _targets = self.client._build_targets(targets) 

567 futures = [] 

568 

569 pf = PrePickled(f) 

570 pargs = [PrePickled(arg) for arg in args] 

571 pkwargs = {k: PrePickled(v) for k, v in kwargs.items()} 

572 

573 for ident in _idents: 

574 future = self.client.send_apply_request( 

575 self._socket, pf, pargs, pkwargs, track=track, ident=ident 

576 ) 

577 futures.append(future) 

578 if track: 

579 trackers = [_.tracker for _ in futures] 

580 else: 

581 trackers = [] 

582 if isinstance(targets, int): 

583 futures = futures[0] 

584 ar = AsyncResult( 

585 self.client, futures, fname=getname(f), targets=_targets, owner=True 

586 ) 

587 if block: 

588 try: 

589 return ar.get() 

590 except KeyboardInterrupt: 

591 pass 

592 return ar 

593 

    @sync_results
    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
        """Parallel version of builtin `map`, using this View's `targets`.

        There will be one task per target, so work will be chunked
        if the sequences are longer than `targets`.

        Results can be iterated as they are ready, but will become available in chunks.

        .. versionadded:: 7.0
            `return_exceptions`

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences : one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool [default self.block]
            whether to wait for the result or not
        track : bool [default False]
            Track underlying zmq send to indicate when it is safe to modify memory.
            Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
        return_exceptions : bool [default False]
            Return remote Exceptions in the result sequence instead of raising them.

        Returns
        -------
        If block=False
            An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance.
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
        else
            A list, the result of ``map(f,*sequences)``
        """

        if block is None:
            block = self.block

        # NOTE(review): `assert` is stripped under `python -O`, so this check
        # disappears in optimized mode; kept as-is for compatibility.
        assert len(sequences) > 0, "must have some sequences to map onto!"
        pf = ParallelFunction(
            self, f, block=block, track=track, return_exceptions=return_exceptions
        )
        return pf.map(*sequences)

639 

    @sync_results
    @save_ids
    def execute(self, code, silent=True, targets=None, block=None):
        """Executes `code` on `targets` in blocking or nonblocking manner.

        ``execute`` is always `bound` (affects engine namespace)

        Parameters
        ----------
        code : str
            the code string to be executed
        silent : bool [default: True]
            passed through to the engine's execute request
        targets : target list [default: self.targets]
        block : bool
            whether or not to wait until done to return
            default: self.block
        """
        block = self.block if block is None else block
        targets = self.targets if targets is None else targets

        _idents, _targets = self.client._build_targets(targets)
        futures = []
        # one execute request per engine
        for ident in _idents:
            future = self.client.send_execute_request(
                self._socket, code, silent=silent, ident=ident
            )
            futures.append(future)
        if isinstance(targets, int):
            # single engine id -> scalar result rather than length-1 list
            futures = futures[0]
        ar = AsyncResult(
            self.client, futures, fname='execute', targets=_targets, owner=True
        )
        if block:
            try:
                ar.get()
                # also wait for stdout/stderr to arrive
                ar.wait_for_output()
            except KeyboardInterrupt:
                # interrupted wait: fall through and return the AsyncResult
                pass
        return ar

677 

678 def run(self, filename, targets=None, block=None): 

679 """Execute contents of `filename` on my engine(s). 

680 

681 This simply reads the contents of the file and calls `execute`. 

682 

683 Parameters 

684 ---------- 

685 filename : str 

686 The path to the file 

687 targets : int/str/list of ints/strs 

688 the engines on which to execute 

689 default : all 

690 block : bool 

691 whether or not to wait until done 

692 default: self.block 

693 

694 """ 

695 with open(filename) as f: 

696 # add newline in case of trailing indented whitespace 

697 # which will cause SyntaxError 

698 code = f.read() + '\n' 

699 return self.execute(code, block=block, targets=targets) 

700 

    def update(self, ns):
        """update remote namespace with dict `ns`

        See `push` for details.
        """
        # dict.update-style alias for push, using this view's current flags
        return self.push(ns, block=self.block, track=self.track)

707 

708 def push(self, ns, targets=None, block=None, track=None): 

709 """update remote namespace with dict `ns` 

710 

711 Parameters 

712 ---------- 

713 ns : dict 

714 dict of keys with which to update engine namespace(s) 

715 block : bool [default : self.block] 

716 whether to wait to be notified of engine receipt 

717 

718 """ 

719 

720 block = block if block is not None else self.block 

721 track = track if track is not None else self.track 

722 targets = targets if targets is not None else self.targets 

723 # applier = self.apply_sync if block else self.apply_async 

724 if not isinstance(ns, dict): 

725 raise TypeError(f"Must be a dict, not {type(ns)}") 

726 return self._really_apply( 

727 util._push, kwargs=ns, block=block, track=track, targets=targets 

728 ) 

729 

    def get(self, key_s):
        """get object(s) by `key_s` from remote namespace

        see `pull` for details.
        """
        # always blocking, regardless of self.block
        return self.pull(key_s, block=True)

737 

738 def pull(self, names, targets=None, block=None): 

739 """get object(s) by `name` from remote namespace 

740 

741 will return one object if it is a key. 

742 can also take a list of keys, in which case it will return a list of objects. 

743 """ 

744 block = block if block is not None else self.block 

745 targets = targets if targets is not None else self.targets 

746 if isinstance(names, str): 

747 pass 

748 elif isinstance(names, (list, tuple, set)): 

749 for key in names: 

750 if not isinstance(key, str): 

751 raise TypeError(f"keys must be str, not type {type(key)!r}") 

752 else: 

753 raise TypeError(f"names must be strs, not {names!r}") 

754 return self._really_apply(util._pull, (names,), block=block, targets=targets) 

755 

    def scatter(
        self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None
    ):
        """
        Partition a Python sequence and send the partitions to a set of engines.

        Parameters
        ----------
        key : str
            name each partition is bound to in the engine namespaces
        seq : sequence
            the sequence to partition across engines
        dist : str [default: 'b']
            key into Map.dists selecting the partitioning scheme
        flatten : bool [default: False]
            if True, a length-1 partition is stored as its bare element
            rather than a length-1 sequence
        targets, block, track :
            the usual View flags; default to this view's attributes

        Returns
        -------
        None when blocking (after waiting on the sends); otherwise the
        AsyncResult for the scatter.
        """
        block = block if block is not None else self.block
        track = track if track is not None else self.track
        targets = targets if targets is not None else self.targets

        # construct integer ID list:
        targets = self.client._build_targets(targets)[1]

        mapObject = Map.dists[dist]()
        nparts = len(targets)
        futures = []
        _lengths = []  # per-engine partition sizes, recorded on the result
        for index, engineid in enumerate(targets):
            partition = mapObject.getPartition(seq, index, nparts)
            if flatten and len(partition) == 1:
                ns = {key: partition[0]}
            else:
                ns = {key: partition}
            # push each partition without blocking; collect the child futures
            r = self.push(ns, block=False, track=track, targets=engineid)
            r.owner = False
            futures.extend(r._children)
            _lengths.append(len(partition))

        r = AsyncResult(
            self.client, futures, fname='scatter', targets=targets, owner=True
        )
        r._scatter_lengths = _lengths
        if block:
            # blocking scatter only waits; it intentionally returns None
            r.wait()
        else:
            return r

792 

793 @sync_results 

794 @save_ids 

795 def gather(self, key, dist='b', targets=None, block=None): 

796 """ 

797 Gather a partitioned sequence on a set of engines as a single local seq. 

798 """ 

799 block = block if block is not None else self.block 

800 targets = targets if targets is not None else self.targets 

801 mapObject = Map.dists[dist]() 

802 msg_ids = [] 

803 

804 # construct integer ID list: 

805 targets = self.client._build_targets(targets)[1] 

806 

807 futures = [] 

808 for index, engineid in enumerate(targets): 

809 ar = self.pull(key, block=False, targets=engineid) 

810 ar.owner = False 

811 futures.extend(ar._children) 

812 

813 r = AsyncMapResult(self.client, futures, mapObject, fname='gather') 

814 

815 if block: 

816 try: 

817 return r.get() 

818 except KeyboardInterrupt: 

819 pass 

820 return r 

821 

    def __getitem__(self, key):
        # dictionary-style access: blocking pull from the engines
        return self.get(key)

    def __setitem__(self, key, value):
        # dictionary-style assignment: push to the engines
        self.update({key: value})

827 

828 def clear(self, targets=None, block=None): 

829 """Clear the remote namespaces on my engines.""" 

830 block = block if block is not None else self.block 

831 targets = targets if targets is not None else self.targets 

832 return self.client.clear(targets=targets, block=block) 

833 

834 # ---------------------------------------- 

835 # activate for %px, %autopx, etc. magics 

836 # ---------------------------------------- 

837 

838 def activate(self, suffix=''): 

839 """Activate IPython magics associated with this View 

840 

841 Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig` 

842 

843 Parameters 

844 ---------- 

845 suffix : str [default: ''] 

846 The suffix, if any, for the magics. This allows you to have 

847 multiple views associated with parallel magics at the same time. 

848 

849 e.g. ``rc[::2].activate(suffix='_even')`` will give you 

850 the magics ``%px_even``, ``%pxresult_even``, etc. for running magics 

851 on the even engines. 

852 """ 

853 

854 from ipyparallel.client.magics import ParallelMagics 

855 

856 ip = get_ipython() 

857 if ip is None: 

858 warnings.warn( 

859 "The IPython parallel magics (%px, etc.) only work within IPython." 

860 ) 

861 return 

862 

863 M = ParallelMagics(ip, self, suffix) 

864 ip.magics_manager.register(M) 

865 

866 

@decorator
def _not_coalescing(method, self, *args, **kwargs):
    """Decorator for broadcast methods that can't use reply coalescing"""
    # temporarily disable coalescing for the duration of the call
    saved = self.is_coalescing
    self.is_coalescing = False
    try:
        return method(self, *args, **kwargs)
    finally:
        self.is_coalescing = saved

876 

877 

class BroadcastView(DirectView):
    # when True, a single combined reply is expected instead of one reply
    # per engine (see _make_async_result for the two tracking modes)
    is_coalescing = Bool(False)

880 

881 def _init_metadata(self, target_tuples): 

882 """initialize request metadata""" 

883 return dict( 

884 targets=target_tuples, 

885 is_broadcast=True, 

886 is_coalescing=self.is_coalescing, 

887 ) 

888 

    def _make_async_result(self, message_future, s_idents, **kwargs):
        """Build the AsyncResult for a broadcast request.

        Non-coalescing mode: one reply per engine is expected, so synthesize
        a per-engine message future (the original msg_id suffixed with the
        engine ident) and register each as outstanding.

        Coalescing mode: the single message future stands for the combined
        reply; register the original msg_id per engine and discard that
        tracking when the result completes.
        """
        original_msg_id = message_future.msg_id
        if not self.is_coalescing:
            futures = []
            for ident in s_idents:
                # per-engine reply ids are '<msg_id>_<ident>'
                msg_and_target_id = f'{original_msg_id}_{ident}'
                future = self.client.create_message_futures(
                    msg_and_target_id,
                    message_future.header,
                    async_result=True,
                    track=True,
                )
                self.client.outstanding.add(msg_and_target_id)
                self.client._outstanding_dict[ident].add(msg_and_target_id)
                self.outstanding.add(msg_and_target_id)
                futures.append(future[0])
            # the per-engine ids supersede the original id on this view
            if original_msg_id in self.outstanding:
                self.outstanding.remove(original_msg_id)
        else:
            self.client.outstanding.add(original_msg_id)
            for ident in s_idents:
                self.client._outstanding_dict[ident].add(original_msg_id)
            futures = message_future

        ar = AsyncResult(self.client, futures, owner=True, **kwargs)

        if self.is_coalescing:
            # if coalescing, discard outstanding-tracking when we are done
            def _rm_outstanding(_):
                for ident in s_idents:
                    if ident in self.client._outstanding_dict:
                        self.client._outstanding_dict[ident].discard(original_msg_id)

            ar.add_done_callback(_rm_outstanding)

        return ar

925 

    @sync_results
    @save_ids
    def _really_apply(
        self, f, args=None, kwargs=None, block=None, track=None, targets=None
    ):
        """Submit a single broadcast apply request covering all targets.

        Unlike DirectView._really_apply, which sends one request per engine,
        this sends one request whose metadata lists all targets.
        """
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        block = self.block if block is None else block
        track = self.track if track is None else track
        targets = self.targets if targets is None else targets
        idents, _targets = self.client._build_targets(targets)

        # pre-pickle once; the same payload is forwarded to every engine
        pf = PrePickled(f)
        pargs = [PrePickled(arg) for arg in args]
        pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}

        s_idents = [ident.decode("utf8") for ident in idents]
        target_tuples = list(zip(s_idents, _targets))

        metadata = self._init_metadata(target_tuples)

        ar = None

        # hook invoked by the client once the message future exists, so `ar`
        # is populated before send_apply_request returns
        def make_asyncresult(message_future):
            nonlocal ar
            ar = self._make_async_result(
                message_future, s_idents, fname=getname(f), targets=_targets
            )

        self.client.send_apply_request(
            self._socket,
            pf,
            pargs,
            pkwargs,
            track=track,
            metadata=metadata,
            message_future_hook=make_asyncresult,
        )

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                # interrupted wait: fall through and return the AsyncResult
                pass
        return ar

971 

    @sync_results
    @save_ids
    @_not_coalescing
    def execute(self, code, silent=True, targets=None, block=None):
        """Executes `code` on `targets` in blocking or nonblocking manner.

        ``execute`` is always `bound` (affects engine namespace)

        Parameters
        ----------
        code : str
            the code string to be executed
        block : bool
            whether or not to wait until done to return
            default: self.block
        """
        block = self.block if block is None else block
        targets = self.targets if targets is None else targets

        _idents, _targets = self.client._build_targets(targets)
        s_idents = [ident.decode("utf8") for ident in _idents]
        target_tuples = list(zip(s_idents, _targets))

        metadata = self._init_metadata(target_tuples)

        ar = None

        # hook invoked by the client once the message future exists, so `ar`
        # is populated before send_execute_request returns
        def make_asyncresult(message_future):
            nonlocal ar
            ar = self._make_async_result(
                message_future, s_idents, fname='execute', targets=_targets
            )

        message_future = self.client.send_execute_request(
            self._socket,
            code,
            silent=silent,
            metadata=metadata,
            message_future_hook=make_asyncresult,
        )
        if block:
            try:
                ar.get()
                # also wait for stdout/stderr to arrive
                ar.wait_for_output()
            except KeyboardInterrupt:
                # interrupted wait: fall through and return the AsyncResult
                pass
        return ar

1019 

1020 @staticmethod 

1021 def _broadcast_map(f, *sequence_names): 

1022 """Function passed to apply 

1023 

1024 Equivalent, but account for the fact that scatter 

1025 occurs in a separate step. 

1026 

1027 Does these things: 

1028 - resolve sequence names to sequences in the user namespace 

1029 - collect list(map(f, *squences)) 

1030 - cleanup temporary sequence variables from scatter 

1031 """ 

1032 sequences = [] 

1033 ip = get_ipython() 

1034 for seq_name in sequence_names: 

1035 sequences.append(ip.user_ns.pop(seq_name)) 

1036 return list(map(f, *sequences)) 

1037 

    @_not_coalescing
    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
        """Parallel version of builtin `map`, using this View's `targets`.

        There will be one task per engine, so work will be chunked
        if the sequences are longer than `targets`.

        Results can be iterated as they are ready, but will become available in chunks.

        .. note::

            BroadcastView does not yet have a fully native map implementation.
            In particular, the scatter step is still one message per engine,
            identical to DirectView,
            and typically slower due to the more complex scheduler.

            It is more efficient to partition inputs via other means (e.g. SPMD based on rank & size)
            and use `apply` to submit all tasks in one broadcast.

        .. versionadded:: 8.8

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences : one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        block : bool [default self.block]
            whether to wait for the result or not
        track : bool [default False]
            Track underlying zmq send to indicate when it is safe to modify memory.
            Only for zero-copy sends such as numpy arrays that are going to be modified in-place.
        return_exceptions : bool [default False]
            Return remote Exceptions in the result sequence instead of raising them.

        Returns
        -------
        If block=False
            An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance.
            An object like AsyncResult, but which reassembles the sequence of results
            into a single list. AsyncMapResults can be iterated through before all
            results are complete.
        else
            A list, the result of ``map(f,*sequences)``
        """
        if block is None:
            block = self.block
        # NOTE(review): `track` defaults to False (not None), so this branch
        # appears unreachable — confirm whether the default should be None
        if track is None:
            track = self.track

        # unique identifier, since we're living in the interactive namespace
        map_key = secrets.token_hex(5)
        dist = 'b'
        map_object = Map.dists[dist]()

        seq_names = []
        for i, seq in enumerate(sequences):
            seq_name = f"_seq_{map_key}_{i}"
            seq_names.append(seq_name)
            try:
                len(seq)
            except Exception:
                # cast length-less sequences (e.g. Range) to list
                seq = list(seq)

            ar = self.scatter(seq_name, seq, dist=dist, block=False, track=track)
        # chunk sizes are taken from the *last* scatter; presumably all
        # sequences split identically since they share dist and targets — confirm
        scatter_chunk_sizes = ar._scatter_lengths

        # submit the map tasks as an actual broadcast
        ar = self.apply(self._broadcast_map, f, *seq_names)
        ar.owner = False
        # re-wrap messages in an AsyncMapResult to get map API
        # this is where the 'gather' reconstruction happens
        amr = ipp.AsyncMapResult(
            self.client,
            ar._children,
            map_object,
            fname=getname(f),
            return_exceptions=return_exceptions,
            chunk_sizes={
                future.msg_id: chunk_size
                for future, chunk_size in zip(ar._children, scatter_chunk_sizes)
            },
        )

        if block:
            return amr.get()
        else:
            return amr

1127 

    # scatter/gather cannot be coalescing yet:
    # reuse DirectView's implementations, wrapped with `_not_coalescing`
    scatter = _not_coalescing(DirectView.scatter)
    gather = _not_coalescing(DirectView.gather)

1131 

1132 

class LazyMapIterator:
    """Iterable representation of a lazy map (imap)

    Has a `.cancel()` method to stop consuming new inputs.

    .. versionadded:: 8.0
    """

    def __init__(self, gen, signal_done):
        # generator yielding map results as they become available
        self._results = gen
        # callback that halts consumption of further inputs
        self._stop_consuming = signal_done

    def __iter__(self):
        # hand out the underlying generator itself,
        # so iteration state is shared with __next__
        return self._results

    def __next__(self):
        return next(self._results)

    def cancel(self):
        """Stop consuming the input to the map.

        Useful to e.g. stop consuming an infinite (or just large) input
        when you've arrived at the result (or error) you needed.
        """
        self._stop_consuming()

1158 

1159 

class LoadBalancedView(View):
    """A load-balancing View that only executes via the Task scheduler.

    Load-balanced views can be created with the client's `view` method:

    >>> v = client.load_balanced_view()

    or targets can be specified, to restrict the potential destinations:

    >>> v = client.load_balanced_view([1,3])

    which would restrict loadbalancing to between engines 1 and 3.

    """

    # scheduler dependency flags; validated and documented in `set_flags`
    follow = Any()
    after = Any()
    # seconds the scheduler waits for dependencies before DependencyTimeout
    timeout = CFloat()
    # number of times a failed task will be retried
    retries = Integer(0)

    # scheduling scheme reported by the client;
    # 'pure' disables dependency features (see `_really_apply`)
    _task_scheme = Any()
    _flag_names = List(
        ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']
    )
    # ids of in-progress imap calls, consulted by ViewExecutor.shutdown
    _outstanding_maps = Set()

    def __init__(self, client=None, socket=None, **flags):
        super().__init__(client=client, socket=socket, **flags)
        # cache the client's scheme for the feature checks in `_really_apply`
        self._task_scheme = client._task_scheme

1190 def _validate_dependency(self, dep): 

1191 """validate a dependency. 

1192 

1193 For use in `set_flags`. 

1194 """ 

1195 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)): 

1196 return True 

1197 elif isinstance(dep, (list, set, tuple)): 

1198 for d in dep: 

1199 if not isinstance(d, (str, AsyncResult)): 

1200 return False 

1201 elif isinstance(dep, dict): 

1202 if set(dep.keys()) != set(Dependency().as_dict().keys()): 

1203 return False 

1204 if not isinstance(dep['msg_ids'], list): 

1205 return False 

1206 for d in dep['msg_ids']: 

1207 if not isinstance(d, str): 

1208 return False 

1209 else: 

1210 return False 

1211 

1212 return True 

1213 

1214 def _render_dependency(self, dep): 

1215 """helper for building jsonable dependencies from various input forms.""" 

1216 if isinstance(dep, Dependency): 

1217 return dep.as_dict() 

1218 elif isinstance(dep, AsyncResult): 

1219 return dep.msg_ids 

1220 elif dep is None: 

1221 return [] 

1222 else: 

1223 # pass to Dependency constructor 

1224 return list(Dependency(dep)) 

1225 

    def set_flags(self, **kwargs):
        """set my attribute flags by keyword.

        A View is a wrapper for the Client's apply method, but with attributes
        that specify keyword arguments, those attributes can be set by keyword
        argument with this method.

        Parameters
        ----------
        block : bool
            whether to wait for results
        track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.
        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.
        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.
        timeout : float/int or None
            Only for load-balanced execution (targets=None)
            Specify an amount of time (in seconds) for the scheduler to
            wait for dependencies to be met before failing with a
            DependencyTimeout.
        retries : int
            Number of times a task will be retried on failure.
        """

        # let the base class handle the common flags first
        super().set_flags(**kwargs)
        # dependency flags get extra validation before assignment
        for name in ('follow', 'after'):
            if name in kwargs:
                value = kwargs[name]
                if self._validate_dependency(value):
                    setattr(self, name, value)
                else:
                    raise ValueError(f"Invalid dependency: {value!r}")
        if 'timeout' in kwargs:
            t = kwargs['timeout']
            if not isinstance(t, (int, float, type(None))):
                raise TypeError(f"Invalid type for timeout: {type(t)!r}")
            if t is not None:
                if t < 0:
                    raise ValueError(f"Invalid timeout: {t}")

            # NOTE(review): t=None reaches this assignment; the `timeout`
            # trait is CFloat(), which may reject None — confirm
            self.timeout = t

1277 

1278 @sync_results 

1279 @save_ids 

1280 def _really_apply( 

1281 self, 

1282 f, 

1283 args=None, 

1284 kwargs=None, 

1285 block=None, 

1286 track=None, 

1287 after=None, 

1288 follow=None, 

1289 timeout=None, 

1290 targets=None, 

1291 retries=None, 

1292 ): 

1293 """calls f(*args, **kwargs) on a remote engine, returning the result. 

1294 

1295 This method temporarily sets all of `apply`'s flags for a single call. 

1296 

1297 Parameters 

1298 ---------- 

1299 f : callable 

1300 args : list [default: empty] 

1301 kwargs : dict [default: empty] 

1302 block : bool [default: self.block] 

1303 whether to block 

1304 track : bool [default: self.track] 

1305 whether to ask zmq to track the message, for safe non-copying sends 

1306 !!!!!! TODO : THE REST HERE !!!! 

1307 

1308 Returns 

1309 ------- 

1310 if self.block is False: 

1311 returns AsyncResult 

1312 else: 

1313 returns actual result of f(*args, **kwargs) on the engine(s) 

1314 This will be a list of self.targets is also a list (even length 1), or 

1315 the single result if self.targets is an integer engine id 

1316 """ 

1317 

1318 # validate whether we can run 

1319 if self._socket.closed(): 

1320 msg = "Task farming is disabled" 

1321 if self._task_scheme == 'pure': 

1322 msg += " because the pure ZMQ scheduler cannot handle" 

1323 msg += " disappearing engines." 

1324 raise RuntimeError(msg) 

1325 

1326 if self._task_scheme == 'pure': 

1327 # pure zmq scheme doesn't support extra features 

1328 msg = "Pure ZMQ scheduler doesn't support the following flags:" 

1329 "follow, after, retries, targets, timeout" 

1330 if follow or after or retries or targets or timeout: 

1331 # hard fail on Scheduler flags 

1332 raise RuntimeError(msg) 

1333 if isinstance(f, dependent): 

1334 # soft warn on functional dependencies 

1335 warnings.warn(msg, RuntimeWarning) 

1336 

1337 # build args 

1338 args = [] if args is None else args 

1339 kwargs = {} if kwargs is None else kwargs 

1340 block = self.block if block is None else block 

1341 track = self.track if track is None else track 

1342 after = self.after if after is None else after 

1343 retries = self.retries if retries is None else retries 

1344 follow = self.follow if follow is None else follow 

1345 timeout = self.timeout if timeout is None else timeout 

1346 targets = self.targets if targets is None else targets 

1347 

1348 if not isinstance(retries, int): 

1349 raise TypeError(f'retries must be int, not {type(retries)!r}') 

1350 

1351 if targets is None: 

1352 idents = [] 

1353 else: 

1354 idents = self.client._build_targets(targets)[0] 

1355 # ensure *not* bytes 

1356 idents = [ident.decode() for ident in idents] 

1357 

1358 after = self._render_dependency(after) 

1359 follow = self._render_dependency(follow) 

1360 metadata = dict( 

1361 after=after, follow=follow, timeout=timeout, targets=idents, retries=retries 

1362 ) 

1363 

1364 future = self.client.send_apply_request( 

1365 self._socket, f, args, kwargs, track=track, metadata=metadata 

1366 ) 

1367 

1368 ar = AsyncResult( 

1369 self.client, 

1370 future, 

1371 fname=getname(f), 

1372 targets=None, 

1373 owner=True, 

1374 ) 

1375 if block: 

1376 try: 

1377 return ar.get() 

1378 except KeyboardInterrupt: 

1379 pass 

1380 return ar 

1381 

1382 @sync_results 

1383 @save_ids 

1384 def map( 

1385 self, 

1386 f, 

1387 *sequences, 

1388 block=None, 

1389 chunksize=1, 

1390 ordered=True, 

1391 return_exceptions=False, 

1392 ): 

1393 """Parallel version of builtin `map`, load-balanced by this View. 

1394 

1395 Each `chunksize` elements will be a separate task, and will be 

1396 load-balanced. This lets individual elements be available for iteration 

1397 as soon as they arrive. 

1398 

1399 .. versionadded:: 7.0 

1400 `return_exceptions` 

1401 

1402 Parameters 

1403 ---------- 

1404 f : callable 

1405 function to be mapped 

1406 *sequences : one or more sequences of matching length 

1407 the sequences to be distributed and passed to `f` 

1408 block : bool [default self.block] 

1409 whether to wait for the result or not 

1410 chunksize : int [default 1] 

1411 how many elements should be in each task. 

1412 ordered : bool [default True] 

1413 Whether the results should be gathered as they arrive, or enforce 

1414 the order of submission. 

1415 

1416 Only applies when iterating through AsyncMapResult as results arrive. 

1417 Has no effect when block=True. 

1418 

1419 return_exceptions: bool [default False] 

1420 Return Exceptions instead of raising on the first exception. 

1421 

1422 Returns 

1423 ------- 

1424 if block=False 

1425 An :class:`~ipyparallel.client.asyncresult.AsyncMapResult` instance. 

1426 An object like AsyncResult, but which reassembles the sequence of results 

1427 into a single list. AsyncMapResults can be iterated through before all 

1428 results are complete. 

1429 else 

1430 A list, the result of ``map(f,*sequences)`` 

1431 """ 

1432 

1433 # default 

1434 if block is None: 

1435 block = self.block 

1436 

1437 assert len(sequences) > 0, "must have some sequences to map onto!" 

1438 

1439 pf = ParallelFunction( 

1440 self, 

1441 f, 

1442 block=block, 

1443 chunksize=chunksize, 

1444 ordered=ordered, 

1445 return_exceptions=return_exceptions, 

1446 ) 

1447 return pf.map(*sequences) 

1448 

    def imap(
        self,
        f,
        *sequences,
        ordered=True,
        max_outstanding='auto',
        return_exceptions=False,
    ):
        """Parallel version of lazily-evaluated `imap`, load-balanced by this View.

        `ordered`, and `max_outstanding` can be specified by keyword only.

        Unlike other map functions in IPython Parallel,
        this one does not consume the full iterable before submitting work,
        returning a single 'AsyncMapResult' representing the full computation.

        Instead, it consumes iterables as they come, submitting up to `max_outstanding`
        tasks to the cluster before waiting on results (default: one task per engine).
        This allows it to work with infinite generators,
        and avoid potentially expensive read-ahead for large streams of inputs
        that may not fit in memory all at once.

        .. versionadded:: 7.0

        Parameters
        ----------
        f : callable
            function to be mapped
        *sequences : one or more sequences of matching length
            the sequences to be distributed and passed to `f`
        ordered : bool [default True]
            Whether the results should be yielded on a first-come-first-yield basis,
            or preserve the order of submission.

        max_outstanding : int [default len(engines)]
            The maximum number of tasks to be outstanding.

            max_outstanding=0 will greedily consume the whole generator
            (map_async may be more efficient).

            A limit of 1 should be strictly worse than running a local map,
            as there will be no parallelism.

            Use this to tune how greedily input generator should be consumed.

        return_exceptions : bool [default False]
            Return Exceptions instead of raising them.

        Returns
        -------

        lazily-evaluated generator, yielding results of `f` on each item of sequences.
        Yield-order depends on `ordered` argument.
        """

        assert len(sequences) > 0, "must have some sequences to map onto!"

        if max_outstanding == 'auto':
            max_outstanding = len(self)

        # pre-serialize the function once; it is reused for every submission
        pf = PrePickled(f)

        map_id = secrets.token_bytes(16)

        # record that a map is outstanding, mainly for Executor.shutdown
        self._outstanding_maps.add(map_id)

        def signal_done():
            # stop consumption and clear the outstanding-map record;
            # safe to call more than once (discard is idempotent)
            nonlocal iterator_done
            iterator_done = True
            self._outstanding_maps.discard(map_id)

        # protects `outstanding`, which is touched by both the consumer
        # thread and the result-yielding generator
        outstanding_lock = threading.Lock()

        if ordered:
            outstanding = deque()
            add_outstanding = outstanding.append
        else:
            outstanding = set()
            add_outstanding = outstanding.add

        def wait_for_ready():
            # return a (possibly empty) batch of finished AsyncResults
            while not outstanding and not iterator_done:
                # no outstanding futures, need to wait for something to wait for
                time.sleep(0.1)
            if not outstanding:
                # nothing to wait for, iterator_done is True
                return []

            if ordered:
                with outstanding_lock:
                    return [outstanding.popleft()]
            else:
                # unordered, yield whatever finishes first, as soon as it's ready
                # repeat with timeout because the consumer thread may be adding to `outstanding`
                with outstanding_lock:
                    to_wait = outstanding.copy()
                done, _ = concurrent.futures.wait(
                    to_wait,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                    timeout=0.5,
                )
                if done:
                    with outstanding_lock:
                        for f in done:
                            outstanding.remove(f)
                return done

        arg_iterator = iter(zip(*sequences))
        iterator_done = False

        # consume inputs in _another_ thread,
        # to avoid blocking the IO thread with a possibly blocking generator
        # only need one thread for this, though.
        consumer_pool = concurrent.futures.ThreadPoolExecutor(1)

        def consume_callback(f):
            # done-callback on each task: pull the next input, if any remain
            if not iterator_done:
                consumer_pool.submit(consume_next)

        def consume_next():
            """Consume the next call from the argument iterator

            If max_outstanding, schedules consumption when the result finishes.
            If running with no limit, schedules another consumption immediately.
            """
            nonlocal iterator_done
            if iterator_done:
                return

            try:
                args = next(arg_iterator)
                ar = self.apply_async(pf, *args)
            except StopIteration:
                signal_done()
                return
            except Exception as e:
                # exception consuming iterator, propagate
                ar = concurrent.futures.Future()
                # mock get so it gets re-raised when awaited
                ar.get = lambda *args: ar.result()
                ar.set_exception(e)
                with outstanding_lock:
                    add_outstanding(ar)
                signal_done()
                return

            with outstanding_lock:
                add_outstanding(ar)
            if max_outstanding:
                ar.add_done_callback(consume_callback)
            else:
                consumer_pool.submit(consume_next)

        # kick it off
        # only need one if not using max_outstanding,
        # as each eventloop tick will submit a new item
        # otherwise, start one consumer for each slot, which will chain
        kickoff_count = 1 if max_outstanding == 0 else max_outstanding
        submit_futures = []
        for i in range(kickoff_count):
            submit_futures.append(consumer_pool.submit(consume_next))

        # await the first one, just in case it raises
        try:
            submit_futures[0].result()
        except Exception:
            # make sure we clean up
            signal_done()
            raise
        del submit_futures

        # wrap result-yielding in another call
        # because if this function is itself a generator
        # the first submission won't happen until the first result is requested
        def iter_results():
            nonlocal outstanding
            with consumer_pool:
                while not iterator_done:
                    # yield results as they become ready
                    for ready_ar in wait_for_ready():
                        yield ready_ar.get(return_exceptions=return_exceptions)

                # yield any remaining results
                if ordered:
                    for ar in outstanding:
                        yield ar.get(return_exceptions=return_exceptions)
                else:
                    while outstanding:
                        done, outstanding = concurrent.futures.wait(
                            outstanding, return_when=concurrent.futures.FIRST_COMPLETED
                        )
                        for ar in done:
                            yield ar.get(return_exceptions=return_exceptions)

        return LazyMapIterator(iter_results(), signal_done)

1645 

1646 def register_joblib_backend(self, name='ipyparallel', make_default=False): 

1647 """Register this View as a joblib parallel backend 

1648 

1649 To make this the default backend, set make_default=True. 

1650 

1651 Use with:: 

1652 

1653 p = Parallel(backend='ipyparallel') 

1654 ... 

1655 

1656 See joblib docs for details 

1657 

1658 Requires joblib >= 0.10 

1659 

1660 .. versionadded:: 5.1 

1661 """ 

1662 from joblib.parallel import register_parallel_backend 

1663 

1664 from ._joblib import IPythonParallelBackend 

1665 

1666 register_parallel_backend( 

1667 name, 

1668 lambda **kwargs: IPythonParallelBackend(view=self, **kwargs), 

1669 make_default=make_default, 

1670 ) 

1671 

1672 

class ViewExecutor(concurrent.futures.Executor):
    """A PEP-3148 Executor API for Views

    Access as view.executor
    """

    def __init__(self, view):
        # the wrapped View; one "worker" per engine in the view
        self.view = view
        self._max_workers = len(self.view)

    def submit(self, fn, *args, **kwargs):
        """Same as View.apply_async"""
        return self.view.apply_async(fn, *args, **kwargs)

    def map(self, func, *iterables, **kwargs):
        """Return generator for View.map_async"""
        if 'timeout' in kwargs:
            # imap has no timeout support; drop the argument with a warning
            warnings.warn("timeout unsupported in ViewExecutor.map")
            kwargs.pop('timeout')
        return self.view.imap(func, *iterables, **kwargs)

    def shutdown(self, wait=True):
        """ViewExecutor does *not* shutdown engines

        results are awaited if wait=True, but engines are *not* shutdown.
        """
        if wait:
            # wait for *submission* of outstanding maps,
            # otherwise view.wait won't know what to wait for.
            # default=None: only LoadBalancedView defines _outstanding_maps;
            # previously the bare getattr raised AttributeError for other views
            outstanding_maps = getattr(self.view, "_outstanding_maps", None)
            while outstanding_maps:
                time.sleep(0.1)
            self.view.wait()

1707 

1708 

# public names exported by `from ipyparallel.client.view import *`
__all__ = ['LoadBalancedView', 'DirectView', 'ViewExecutor', 'BroadcastView']