1"""Cluster class
2
3defines the basic interface to a single IPython Parallel cluster
4
5starts/stops/polls controllers, engines, etc.
6"""
7
8import asyncio
9import atexit
10import glob
11import inspect
12import json
13import logging
14import os
15import random
16import string
17import sys
18import time
19import traceback
20from functools import partial
21from multiprocessing import cpu_count
22from weakref import WeakSet
23
24import IPython
25from traitlets import (
26 Any,
27 Bool,
28 Dict,
29 Float,
30 Instance,
31 Integer,
32 List,
33 Unicode,
34 default,
35 import_item,
36 validate,
37)
38from traitlets.config import Application, Config, LoggingConfigurable
39
40from .._async import AsyncFirst
41from ..traitlets import Launcher
42from ..util import (
43 _all_profile_dirs,
44 _default_profile_dir,
45 _locate_profiles,
46 _traitlet_signature,
47 abbreviate_profile_dir,
48)
49from . import launcher
50
51_suffix_chars = string.ascii_lowercase + string.digits
52
53# weak set of clusters to be cleaned up at exit
54_atexit_clusters = WeakSet()
55
56
57def _atexit_cleanup_clusters(*args):
58 """Cleanup clusters during process shutdown"""
59 for cluster in _atexit_clusters:
60 if not cluster.shutdown_atexit:
61 # overridden after register
62 continue
63 if cluster.controller or cluster.engines:
64 print(f"Stopping cluster {cluster}", file=sys.stderr)
65 try:
66 cluster.stop_cluster_sync()
67 except Exception:
68 print(f"Error stopping cluster {cluster}", file=sys.stderr)
69 traceback.print_exception(*sys.exc_info())
70
71
72_atexit_cleanup_clusters.registered = False
73
74
@_traitlet_signature
class Cluster(AsyncFirst, LoggingConfigurable):
    """Class representing an IPP cluster

    i.e. one controller and one or more groups of engines

    Can start/stop/monitor/poll cluster resources

    All async methods can be called synchronously with a `_sync` suffix,
    e.g. `cluster.start_cluster_sync()`

    .. versionchanged:: 8.0
        controller and engine launcher classes can be specified via
        `Cluster(controller='ssh', engines='mpi')`
        without the `_launcher_class` suffix.
    """

    # general configuration

    shutdown_atexit = Bool(
        True,
        help="""
        Shutdown the cluster at process exit.

        Set to False if you want to launch a cluster and leave it running
        after the launching process exits.
        """,
    )

    cluster_id = Unicode(help="The id of the cluster (default: random string)").tag(
        to_dict=True
    )

    @default("cluster_id")
    def _default_cluster_id(self):
        # timestamp plus short random suffix: unique and roughly sortable
        return f"{int(time.time())}-{''.join(random.choice(_suffix_chars) for i in range(4))}"

    profile_dir = Unicode(
        help="""The profile directory.

        Default priority:

        - specified explicitly
        - current IPython session
        - use profile name (default: 'default')

        """
    ).tag(to_dict=True)

    @default("profile_dir")
    def _default_profile_dir(self):
        # delegates to the module-level helper of the same name (from ..util)
        return _default_profile_dir(profile=self.profile)

    @validate("profile_dir")
    def _validate_profile_dir(self, proposal):
        # normalize non-empty paths to absolute; leave '' alone
        path = proposal.value
        if path:
            return os.path.abspath(path)
        return path

    profile = Unicode(
        "",
        help="""The profile name,
        a shortcut for specifying profile_dir within $IPYTHONDIR.""",
    )

    cluster_file = Unicode(
        help="The path to the cluster file for saving this cluster to disk"
    )

    @default("cluster_file")
    def _default_cluster_file(self):
        # cluster files live in the profile's security dir, keyed by cluster_id
        return os.path.join(
            self.profile_dir, "security", f"cluster-{self.cluster_id}.json"
        )
