Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/cluster/cluster.py: 4%



1"""Cluster class 

2 

3defines the basic interface to a single IPython Parallel cluster 

4 

5starts/stops/polls controllers, engines, etc. 

6""" 

7 

8import asyncio 

9import atexit 

10import glob 

11import inspect 

12import json 

13import logging 

14import os 

15import random 

16import string 

17import sys 

18import time 

19import traceback 

20from functools import partial 

21from multiprocessing import cpu_count 

22from weakref import WeakSet 

23 

24import IPython 

25from traitlets import ( 

26 Any, 

27 Bool, 

28 Dict, 

29 Float, 

30 Instance, 

31 Integer, 

32 List, 

33 Unicode, 

34 default, 

35 import_item, 

36 validate, 

37) 

38from traitlets.config import Application, Config, LoggingConfigurable 

39 

40from .._async import AsyncFirst 

41from ..traitlets import Launcher 

42from ..util import ( 

43 _all_profile_dirs, 

44 _default_profile_dir, 

45 _locate_profiles, 

46 _traitlet_signature, 

47 abbreviate_profile_dir, 

48) 

49from . import launcher 

50 

51_suffix_chars = string.ascii_lowercase + string.digits 

52 

53# weak set of clusters to be cleaned up at exit 

54_atexit_clusters = WeakSet() 

55 

56 

57def _atexit_cleanup_clusters(*args): 

58 """Cleanup clusters during process shutdown""" 

59 for cluster in _atexit_clusters: 

60 if not cluster.shutdown_atexit: 

61 # overridden after register 

62 continue 

63 if cluster.controller or cluster.engines: 

64 print(f"Stopping cluster {cluster}", file=sys.stderr) 

65 try: 

66 cluster.stop_cluster_sync() 

67 except Exception: 

68 print(f"Error stopping cluster {cluster}", file=sys.stderr) 

69 traceback.print_exception(*sys.exc_info()) 

70 

71 

72_atexit_cleanup_clusters.registered = False 

73 

74 

75@_traitlet_signature 

76class Cluster(AsyncFirst, LoggingConfigurable): 

77 """Class representing an IPP cluster 

78 

79 i.e. one controller and one or more groups of engines 

80 

81 Can start/stop/monitor/poll cluster resources 

82 

83 All async methods can be called synchronously with a `_sync` suffix, 

84 e.g. `cluster.start_cluster_sync()` 

85 

86 .. versionchanged:: 8.0 

87 controller and engine launcher classes can be specified via 

88 `Cluster(controller='ssh', engines='mpi')` 

89 without the `_launcher_class` suffix. 

90 """ 

91 

92 # general configuration 

93 

94 shutdown_atexit = Bool( 

95 True, 

96 help=""" 

97 Shutdown the cluster at process exit. 

98 

99 Set to False if you want to launch a cluster and leave it running 

100 after the launching process exits. 

101 """, 

102 ) 

103 

104 cluster_id = Unicode(help="The id of the cluster (default: random string)").tag( 

105 to_dict=True 

106 ) 

107 

108 @default("cluster_id") 

109 def _default_cluster_id(self): 

110 return f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}" 

111 

112 profile_dir = Unicode( 

113 help="""The profile directory. 

114 

115 Default priority: 

116 

117 - specified explicitly 

118 - current IPython session 

119 - use profile name (default: 'default') 

120 

121 """ 

122 ).tag(to_dict=True) 

123 

124 @default("profile_dir") 

125 def _default_profile_dir(self): 

126 return _default_profile_dir(profile=self.profile) 

127 

128 @validate("profile_dir") 

129 def _validate_profile_dir(self, proposal): 

130 path = proposal.value 

131 if path: 

132 return os.path.abspath(path) 

133 return path 

134 

135 profile = Unicode( 

136 "", 

137 help="""The profile name, 

138 a shortcut for specifying profile_dir within $IPYTHONDIR.""", 

139 ) 

140 

141 cluster_file = Unicode( 

142 help="The path to the cluster file for saving this cluster to disk" 

143 ) 

144 

145 @default("cluster_file") 

146 def _default_cluster_file(self): 

147 return os.path.join( 

148 self.profile_dir, "security", f"cluster-{self.cluster_id}.json" 

149 ) 

150 

151 engine_timeout = Integer( 

152 60, 

153 help="""Timeout to use when waiting for engines to register 

154 

155 before giving up. 

156 """, 

157 config=True, 

158 ) 

159 

160 send_engines_connection_env = Bool( 

161 True, 

162 config=True, 

163 help=""" 

164 Wait for controller's connection info before passing to engines 

165 via $IPP_CONNECTION_INFO environment variable. 

166 

167 Set to False to start engines immediately 

168 without waiting for the controller's connection info to be available. 

169 

170 When True, no connection file movement is required. 

171 False is mainly useful when submitting the controller may 

172 take a long time in a job queue, 

173 and the engines should enter the queue before the controller is running. 

174 

175 .. versionadded:: 8.0 

176 """, 

177 ) 

178 

179 controller_launcher_class = Launcher( 

180 default_value=launcher.LocalControllerLauncher, 

181 entry_point_group='ipyparallel.controller_launchers', 

182 help="""The class for launching a Controller. Change this value if you want 

183 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc. 

184 

185 Each launcher class has its own set of configuration options, for making sure 

186 it will work in your environment. 

187 

188 Note that using a batch launcher for the controller *does not* put it 

189 in the same batch job as the engines, so they will still start separately. 

190 

        Third-party controller launchers can be registered via the `ipyparallel.controller_launchers` entry point.


        They can be selected via case-insensitive abbreviation, e.g.

            c.Cluster.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """,
        config=True,
    ).tag(alias="controller")

    engine_launcher_class = Launcher(
        default_value=launcher.LocalEngineSetLauncher,
        entry_point_group='ipyparallel.engine_launchers',
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Third-party engine launchers can be registered via `ipyparallel.engine_launchers` entry point.

        They can be selected via case-insensitive abbreviation, e.g.

            c.Cluster.engine_launcher_class = 'ssh'

        or:

            ipcluster start --engines=mpi

        """,
        config=True,
    ).tag(alias="engines")

    # controller configuration

    controller_args = List(
        Unicode(),
        config=True,
        help="Additional CLI args to pass to the controller.",
    ).tag(to_dict=True)
    controller_ip = Unicode(
        config=True, help="Set the IP address of the controller."
    ).tag(to_dict=True)
    controller_location = Unicode(
        config=True,
        help="""Set the location (hostname or ip) of the controller.

        This is used by engines and clients to locate the controller
        when the controller listens on all interfaces
        """,
    ).tag(to_dict=True)

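    # Editor's note (illustrative sketch, not part of the upstream source):
    # these controller traits are typically combined for multi-host setups,
    # e.g. (the hostname below is hypothetical):
    #
    #     cluster = Cluster(
    #         controller_ip="0.0.0.0",                 # listen on all interfaces
    #         controller_location="controller-host",   # address engines/clients use
    #         engines="ssh",
    #     )
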

    # engine configuration

    delay = Float(
        1.0,
        config=True,
        help="delay (in s) between starting the controller and the engines",
    ).tag(to_dict=True)

    n = Integer(
        None, allow_none=True, config=True, help="The number of engines to start"
    ).tag(to_dict=True)

    @default("parent")
    def _default_parent(self):
        """Default to inheriting config from current IPython session"""
        return IPython.get_ipython()

    log_level = Integer(logging.INFO)

    @default("log")
    def _default_log(self):
        if self.parent and self.parent is IPython.get_ipython():
            # log to stdout in an IPython session
            log = logging.getLogger(f"{__name__}.{self.cluster_id}")
            log.setLevel(self.log_level)

            handler = logging.StreamHandler(sys.stdout)
            log.handlers = [handler]
            log.propagate = False
            return log
        elif self.parent and getattr(self.parent, 'log', None) is not None:
            return self.parent.log
        elif Application.initialized():
            return Application.instance().log
        else:
            # set up our own logger
            log = logging.getLogger(f"{__name__}.{self.cluster_id}")
            log.setLevel(self.log_level)
            return log

    load_profile = Bool(
        True,
        config=True,
        help="""
        If True (default) load ipcluster config from profile directory, if present.
        """,
    )
    # private state
    controller = Any().tag(nosignature=True)
    engines = Dict().tag(nosignature=True)

    @property
    def engine_set(self):
        """Return the first engine set

        Most clusters have only one engine set,
        which is tedious to get to via the `engines` dict
        with random engine set ids.


        .. versionadded:: 8.0

306 """ 

307 if self.engines: 

308 return next(iter(self.engines.values())) 

309 

310 profile_config = Instance(Config, allow_none=False).tag(nosignature=True) 

311 

312 @default("profile_config") 

313 def _profile_config_default(self): 

314 """Load config from our profile""" 

315 if not self.load_profile or not os.path.isdir(self.profile_dir): 

316 # no profile dir, nothing to load 

317 return Config() 

318 

319 from .app import BaseParallelApplication, IPClusterStart 

320 

321 # look up if we are descended from an 'ipcluster' app 

322 # avoids repeated load of the current profile dir 

323 parents = [] 

324 parent = self.parent 

325 while parent is not None: 

326 parents.append(parent) 

327 parent = parent.parent 

328 

329 app_parents = list( 

330 filter(lambda p: isinstance(p, BaseParallelApplication), parents) 

331 ) 

332 if app_parents: 

333 app_parent = app_parents[0] 

334 else: 

335 app_parent = None 

336 

337 if ( 

338 app_parent 

339 and app_parent.name == 'ipcluster' 

340 and app_parent.profile_dir.location == self.profile_dir 

341 ): 

342 # profile config already loaded by parent, nothing new to load 

343 return Config() 

344 

345 self.log.debug(f"Loading profile {self.profile_dir}") 

346 # set profile dir via config 

347 config = Config() 

348 config.ProfileDir.location = self.profile_dir 

349 

350 # load profile config via IPCluster 

351 app = IPClusterStart(config=config, log=self.log) 

352 # adds profile dir to config_files_path 

353 app.init_profile_dir() 

354 # adds system to config_files_path 

355 app.init_config_files() 

356 # actually load the config 

357 app.load_config_file(suppress_errors=False) 

358 return app.config 

359 

360 @validate("config") 

361 def _merge_profile_config(self, proposal): 

362 direct_config = proposal.value 

363 if not self.load_profile: 

364 return direct_config 

365 profile_config = self.profile_config 

366 if not profile_config: 

367 return direct_config 

368 # priority ?! direct > profile 

369 config = Config() 

370 if profile_config: 

371 config.merge(profile_config) 

372 config.merge(direct_config) 

373 return config 

374 

375 @default("config") 

376 def _default_config(self): 

377 if self.load_profile: 

378 return self.profile_config 

379 else: 

380 return Config() 

381 

382 def __init__(self, *, engines=None, controller=None, **kwargs): 

383 """Construct a Cluster""" 

384 # handle more intuitive aliases, which match ipcluster cli args, etc. 

385 if engines is not None: 

386 if 'engine_launcher_class' in kwargs: 

387 raise TypeError( 

388 "Only specify one of 'engines' or 'engine_launcher_class', not both" 

389 ) 

390 kwargs['engine_launcher_class'] = engines 

391 if controller is not None: 

392 if 'controller_launcher_class' in kwargs: 

393 raise TypeError( 

394 "Only specify one of 'controller' or 'controller_launcher_class', not both" 

395 ) 

396 kwargs['controller_launcher_class'] = controller 

397 if 'parent' not in kwargs and 'config' not in kwargs: 

398 kwargs['parent'] = self._default_parent() 

399 

400 super().__init__(**kwargs) 

401 

402 def __del__(self): 

403 if not self.shutdown_atexit: 

404 return 

405 if self.controller or self.engines: 

406 self.stop_cluster_sync() 

407 

408 def __repr__(self): 

409 fields = { 

410 "cluster_id": repr(self.cluster_id), 

411 } 

412 profile_dir = self.profile_dir 

413 profile_prefix = os.path.join(IPython.paths.get_ipython_dir(), "profile_") 

414 if profile_dir.startswith(profile_prefix): 

415 fields["profile"] = repr(profile_dir[len(profile_prefix) :]) 

416 else: 

417 home_dir = os.path.expanduser("~") 

418 

419 if profile_dir.startswith(home_dir + os.path.sep): 

420 # truncate $HOME/. -> ~/... 

421 profile_dir = "~" + profile_dir[len(home_dir) :] 

422 fields["profile_dir"] = repr(profile_dir) 

423 

424 if self.controller: 

425 fields["controller"] = f"<{self.controller.state}>" 

426 if self.engines: 

427 fields["engine_sets"] = list(self.engines) 

428 

429 fields_str = ', '.join(f"{key}={value}" for key, value in fields.items()) 

430 

431 return f"<{self.__class__.__name__}({fields_str})>" 

432 

433 def to_dict(self): 

434 """Serialize a Cluster object for later reconstruction""" 

435 cluster_info = {} 

436 d = {"cluster": cluster_info} 

437 for attr in self.traits(to_dict=True): 

438 cluster_info[attr] = getattr(self, attr) 

439 

440 def _cls_str(cls): 

441 return f"{cls.__module__}.{cls.__name__}" 

442 

443 cluster_info["class"] = _cls_str(self.__class__) 

444 

445 if self.controller and self.controller.state != 'after': 

446 d["controller"] = { 

447 "class": launcher.abbreviate_launcher_class( 

448 self.controller_launcher_class 

449 ), 

450 "state": None, 

451 } 

452 d["controller"]["state"] = self.controller.to_dict() 

453 

454 d["engines"] = { 

455 "class": launcher.abbreviate_launcher_class(self.engine_launcher_class), 

456 "sets": {}, 

457 } 

458 sets = d["engines"]["sets"] 

459 for engine_set_id, engine_launcher in self.engines.items(): 

460 if engine_launcher.state != 'after': 

461 sets[engine_set_id] = engine_launcher.to_dict() 

462 return d 

463 
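    # Editor's note (illustrative sketch): the cluster file written by
    # write_cluster_file() below is the JSON form of to_dict(), roughly shaped
    # like the following (all values invented for illustration):
    #
    #     {
    #       "cluster": {"cluster_id": "1700000000-abcd", "profile_dir": "/path/to/profile_default",
    #                   "n": 4, "class": "ipyparallel.cluster.cluster.Cluster", ...},
    #       "controller": {"class": "local", "state": {...}},
    #       "engines": {"class": "local", "sets": {"1700000001": {...}}}
    #     }
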

    @classmethod
    def from_dict(cls, d, **kwargs):
        """Construct a Cluster from serialized state"""
        cluster_info = d["cluster"]
        if cluster_info.get("class"):
            specified_cls = import_item(cluster_info["class"])
            if specified_cls is not cls:
                # specified a custom Cluster class,
                # dispatch to from_dict from that class
                return specified_cls.from_dict(d, **kwargs)

        kwargs.setdefault("shutdown_atexit", False)
        self = cls(**kwargs)
        for attr in self.traits(to_dict=True):
            if attr in cluster_info:
                setattr(self, attr, cluster_info[attr])

        for attr in self.traits(to_dict=True):
            if attr in d:
                setattr(self, attr, d[attr])

        cluster_key = ClusterManager._cluster_key(self)

        if d.get("controller"):
            controller_info = d["controller"]
            self.controller_launcher_class = controller_info["class"]
            # after traitlet coercion, which imports strings
            cls = self.controller_launcher_class
            if controller_info["state"]:
                try:
                    self.controller = cls.from_dict(
                        controller_info["state"], parent=self
                    )
                except launcher.NotRunning as e:
                    self.log.error(f"Controller for {cluster_key} not running: {e}")
                else:
                    self.controller.on_stop(self._controller_stopped)

        engine_info = d.get("engines")
        if engine_info:
            self.engine_launcher_class = engine_info["class"]
            # after traitlet coercion, which imports strings
            cls = self.engine_launcher_class
            for engine_set_id, engine_state in engine_info.get("sets", {}).items():
                try:
                    self.engines[engine_set_id] = engine_set = cls.from_dict(
                        engine_state,
                        engine_set_id=engine_set_id,
                        parent=self,
                    )
                except launcher.NotRunning as e:
                    self.log.error(
                        f"Engine set {cluster_key}{engine_set_id} not running: {e}"
                    )
                else:
                    engine_set.on_stop(partial(self._engines_stopped, engine_set_id))

        # check if state changed
        if self.to_dict() != d:
            # if so, update our cluster file
            self.update_cluster_file()
        return self

    @classmethod
    def from_file(
        cls,
        cluster_file=None,
        *,
        profile=None,
        profile_dir=None,
        cluster_id='',
        **kwargs,
    ):
        """Load a Cluster object from a file

        Can specify a full path,
        or combination of profile, profile_dir, and/or cluster_id.

        With no arguments given, it will connect to a cluster created
        with `ipcluster start`.
        """

        if cluster_file is None:
            # determine cluster_file from profile/profile_dir

            kwargs['cluster_id'] = cluster_id
            if profile is not None:
                kwargs['profile'] = profile
            if profile_dir is not None:
                kwargs['profile_dir'] = profile_dir
            cluster_file = Cluster(**kwargs).cluster_file

        # ensure from_file preserves cluster_file, even if it moved
        kwargs.setdefault("cluster_file", cluster_file)
        with open(cluster_file) as f:
            return cls.from_dict(json.load(f), **kwargs)

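    # Editor's note (illustrative sketch): reconnecting to a cluster that was
    # started separately, e.g. with `ipcluster start -n 4`:
    #
    #     cluster = Cluster.from_file(profile="default")
    #     rc = cluster.connect_client_sync()
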

    def write_cluster_file(self):
        """Write cluster info to disk for later loading"""
        os.makedirs(os.path.dirname(self.cluster_file), exist_ok=True)
        self.log.debug(f"Updating {self.cluster_file}")
        with open(self.cluster_file, "w") as f:
            json.dump(self.to_dict(), f)

    def remove_cluster_file(self):
        """Remove my cluster file."""
        try:
            os.remove(self.cluster_file)
        except FileNotFoundError:
            pass
        else:
            self.log.debug(f"Removed cluster file: {self.cluster_file}")

    def _is_running(self):
        """Return if we have any running components"""
        if self.controller and self.controller.state != 'after':
            return True
        if any(es.state != 'after' for es in self.engines.values()):
            return True
        return False

    def update_cluster_file(self):
        """Update my cluster file

        If cluster_file is disabled, do nothing
        If cluster is fully stopped, remove the file
        """
        if not self.cluster_file:
            # setting cluster_file='' disables saving to disk
            return

        if not self._is_running():
            self.remove_cluster_file()
        else:
            self.write_cluster_file()

    async def start_controller(self, **kwargs):
        """Start the controller

        Keyword arguments are passed to the controller launcher constructor
        """
        # start controller
        # retrieve connection info
        # webhook?
        if self.controller is not None:
            raise RuntimeError(

610 "controller is already running. Call stopcontroller() first." 

            )

        if self.shutdown_atexit:
            _atexit_clusters.add(self)
            if not _atexit_cleanup_clusters.registered:
                atexit.register(_atexit_cleanup_clusters)

        self.controller = controller = self.controller_launcher_class(
            work_dir='.',
            parent=self,
            log=self.log,
            profile_dir=self.profile_dir,
            cluster_id=self.cluster_id,
            **kwargs,
        )

        controller_args = getattr(controller, 'controller_args', None)
        if controller_args is None:

            def add_args(args):
                # only some Launchers support modifying controller args
                self.log.warning(
                    "Not adding controller args %s. "
                    "controller_args passthrough is not supported by %s",
                    args,
                    self.controller_launcher_class.__name__,
                )

        else:
            # copy to make sure change events fire
            controller_args = list(controller_args)
            add_args = controller_args.extend

        if self.controller_ip:
            add_args([f'--ip={self.controller_ip}'])
        if self.controller_location:
            add_args([f'--location={self.controller_location}'])
        if self.controller_args:
            add_args(self.controller_args)

        if controller_args is not None:
            # ensure we trigger trait observers after we are done
            self.controller.controller_args = list(controller_args)

        self.controller.on_stop(self._controller_stopped)
        r = self.controller.start()
        if inspect.isawaitable(r):
            await r

        self.update_cluster_file()

    def _controller_stopped(self, stop_data=None):
        """Callback when a controller stops"""
        if stop_data and stop_data.get("exit_code"):
            log = self.log.warning
        else:
            log = self.log.info
        log(f"Controller stopped: {stop_data}")
        self.update_cluster_file()

    def _new_engine_set_id(self):
        """Generate a new engine set id"""
        engine_set_id = base = f"{int(time.time())}"
        i = 1
        while engine_set_id in self.engines:
            engine_set_id = f"{base}-{i}"
            i += 1
        return engine_set_id

    async def start_engines(self, n=None, engine_set_id=None, **kwargs):
        """Start an engine set

        Returns an engine set id which can be used in stop_engines
        """
        # TODO: send engines connection info
        if engine_set_id is None:
            engine_set_id = self._new_engine_set_id()
        engine_set = self.engines[engine_set_id] = self.engine_launcher_class(
            work_dir='.',
            parent=self,
            log=self.log,
            profile_dir=self.profile_dir,
            cluster_id=self.cluster_id,
            engine_set_id=engine_set_id,
            **kwargs,
        )
        if self.send_engines_connection_env and self.controller:
            self.log.debug("Setting $IPP_CONNECTION_INFO environment")
            connection_info = await self.controller.get_connection_info()
            connection_info_json = json.dumps(connection_info["engine"])
            engine_set.environment["IPP_CONNECTION_INFO"] = connection_info_json

        if n is None:
            n = self.n
        n = getattr(engine_set, 'engine_count', n)
        if n is None:
            n = cpu_count()
        self.log.info(f"Starting {n or ''} engines with {self.engine_launcher_class}")
        r = engine_set.start(n)
        engine_set.on_stop(partial(self._engines_stopped, engine_set_id))
        if inspect.isawaitable(r):
            await r
        self.update_cluster_file()
        return engine_set_id

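    # Editor's note (illustrative sketch): the returned engine_set_id can be used
    # to manage one engine set independently, e.g. with the sync variants:
    #
    #     esid = cluster.start_engines_sync(n=2)
    #     cluster.stop_engines_sync(esid)
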

    def _engines_stopped(self, engine_set_id, stop_data=None):
        if stop_data and stop_data.get("exit_code"):
            log = self.log.warning
        else:
            log = self.log.info
        log(f"engine set stopped {engine_set_id}: {stop_data}")
        self.update_cluster_file()

    async def start_and_connect(self, n=None, activate=False):
        """Single call to start a cluster and connect a client

        If `activate` is given, a blocking DirectView on all engines will be created
        and activated, registering `%px` magics for use in IPython

        Example::

            rc = await Cluster(engines="mpi").start_and_connect(n=8, activate=True)

            %px print("hello, world!")

        Equivalent to::

            await self.start_cluster(n)
            client = await self.connect_client()
            await client.wait_for_engines(n, block=False)

        .. versionadded:: 7.1

        .. versionadded:: 8.1

            activate argument.
        """
        if n is None:
            n = self.n
        await self.start_cluster(n=n)
        client = await self.connect_client()

        if n is None:
            # number of engines to wait for
            # if not specified, derive current value from EngineSets
            n = sum(engine_set.n for engine_set in self.engines.values())

        if n:
            await asyncio.wrap_future(
                client.wait_for_engines(n, block=False, timeout=self.engine_timeout)
            )

        if activate:
            view = client[:]
            view.block = True
            view.activate()
        return client

    async def start_cluster(self, n=None):
        """Start a cluster

        starts one controller and n engines (default: self.n)

        .. versionchanged:: 7.1
            return self, to allow method chaining
        """
        if self.controller is None:
            await self.start_controller()
        if self.delay:
            await asyncio.sleep(self.delay)
        await self.start_engines(n)
        # return self to allow chaining
        return self

    async def stop_engines(self, engine_set_id=None):
        """Stop an engine set

        If engine_set_id is not given,
        all engines are stopped.
        """
        if engine_set_id is None:
            futures = []
            for engine_set_id in list(self.engines):
                futures.append(self.stop_engines(engine_set_id))
            if futures:
                await asyncio.gather(*futures)
            return
        self.log.info(f"Stopping engine(s): {engine_set_id}")
        engine_set = self.engines[engine_set_id]
        r = engine_set.stop()
        if inspect.isawaitable(r):
            await r
        # retrieve and cleanup output files
        engine_set.get_output(remove=True)
        self.engines.pop(engine_set_id)
        self.update_cluster_file()

    async def stop_engine(self, engine_id):
        """Stop one engine

        *May* stop all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        raise NotImplementedError("How do we find an engine by id?")

    async def restart_engines(self, engine_set_id=None):
        """Restart an engine set"""
        if engine_set_id is None:
            for engine_set_id in list(self.engines):
                await self.restart_engines(engine_set_id)
            return
        engine_set = self.engines[engine_set_id]
        n = engine_set.n
        await self.stop_engines(engine_set_id)
        await self.start_engines(n, engine_set_id)

    async def restart_engine(self, engine_id):
        """Restart one engine

        *May* stop all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        raise NotImplementedError("How do we find an engine by id?")

    async def signal_engine(self, signum, engine_id):
        """Signal one engine

        *May* signal all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        raise NotImplementedError("How do we find an engine by id?")

    async def signal_engines(self, signum, engine_set_id=None):
        """Signal all engines in a set

        If no engine set is specified, signal all engine sets.
        """
        if engine_set_id is None:
            for engine_set_id in list(self.engines):
                await self.signal_engines(signum, engine_set_id)
            return
        self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
        engine_set = self.engines[engine_set_id]
        r = engine_set.signal(signum)
        if inspect.isawaitable(r):
            await r

    async def stop_controller(self):
        """Stop the controller"""
        if self.controller and self.controller.running:
            self.log.info("Stopping controller")
            r = self.controller.stop()
            if inspect.isawaitable(r):
                await r

        if self.controller:
            self.controller.get_output(remove=True)

        self.controller = None
        self.update_cluster_file()

    async def stop_cluster(self):
        """Stop the controller and all engines"""
        await asyncio.gather(self.stop_controller(), self.stop_engines())

    async def connect_client(self, **client_kwargs):
        """Return a client connected to the cluster"""
        # TODO: get connect info directly from controller
        # this assumes local files exist
        from ipyparallel import Client

        connection_info = self.controller.get_connection_info()
        if inspect.isawaitable(connection_info):
            connection_info = await connection_info

        return Client(
            connection_info['client'],
            cluster=self,
            profile_dir=self.profile_dir,
            cluster_id=self.cluster_id,
            **client_kwargs,
        )

    # context managers (both async and sync)
    _context_client = None

    async def __aenter__(self):
        client = self._context_client = await self.start_and_connect()
        return client

    async def __aexit__(self, *args):
        if self._context_client is not None:
            self._context_client.close()
            self._context_client = None
        await self.stop_engines()
        await self.stop_controller()

    def __enter__(self):
        client = self._context_client = self.start_and_connect_sync()
        return client

    def __exit__(self, *args):
        if self._context_client:
            self._context_client.close()
            self._context_client = None
        self.stop_engines_sync()
        self.stop_controller_sync()

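# Editor's usage sketch (illustrative; not part of the upstream module).
# A minimal end-to-end example of the Cluster API documented in the class
# docstring above: start a local cluster, run code on the engines, and shut
# everything down via the synchronous context manager. The helper name below
# is hypothetical and exists only for illustration.
def _example_local_cluster_usage():
    import ipyparallel as ipp

    # Entering the context starts a controller plus 4 local engines and
    # returns a connected Client; exiting stops engines and controller.
    with ipp.Cluster(n=4) as rc:
        # run a trivial task on every engine via a blocking DirectView
        return rc[:].apply_sync(lambda: "hello from an engine")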



class ClusterManager(LoggingConfigurable):
    """A manager of clusters

    Wraps Cluster, adding lookup/list by cluster id
    """

    clusters = Dict(help="My cluster objects")

    @staticmethod
    def _cluster_key(cluster):
        """Return a unique cluster key for a cluster

        Default is {profile}:{cluster_id}
        """
        return f"{abbreviate_profile_dir(cluster.profile_dir)}:{cluster.cluster_id}"

    @staticmethod
    def _cluster_files_in_profile_dir(profile_dir):
        """List clusters in a profile directory

        Returns list of cluster *files*
        """
        return glob.glob(os.path.join(profile_dir, "security", "cluster-*.json"))

    def load_clusters(
        self,
        *,
        profile_dirs=None,
        profile_dir=None,
        profiles=None,
        profile=None,
        init_default_clusters=False,
        **kwargs,
    ):
        """Populate a ClusterManager from cluster files on disk

        Load all cluster objects from the given profile directory(ies).

        Default is to find clusters in all IPython profiles,
        but profile directories or profile names can be specified explicitly.

        If `init_default_clusters` is True,
        a stopped Cluster object is loaded for every profile dir
        with cluster_id="" if no running cluster is found.

        Priority:

        - profile_dirs list
        - single profile_dir
        - profiles list by name
        - single profile by name
        - all IPython profiles, if nothing else specified
        """

        # first, check our current clusters
        for key, cluster in list(self.clusters.items()):
            # remove stopped clusters
            # but not *new* clusters that haven't started yet
            # if `cluster.controller` is present
            # that means it was running at some point
            if cluster.controller and not cluster._is_running():
                self.log.info(f"Removing stopped cluster {key}")
                self.clusters.pop(key)

        if profile_dirs is None:
            if profile_dir is not None:
                profile_dirs = [profile_dir]
            else:
                if profiles is None:
                    if profile is not None:
                        profiles = [profile]

                if profiles is not None:
                    profile_dirs = _locate_profiles(profiles)

        if profile_dirs is None:
            # totally unspecified, default to all
            profile_dirs = _all_profile_dirs()

        by_cluster_file = {c.cluster_file: c for c in self.clusters.values()}
        for profile_dir in profile_dirs:
            cluster_files = self._cluster_files_in_profile_dir(profile_dir)
            # load default cluster for each profile
            # TODO: only if it has any ipyparallel config files
            # *or* it's the default profile
            if init_default_clusters and not cluster_files:
                cluster = Cluster(profile_dir=profile_dir, cluster_id="")
                cluster_key = self._cluster_key(cluster)
                if cluster_key not in self.clusters:
                    self.clusters[cluster_key] = cluster

            for cluster_file in cluster_files:
                if cluster_file in by_cluster_file:
                    # already loaded, skip it
                    continue
                self.log.debug(f"Loading cluster file {cluster_file}")
                try:
                    cluster = Cluster.from_file(cluster_file, parent=self)
                except Exception as e:
                    self.log.warning(f"Failed to load cluster from {cluster_file}: {e}")
                    continue
                else:
                    cluster_key = self._cluster_key(cluster)
                    self.clusters[cluster_key] = cluster

        return self.clusters

    def new_cluster(self, **kwargs):
        """Create a new cluster"""
        cluster = Cluster(parent=self, **kwargs)
        cluster_key = self._cluster_key(cluster)
        if cluster_key in self.clusters:
            raise KeyError(f"Cluster {cluster_key} already exists!")
        self.clusters[cluster_key] = cluster
        return cluster_key, cluster

    def get_cluster(self, cluster_id):
        """Get a Cluster object by id"""
        return self.clusters[cluster_id]

    def remove_cluster(self, cluster_id):
        """Delete a cluster by id"""
        # TODO: check running?
        del self.clusters[cluster_id]

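# Editor's usage sketch (illustrative; not part of the upstream module):
# discovering clusters previously written to disk via their cluster files.
def _example_list_clusters():
    manager = ClusterManager()
    # scan IPython profiles for cluster-*.json files and load each one
    clusters = manager.load_clusters()
    for key, cluster in clusters.items():
        print(key, cluster)
    return clusters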



def clean_cluster_files(profile_dirs=None, *, log=None, force=False):
    """Find all files related to clusters, and remove them

    Cleans up stale logs, etc.


    If `force`, remove cluster files even for clusters that appear to be running.
    If not `force`, raise if any running clusters are found,
    because it is not safe to clean up cluster files while their processes are running.


    """
    if profile_dirs is None:
        # default to all profiles
        profile_dirs = _all_profile_dirs()

    if isinstance(profile_dirs, str):
        profile_dirs = [profile_dirs]

    def _remove(f):
        if log:
            log.debug(f"Removing {f}")
        try:
            os.remove(f)
        except OSError as e:
            log.error(f"Error removing {f}: {e}")

    for profile_dir in profile_dirs:
        if log:
            log.info(f"Cleaning ipython cluster files in {profile_dir}")
        for cluster_file in ClusterManager._cluster_files_in_profile_dir(profile_dir):
            if force:
                _remove(cluster_file)
            else:
                # check if running first
                try:
                    cluster = Cluster.from_file(cluster_file=cluster_file)
                except Exception as e:
                    if log:
                        log.error(
                            f"Removing cluster file, which failed to load {cluster_file}: {e}"
                        )
                    _remove(cluster_file)
                else:
                    if cluster._is_running():
                        raise ValueError(

1090 "f{cluster} is still running. Use force=True to cleanup files for running clusters (may leave orphan processes!)" 

                        )
                    else:
                        _remove(cluster_file)
        for log_file in glob.glob(os.path.join(profile_dir, 'log', '*')):
            _remove(log_file)
        for security_file in glob.glob(
            os.path.join(profile_dir, 'security', 'ipcontroller-*.json')
        ):
            _remove(security_file)
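

# Editor's usage sketch (illustrative; not part of the upstream module):
# removing stale cluster files and logs for one profile directory, with logging.
# force=False (the default) raises if a cluster in that profile is still running.
def _example_clean_profile(profile_dir):
    import logging

    log = logging.getLogger("ipyparallel.cleanup")
    clean_cluster_files([profile_dir], log=log)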