150
    engine_timeout = Integer(
        60,
        help="""Timeout to use when waiting for engines to register

        before giving up.
        """,
        config=True,
    )

    send_engines_connection_env = Bool(
        True,
        config=True,
        help="""
        Wait for controller's connection info before passing to engines
        via $IPP_CONNECTION_INFO environment variable.

        Set to False to start engines immediately
        without waiting for the controller's connection info to be available.

        When True, no connection file movement is required.
        False is mainly useful when submitting the controller may
        take a long time in a job queue,
        and the engines should enter the queue before the controller is running.

        .. versionadded:: 8.0
        """,
    )

    # launcher class for the controller; accepts case-insensitive
    # abbreviations ('ssh', 'mpi', ...) via the entry-point group
    controller_launcher_class = Launcher(
        default_value=launcher.LocalControllerLauncher,
        entry_point_group='ipyparallel.controller_launchers',
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        Third-party engine launchers can be registered via `ipyparallel.engine_launchers` entry point.

        They can be selected via case-insensitive abbreviation, e.g.

            c.Cluster.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """,
        config=True,
    ).tag(alias="controller")

    # launcher class for engine sets; same abbreviation mechanism as above
    engine_launcher_class = Launcher(
        default_value=launcher.LocalEngineSetLauncher,
        entry_point_group='ipyparallel.engine_launchers',
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Third-party engine launchers can be registered via `ipyparallel.engine_launchers` entry point.

        They can be selected via case-insensitive abbreviation, e.g.

            c.Cluster.engine_launcher_class = 'ssh'

        or:

            ipcluster start --engines=mpi

        """,
        config=True,
    ).tag(alias="engines")
226
    # controller configuration

    controller_args = List(
        Unicode(),
        config=True,
        help="Additional CLI args to pass to the controller.",
    ).tag(to_dict=True)
    controller_ip = Unicode(
        config=True, help="Set the IP address of the controller."
    ).tag(to_dict=True)
    controller_location = Unicode(
        config=True,
        help="""Set the location (hostname or ip) of the controller.

        This is used by engines and clients to locate the controller
        when the controller listens on all interfaces
        """,
    ).tag(to_dict=True)

    # engine configuration

    delay = Float(
        1.0,
        config=True,
        help="delay (in s) between starting the controller and the engines",
    ).tag(to_dict=True)

    n = Integer(
        None, allow_none=True, config=True, help="The number of engines to start"
    ).tag(to_dict=True)

    @default("parent")
    def _default_parent(self):
        """Default to inheriting config from current IPython session"""
        # may be None when not running inside IPython
        return IPython.get_ipython()

    # level used when this class constructs its own logger (see _default_log)
    log_level = Integer(logging.INFO)
264
265 @default("log")
266 def _default_log(self):
267 if self.parent and self.parent is IPython.get_ipython():
268 # log to stdout in an IPython session
269 log = logging.getLogger(f"{__name__}.{self.cluster_id}")
270 log.setLevel(self.log_level)
271
272 handler = logging.StreamHandler(sys.stdout)
273 log.handlers = [handler]
274 log.propagate = False
275 return log
276 elif self.parent and getattr(self.parent, 'log', None) is not None:
277 return self.parent.log
278 elif Application.initialized():
279 return Application.instance().log
280 else:
281 # set up our own logger
282 log = logging.getLogger(f"{__name__}.{self.cluster_id}")
283 log.setLevel(self.log_level)
284 return log
285
    load_profile = Bool(
        True,
        config=True,
        help="""
        If True (default) load ipcluster config from profile directory, if present.
        """,
    )
    # private state
    # controller: the controller Launcher instance (None until started)
    controller = Any().tag(nosignature=True)
    # engines: mapping of engine_set_id -> engine-set Launcher
    engines = Dict().tag(nosignature=True)

    @property
    def engine_set(self):
        """Return the first engine set

        Most clusters have only one engine set,
        which is tedious to get to via the `engines` dict
        with random engine set ids.

        Returns None if no engine sets have been started.

        ..versionadded:: 8.0
        """
        if self.engines:
            return next(iter(self.engines.values()))

    # config loaded from the profile dir (see _profile_config_default)
    profile_config = Instance(Config, allow_none=False).tag(nosignature=True)
311
    @default("profile_config")
    def _profile_config_default(self):
        """Load config from our profile"""
        if not self.load_profile or not os.path.isdir(self.profile_dir):
            # no profile dir, nothing to load
            return Config()

        # local import to avoid a circular import at module load time
        from .app import BaseParallelApplication, IPClusterStart

        # look up if we are descended from an 'ipcluster' app
        # avoids repeated load of the current profile dir
        # walk the parent chain, nearest first
        parents = []
        parent = self.parent
        while parent is not None:
            parents.append(parent)
            parent = parent.parent

        app_parents = list(
            filter(lambda p: isinstance(p, BaseParallelApplication), parents)
        )
        if app_parents:
            app_parent = app_parents[0]
        else:
            app_parent = None

        if (
            app_parent
            and app_parent.name == 'ipcluster'
            and app_parent.profile_dir.location == self.profile_dir
        ):
            # profile config already loaded by parent, nothing new to load
            return Config()

        self.log.debug(f"Loading profile {self.profile_dir}")
        # set profile dir via config
        config = Config()
        config.ProfileDir.location = self.profile_dir

        # load profile config via IPCluster
        app = IPClusterStart(config=config, log=self.log)
        # adds profile dir to config_files_path
        app.init_profile_dir()
        # adds system to config_files_path
        app.init_config_files()
        # actually load the config
        app.load_config_file(suppress_errors=False)
        return app.config
359
360 @validate("config")
361 def _merge_profile_config(self, proposal):
362 direct_config = proposal.value
363 if not self.load_profile:
364 return direct_config
365 profile_config = self.profile_config
366 if not profile_config:
367 return direct_config
368 # priority ?! direct > profile
369 config = Config()
370 if profile_config:
371 config.merge(profile_config)
372 config.merge(direct_config)
373 return config
374
375 @default("config")
376 def _default_config(self):
377 if self.load_profile:
378 return self.profile_config
379 else:
380 return Config()
381
382 def __init__(self, *, engines=None, controller=None, **kwargs):
383 """Construct a Cluster"""
384 # handle more intuitive aliases, which match ipcluster cli args, etc.
385 if engines is not None:
386 if 'engine_launcher_class' in kwargs:
387 raise TypeError(
388 "Only specify one of 'engines' or 'engine_launcher_class', not both"
389 )
390 kwargs['engine_launcher_class'] = engines
391 if controller is not None:
392 if 'controller_launcher_class' in kwargs:
393 raise TypeError(
394 "Only specify one of 'controller' or 'controller_launcher_class', not both"
395 )
396 kwargs['controller_launcher_class'] = controller
397 if 'parent' not in kwargs and 'config' not in kwargs:
398 kwargs['parent'] = self._default_parent()
399
400 super().__init__(**kwargs)
401
402 def __del__(self):
403 if not self.shutdown_atexit:
404 return
405 if self.controller or self.engines:
406 self.stop_cluster_sync()
407
408 def __repr__(self):
409 fields = {
410 "cluster_id": repr(self.cluster_id),
411 }
412 profile_dir = self.profile_dir
413 profile_prefix = os.path.join(IPython.paths.get_ipython_dir(), "profile_")
414 if profile_dir.startswith(profile_prefix):
415 fields["profile"] = repr(profile_dir[len(profile_prefix) :])
416 else:
417 home_dir = os.path.expanduser("~")
418
419 if profile_dir.startswith(home_dir + os.path.sep):
420 # truncate $HOME/. -> ~/...
421 profile_dir = "~" + profile_dir[len(home_dir) :]
422 fields["profile_dir"] = repr(profile_dir)
423
424 if self.controller:
425 fields["controller"] = f"<{self.controller.state}>"
426 if self.engines:
427 fields["engine_sets"] = list(self.engines)
428
429 fields_str = ', '.join(f"{key}={value}" for key, value in fields.items())
430
431 return f"<{self.__class__.__name__}({fields_str})>"
432
    def to_dict(self):
        """Serialize a Cluster object for later reconstruction"""
        cluster_info = {}
        d = {"cluster": cluster_info}
        # serialize every trait tagged with to_dict=True
        for attr in self.traits(to_dict=True):
            cluster_info[attr] = getattr(self, attr)

        def _cls_str(cls):
            # fully-qualified class name, importable via import_item
            return f"{cls.__module__}.{cls.__name__}"

        cluster_info["class"] = _cls_str(self.__class__)

        # only serialize the controller while it hasn't finished ('after')
        if self.controller and self.controller.state != 'after':
            d["controller"] = {
                "class": launcher.abbreviate_launcher_class(
                    self.controller_launcher_class
                ),
                "state": None,
            }
            d["controller"]["state"] = self.controller.to_dict()

        d["engines"] = {
            "class": launcher.abbreviate_launcher_class(self.engine_launcher_class),
            "sets": {},
        }
        sets = d["engines"]["sets"]
        for engine_set_id, engine_launcher in self.engines.items():
            # skip engine sets that have already stopped
            if engine_launcher.state != 'after':
                sets[engine_set_id] = engine_launcher.to_dict()
        return d
463
    @classmethod
    def from_dict(cls, d, **kwargs):
        """Construct a Cluster from serialized state

        Counterpart to `to_dict`.
        Restores controller and engine launchers that are still running.
        """
        cluster_info = d["cluster"]
        if cluster_info.get("class"):
            specified_cls = import_item(cluster_info["class"])
            if specified_cls is not cls:
                # specified a custom Cluster class,
                # dispatch to from_dict from that class
                return specified_cls.from_dict(d, **kwargs)

        # deserialized clusters don't shut down at exit by default;
        # the process that *started* the cluster keeps that responsibility
        kwargs.setdefault("shutdown_atexit", False)
        self = cls(**kwargs)
        for attr in self.traits(to_dict=True):
            if attr in cluster_info:
                setattr(self, attr, cluster_info[attr])

        # NOTE(review): also reads trait values from the *top level* of `d`,
        # which looks redundant with the cluster_info loop above —
        # presumably for older cluster-file layouts; confirm before removing
        for attr in self.traits(to_dict=True):
            if attr in d:
                setattr(self, attr, d[attr])

        cluster_key = ClusterManager._cluster_key(self)

        if d.get("controller"):
            controller_info = d["controller"]
            self.controller_launcher_class = controller_info["class"]
            # after traitlet coercion, which imports strings
            cls = self.controller_launcher_class
            if controller_info["state"]:
                try:
                    self.controller = cls.from_dict(
                        controller_info["state"], parent=self
                    )
                except launcher.NotRunning as e:
                    # stale state: the serialized controller is gone
                    self.log.error(f"Controller for {cluster_key} not running: {e}")
                else:
                    self.controller.on_stop(self._controller_stopped)

        engine_info = d.get("engines")
        if engine_info:
            self.engine_launcher_class = engine_info["class"]
            # after traitlet coercion, which imports strings
            cls = self.engine_launcher_class
            for engine_set_id, engine_state in engine_info.get("sets", {}).items():
                try:
                    self.engines[engine_set_id] = engine_set = cls.from_dict(
                        engine_state,
                        engine_set_id=engine_set_id,
                        parent=self,
                    )
                except launcher.NotRunning as e:
                    # stale state: this engine set is gone
                    self.log.error(
                        f"Engine set {cluster_key}{engine_set_id} not running: {e}"
                    )
                else:
                    engine_set.on_stop(partial(self._engines_stopped, engine_set_id))

        # check if state changed
        if self.to_dict() != d:
            # if so, update our cluster file
            self.update_cluster_file()
        return self
526
    @classmethod
    def from_file(
        cls,
        cluster_file=None,
        *,
        profile=None,
        profile_dir=None,
        cluster_id='',
        **kwargs,
    ):
        """Load a Cluster object from a file

        Can specify a full path,
        or combination of profile, profile_dir, and/or cluster_id.

        With no arguments given, it will connect to a cluster created
        with `ipcluster start`.
        """

        if cluster_file is None:
            # determine cluster_file from profile/profile_dir

            kwargs['cluster_id'] = cluster_id
            if profile is not None:
                kwargs['profile'] = profile
            if profile_dir is not None:
                kwargs['profile_dir'] = profile_dir
            # a throwaway Cluster computes the default cluster_file path
            cluster_file = Cluster(**kwargs).cluster_file

        # ensure from_file preserves cluster_file, even if it moved
        kwargs.setdefault("cluster_file", cluster_file)
        with open(cluster_file) as f:
            return cls.from_dict(json.load(f), **kwargs)
560
561 def write_cluster_file(self):
562 """Write cluster info to disk for later loading"""
563 os.makedirs(os.path.dirname(self.cluster_file), exist_ok=True)
564 self.log.debug(f"Updating {self.cluster_file}")
565 with open(self.cluster_file, "w") as f:
566 json.dump(self.to_dict(), f)
567
568 def remove_cluster_file(self):
569 """Remove my cluster file."""
570 try:
571 os.remove(self.cluster_file)
572 except FileNotFoundError:
573 pass
574 else:
575 self.log.debug(f"Removed cluster file: {self.cluster_file}")
576
577 def _is_running(self):
578 """Return if we have any running components"""
579 if self.controller and self.controller.state != 'after':
580 return True
581 if any(es.state != 'after' for es in self.engines.values()):
582 return True
583 return False
584
585 def update_cluster_file(self):
586 """Update my cluster file
587
588 If cluster_file is disabled, do nothing
589 If cluster is fully stopped, remove the file
590 """
591 if not self.cluster_file:
592 # setting cluster_file='' disables saving to disk
593 return
594
595 if not self._is_running():
596 self.remove_cluster_file()
597 else:
598 self.write_cluster_file()
599
600 async def start_controller(self, **kwargs):
601 """Start the controller
602
603 Keyword arguments are passed to the controller launcher constructor
604 """
605 # start controller
606 # retrieve connection info
607 # webhook?
608 if self.controller is not None:
609 raise RuntimeError(
610 "controller is already running. Call stopcontroller() first."
611 )
612
613 if self.shutdown_atexit:
614 _atexit_clusters.add(self)
615 if not _atexit_cleanup_clusters.registered:
616 atexit.register(_atexit_cleanup_clusters)
617
618 self.controller = controller = self.controller_launcher_class(
619 work_dir='.',
620 parent=self,
621 log=self.log,
622 profile_dir=self.profile_dir,
623 cluster_id=self.cluster_id,
624 **kwargs,
625 )
626
627 controller_args = getattr(controller, 'controller_args', None)
628 if controller_args is None:
629
630 def add_args(args):
631 # only some Launchers support modifying controller args
632 self.log.warning(
633 "Not adding controller args %s. "
634 "controller_args passthrough is not supported by %s",
635 args,
636 self.controller_launcher_class.__name__,
637 )
638
639 else:
640 # copy to make sure change events fire
641 controller_args = list(controller_args)
642 add_args = controller_args.extend
643
644 if self.controller_ip:
645 add_args([f'--ip={self.controller_ip}'])
646 if self.controller_location:
647 add_args([f'--location={self.controller_location}'])
648 if self.controller_args:
649 add_args(self.controller_args)
650
651 if controller_args is not None:
652 # ensure we trigger trait observers after we are done
653 self.controller.controller_args = list(controller_args)
654
655 self.controller.on_stop(self._controller_stopped)
656 r = self.controller.start()
657 if inspect.isawaitable(r):
658 await r
659
660 self.update_cluster_file()
661
662 def _controller_stopped(self, stop_data=None):
663 """Callback when a controller stops"""
664 if stop_data and stop_data.get("exit_code"):
665 log = self.log.warning
666 else:
667 log = self.log.info
668 log(f"Controller stopped: {stop_data}")
669 self.update_cluster_file()
670
671 def _new_engine_set_id(self):
672 """Generate a new engine set id"""
673 engine_set_id = base = f"{int(time.time())}"
674 i = 1
675 while engine_set_id in self.engines:
676 engine_set_id = f"{base}-{i}"
677 i += 1
678 return engine_set_id
679
    async def start_engines(self, n=None, engine_set_id=None, **kwargs):
        """Start an engine set

        Returns an engine set id which can be used in stop_engines

        Keyword arguments are passed to the engine launcher constructor.
        `n` defaults to `self.n`, then to `cpu_count()`.
        """
        # TODO: send engines connection info
        if engine_set_id is None:
            engine_set_id = self._new_engine_set_id()
        engine_set = self.engines[engine_set_id] = self.engine_launcher_class(
            work_dir='.',
            parent=self,
            log=self.log,
            profile_dir=self.profile_dir,
            cluster_id=self.cluster_id,
            engine_set_id=engine_set_id,
            **kwargs,
        )
        if self.send_engines_connection_env and self.controller:
            # pass connection info via the environment instead of files
            self.log.debug("Setting $IPP_CONNECTION_INFO environment")
            connection_info = await self.controller.get_connection_info()
            connection_info_json = json.dumps(connection_info["engine"])
            engine_set.environment["IPP_CONNECTION_INFO"] = connection_info_json

        if n is None:
            n = self.n
        # launchers may override the engine count (e.g. from batch config)
        n = getattr(engine_set, 'engine_count', n)
        if n is None:
            # final fallback: one engine per CPU
            n = cpu_count()
        self.log.info(f"Starting {n or ''} engines with {self.engine_launcher_class}")
        # launcher.start may be sync or async; await only if needed
        r = engine_set.start(n)
        engine_set.on_stop(partial(self._engines_stopped, engine_set_id))
        if inspect.isawaitable(r):
            await r
        self.update_cluster_file()
        return engine_set_id
715
716 def _engines_stopped(self, engine_set_id, stop_data=None):
717 if stop_data and stop_data.get("exit_code"):
718 log = self.log.warning
719 else:
720 log = self.log.info
721 log(f"engine set stopped {engine_set_id}: {stop_data}")
722 self.update_cluster_file()
723
    async def start_and_connect(self, n=None, activate=False):
        """Single call to start a cluster and connect a client

        If `activate` is given, a blocking DirectView on all engines will be created
        and activated, registering `%px` magics for use in IPython

        Example::

            rc = await Cluster(engines="mpi").start_and_connect(n=8, activate=True)

            %px print("hello, world!")

        Equivalent to::

            await self.start_cluster(n)
            client = await self.connect_client()
            await client.wait_for_engines(n, block=False)

        .. versionadded:: 7.1

        .. versionadded:: 8.1

            activate argument.
        """
        if n is None:
            n = self.n
        await self.start_cluster(n=n)
        client = await self.connect_client()

        if n is None:
            # number of engines to wait for
            # if not specified, derive current value from EngineSets
            n = sum(engine_set.n for engine_set in self.engines.values())

        if n:
            # wait_for_engines returns a concurrent Future; wrap it for await
            await asyncio.wrap_future(
                client.wait_for_engines(n, block=False, timeout=self.engine_timeout)
            )

        if activate:
            # blocking DirectView on all engines; registers %px magics
            view = client[:]
            view.block = True
            view.activate()
        return client
768
    async def start_cluster(self, n=None):
        """Start a cluster

        starts one controller and n engines (default: self.n)

        .. versionchanged:: 7.1
            return self, to allow method chaining
        """
        if self.controller is None:
            await self.start_controller()
        if self.delay:
            # give the controller a head start before engines try to connect
            await asyncio.sleep(self.delay)
        await self.start_engines(n)
        # return self to allow chaining
        return self
784
785 async def stop_engines(self, engine_set_id=None):
786 """Stop an engine set
787
788 If engine_set_id is not given,
789 all engines are stopped.
790 """
791 if engine_set_id is None:
792 futures = []
793 for engine_set_id in list(self.engines):
794 futures.append(self.stop_engines(engine_set_id))
795 if futures:
796 await asyncio.gather(*futures)
797 return
798 self.log.info(f"Stopping engine(s): {engine_set_id}")
799 engine_set = self.engines[engine_set_id]
800 r = engine_set.stop()
801 if inspect.isawaitable(r):
802 await r
803 # retrieve and cleanup output files
804 engine_set.get_output(remove=True)
805 self.engines.pop(engine_set_id)
806 self.update_cluster_file()
807
    async def stop_engine(self, engine_id):
        """Stop one engine

        *May* stop all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        # not implemented: requires mapping an engine id back to its set
        raise NotImplementedError("How do we find an engine by id?")

    async def restart_engines(self, engine_set_id=None):
        """Restart an engine set

        If engine_set_id is not given, all engine sets are restarted.
        """
        if engine_set_id is None:
            # restart every engine set, one at a time
            for engine_set_id in list(self.engines):
                await self.restart_engines(engine_set_id)
            return
        engine_set = self.engines[engine_set_id]
        # remember the size so the restarted set matches the old one
        n = engine_set.n
        await self.stop_engines(engine_set_id)
        await self.start_engines(n, engine_set_id)

    async def restart_engine(self, engine_id):
        """Restart one engine

        *May* stop all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        # not implemented: requires mapping an engine id back to its set
        raise NotImplementedError("How do we find an engine by id?")

    async def signal_engine(self, signum, engine_id):
        """Signal one engine

        *May* signal all engines in a set,
        depending on EngineSet features (e.g. mpiexec)
        """
        # not implemented: requires mapping an engine id back to its set
        raise NotImplementedError("How do we find an engine by id?")
842
843 async def signal_engines(self, signum, engine_set_id=None):
844 """Signal all engines in a set
845
846 If no engine set is specified, signal all engine sets.
847 """
848 if engine_set_id is None:
849 for engine_set_id in list(self.engines):
850 await self.signal_engines(signum, engine_set_id)
851 return
852 self.log.info(f"Sending signal {signum} to engine(s) {engine_set_id}")
853 engine_set = self.engines[engine_set_id]
854 r = engine_set.signal(signum)
855 if inspect.isawaitable(r):
856 await r
857
858 async def stop_controller(self):
859 """Stop the controller"""
860 if self.controller and self.controller.running:
861 self.log.info("Stopping controller")
862 r = self.controller.stop()
863 if inspect.isawaitable(r):
864 await r
865
866 if self.controller:
867 self.controller.get_output(remove=True)
868
869 self.controller = None
870 self.update_cluster_file()
871
872 async def stop_cluster(self):
873 """Stop the controller and all engines"""
874 await asyncio.gather(self.stop_controller(), self.stop_engines())
875
    async def connect_client(self, **client_kwargs):
        """Return a client connected to the cluster

        Keyword arguments are passed through to the Client constructor.
        """
        # TODO: get connect info directly from controller
        # this assumes local files exist
        from ipyparallel import Client

        connection_info = self.controller.get_connection_info()
        if inspect.isawaitable(connection_info):
            connection_info = await connection_info

        return Client(
            connection_info['client'],
            cluster=self,
            profile_dir=self.profile_dir,
            cluster_id=self.cluster_id,
            **client_kwargs,
        )

    # context managers (both async and sync)
    # client created by __(a)enter__, closed again by __(a)exit__
    _context_client = None

    async def __aenter__(self):
        # `async with cluster:` starts the cluster and yields a connected client
        client = self._context_client = await self.start_and_connect()
        return client

    async def __aexit__(self, *args):
        if self._context_client is not None:
            self._context_client.close()
            self._context_client = None
        await self.stop_engines()
        await self.stop_controller()

    def __enter__(self):
        # blocking equivalent of __aenter__
        client = self._context_client = self.start_and_connect_sync()
        return client

    def __exit__(self, *args):
        if self._context_client:
            self._context_client.close()
            self._context_client = None
        self.stop_engines_sync()
        self.stop_controller_sync()
918
919
class ClusterManager(LoggingConfigurable):
    """A manager of clusters

    Wraps Cluster, adding lookup/list by cluster id
    """

    # mapping of cluster key -> Cluster object (see _cluster_key)
    clusters = Dict(help="My cluster objects")

    @staticmethod
    def _cluster_key(cluster):
        """Return a unique cluster key for a cluster

        Default is {profile}:{cluster_id}
        """
        return f"{abbreviate_profile_dir(cluster.profile_dir)}:{cluster.cluster_id}"

    @staticmethod
    def _cluster_files_in_profile_dir(profile_dir):
        """List clusters in a profile directory

        Returns list of cluster *files*
        """
        return glob.glob(os.path.join(profile_dir, "security", "cluster-*.json"))

    def load_clusters(
        self,
        *,
        profile_dirs=None,
        profile_dir=None,
        profiles=None,
        profile=None,
        init_default_clusters=False,
        **kwargs,
    ):
        """Populate a ClusterManager from cluster files on disk

        Load all cluster objects from the given profile directory(ies).

        Default is to find clusters in all IPython profiles,
        but profile directories or profile names can be specified explicitly.

        If `init_default_clusters` is True,
        a stopped Cluster object is loaded for every profile dir
        with cluster_id="" if no running cluster is found.

        Priority:

        - profile_dirs list
        - single profile_dir
        - profiles list by name
        - single profile by name
        - all IPython profiles, if nothing else specified
        """

        # first, check our current clusters
        for key, cluster in list(self.clusters.items()):
            # remove stopped clusters
            # but not *new* clusters that haven't started yet
            # if `cluster.controller` is present
            # that means it was running at some point
            if cluster.controller and not cluster._is_running():
                self.log.info(f"Removing stopped cluster {key}")
                self.clusters.pop(key)

        # resolve profile_dirs according to the documented priority
        if profile_dirs is None:
            if profile_dir is not None:
                profile_dirs = [profile_dir]
            else:
                if profiles is None:
                    if profile is not None:
                        profiles = [profile]

                if profiles is not None:
                    profile_dirs = _locate_profiles(profiles)

            if profile_dirs is None:
                # totally unspecified, default to all
                profile_dirs = _all_profile_dirs()

        # index already-tracked clusters by their cluster_file to avoid reloads
        by_cluster_file = {c.cluster_file: c for c in self.clusters.values()}
        for profile_dir in profile_dirs:
            cluster_files = self._cluster_files_in_profile_dir(profile_dir)
            # load default cluster for each profile
            # TODO: only if it has any ipyparallel config files
            # *or* it's the default profile
            if init_default_clusters and not cluster_files:
                cluster = Cluster(profile_dir=profile_dir, cluster_id="")
                cluster_key = self._cluster_key(cluster)
                if cluster_key not in self.clusters:
                    self.clusters[cluster_key] = cluster

            for cluster_file in cluster_files:
                if cluster_file in by_cluster_file:
                    # already loaded, skip it
                    continue
                self.log.debug(f"Loading cluster file {cluster_file}")
                try:
                    cluster = Cluster.from_file(cluster_file, parent=self)
                except Exception as e:
                    # unreadable or stale files are skipped, not fatal
                    self.log.warning(f"Failed to load cluster from {cluster_file}: {e}")
                    continue
                else:
                    cluster_key = self._cluster_key(cluster)
                    self.clusters[cluster_key] = cluster

        return self.clusters

    def new_cluster(self, **kwargs):
        """Create a new cluster

        Returns (cluster_key, cluster)
        """
        cluster = Cluster(parent=self, **kwargs)
        cluster_key = self._cluster_key(cluster)
        if cluster_key in self.clusters:
            raise KeyError(f"Cluster {cluster_key} already exists!")
        self.clusters[cluster_key] = cluster
        return cluster_key, cluster

    def get_cluster(self, cluster_id):
        """Get a Cluster object by id"""
        return self.clusters[cluster_id]

    def remove_cluster(self, cluster_id):
        """Delete a cluster by id"""
        # TODO: check running?
        del self.clusters[cluster_id]
1044
1045
def clean_cluster_files(profile_dirs=None, *, log=None, force=False):
    """Find all files related to clusters, and remove them

    Cleans up stale logs, connection files, etc.

    If force: remove cluster files even for running clusters.
    If not force, will raise if any running cluster files are found,
    because it's not safe to clean up cluster files while processes are running

    Parameters
    ----------
    profile_dirs : str or list of str, optional
        profile directory (or directories) to clean; default: all IPython profiles
    log : Logger, optional
        logger for progress and error messages; may be None
    force : bool
        remove files even for clusters that appear to be running
    """
    if profile_dirs is None:
        # default to all profiles
        profile_dirs = _all_profile_dirs()

    if isinstance(profile_dirs, str):
        profile_dirs = [profile_dirs]

    def _remove(f):
        """Remove one file, logging (not raising) failures"""
        if log:
            log.debug(f"Removing {f}")
        try:
            os.remove(f)
        except OSError as e:
            # fixed: guard the log call — `log` may be None,
            # which previously raised AttributeError here
            if log:
                log.error(f"Error removing {f}: {e}")

    for profile_dir in profile_dirs:
        if log:
            log.info(f"Cleaning ipython cluster files in {profile_dir}")
        for cluster_file in ClusterManager._cluster_files_in_profile_dir(profile_dir):
            if force:
                _remove(cluster_file)
            else:
                # check if running first
                try:
                    cluster = Cluster.from_file(cluster_file=cluster_file)
                except Exception as e:
                    # unloadable file: remove it as stale
                    if log:
                        log.error(
                            f"Removing cluster file, which failed to load {cluster_file}: {e}"
                        )
                    _remove(cluster_file)
                else:
                    if cluster._is_running():
                        # fixed: message was a literal "f{cluster}..." —
                        # the f-string prefix was inside the quotes
                        raise ValueError(
                            f"{cluster} is still running. Use force=True to cleanup files for running clusters (may leave orphan processes!)"
                        )
                    else:
                        _remove(cluster_file)
        # stale engine/controller log files
        for log_file in glob.glob(os.path.join(profile_dir, 'log', '*')):
            _remove(log_file)
        # stale controller connection files
        for security_file in glob.glob(
            os.path.join(profile_dir, 'security', 'ipcontroller-*.json')
        ):
            _remove(security_file)