1"""Facilities for launching IPython Parallel processes asynchronously."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5import asyncio
6import copy
7import inspect
8import json
9import logging
10import os
11import re
12import shlex
13import shutil
14import signal
15import stat
16import sys
17import threading
18import time
19from concurrent.futures import ThreadPoolExecutor
20from functools import lru_cache, partial
21from signal import SIGTERM
22from subprocess import PIPE, STDOUT, Popen, check_output
23from tempfile import TemporaryDirectory
24from textwrap import indent
25
26import psutil
27from IPython.utils.path import ensure_dir_exists, get_home_dir
28from IPython.utils.text import EvalFormatter
29from tornado import ioloop
30from traitlets import (
31 Any,
32 CRegExp,
33 Dict,
34 Float,
35 Instance,
36 Integer,
37 List,
38 Unicode,
39 default,
40 observe,
41)
42from traitlets.config.configurable import LoggingConfigurable
43
44from ..traitlets import entry_points
45from ..util import _OutputProducingThread as Thread
46from ..util import shlex_join
47from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask
48from .shellcmd import ShellCommandSend
49
50WINDOWS = os.name == 'nt'
51
52SIGKILL = getattr(signal, "SIGKILL", -1)
53# -----------------------------------------------------------------------------
54# Paths to the kernel apps
55# -----------------------------------------------------------------------------
56
57ipcluster_cmd_argv = [sys.executable, "-m", "ipyparallel.cluster"]
58
59ipengine_cmd_argv = [sys.executable, "-m", "ipyparallel.engine"]
60
61ipcontroller_cmd_argv = [sys.executable, "-m", "ipyparallel.controller"]
62
63# -----------------------------------------------------------------------------
64# Base launchers and errors
65# -----------------------------------------------------------------------------
66
67
68class LauncherError(Exception):
69 pass
70
71
72class ProcessStateError(LauncherError):
73 pass
74
75
76class UnknownStatus(LauncherError):
77 pass
78
79
80class NotRunning(LauncherError):
81 """Raised when a launcher is no longer running"""
82
83 pass
84
85
86class BaseLauncher(LoggingConfigurable):
87 """An abstraction for starting, stopping and signaling a process."""
88
89 stop_timeout = Integer(
90 60,
91 config=True,
92 help="The number of seconds to wait for a process to exit before raising a TimeoutError in stop",
93 )
94
    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode('.')

    # used in various places for labeling. often 'ipengine' or 'ipcontroller'
    name = Unicode("process")

    start_data = Any()
    stop_data = Any()

    identifier = Unicode(
        help="Used for lookup in e.g. EngineSetLauncher during notify_stop and default log files"
    )

    @default("identifier")
    def _default_identifier(self):
        identifier = f"{self.name}"
        if self.cluster_id:
            identifier = f"{identifier}-{self.cluster_id}"
        if getattr(self, 'engine_set_id', None):
            identifier = f"{identifier}-{self.engine_set_id}"
        identifier = f"{identifier}-{os.getpid()}"
        return identifier

    loop = Instance(ioloop.IOLoop, allow_none=True)

    def _loop_default(self):
        return ioloop.IOLoop.current()

    profile_dir = Unicode('').tag(to_dict=True)
    cluster_id = Unicode('').tag(to_dict=True)

    state = Unicode("before").tag(to_dict=True)

    stop_callbacks = List()

    def to_dict(self):
        """Serialize a Launcher to a dict, for later restoration"""
        d = {}
        for attr in self.traits(to_dict=True):
            d[attr] = getattr(self, attr)

        return d

    @classmethod
    def from_dict(cls, d, *, config=None, parent=None, **kwargs):
        """Restore a Launcher from a dict

        Subclasses should always call `launcher = super().from_dict(*args, **kwargs)`
        and finish initialization after that.

        After calling from_dict(),
        the launcher should be in the same state as after `.start()`
        (i.e. monitoring for exit, etc.)

        Returns: Launcher
            The instantiated and fully configured Launcher.

        Raises: NotRunning
            e.g. if the process has stopped and is no longer running.
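
        Illustrative round-trip (a sketch; assumes a concrete subclass
        such as LocalProcessLauncher and a previously serialized ``d``)::

            d = launcher.to_dict()
            # ... later, restore and resume monitoring ...
            launcher = LocalProcessLauncher.from_dict(d)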
        """
        launcher = cls(config=config, parent=parent, **kwargs)
        for attr in launcher.traits(to_dict=True):
            if attr in d:
                setattr(launcher, attr, d[attr])
        return launcher

    @property
    def cluster_args(self):
        """Common cluster arguments"""
        return []

    @property
    def connection_files(self):
        """Dict of connection file paths"""
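        # For illustration, with cluster_id='abc' this returns paths like:
        #   {'client': '<profile_dir>/security/ipcontroller-abc-client.json',
        #    'engine': '<profile_dir>/security/ipcontroller-abc-engine.json'}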
        security_dir = os.path.join(self.profile_dir, 'security')
        name_prefix = "ipcontroller"
        if self.cluster_id:
            name_prefix = f"{name_prefix}-{self.cluster_id}"
        return {
            kind: os.path.join(security_dir, f"{name_prefix}-{kind}.json")
            for kind in ("client", "engine")
        }

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def cluster_env(self):
        """Cluster-related env variables"""
        return {
            "IPP_CLUSTER_ID": self.cluster_id,
            "IPP_PROFILE_DIR": self.profile_dir,
        }

    environment = Dict(
        help="""Set environment variables for the launched process

        .. versionadded:: 8.0
        """,
        config=True,
    )

    def get_env(self):
        """Get the full environment for the process

        merges different sources for environment variables;
        explicit `environment` config takes precedence over the cluster env
        """
        env = {}
        env.update(self.cluster_env)
        env.update(self.environment)
        return env

    @property
    def running(self):
        """Am I running."""
        if self.state == 'running':
            return True
        else:
            return False

    async def start(self):
        """Start the process.

        Should be an `async def` coroutine.

        When start completes,
        the process should be requested (it need not be running yet),
        and waiting should begin in the background such that :meth:`.notify_stop`
        will be called when the process finishes.
        """
        raise NotImplementedError('start must be implemented in a subclass')

    async def stop(self):
        """Stop the process and notify observers of stopping.

        This method should be an `async def` coroutine,
        and return only after the process has stopped.

        All resources should be cleaned up by the time this returns.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.
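
        Illustrative usage (a sketch)::

            def report(stop_data):
                print("exited with", stop_data.get("exit_code"))

            launcher.on_stop(report)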
        """
        if self.state == 'after':
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug(f"{self.__class__.__name__} {self.args[0]} started: {data}")
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""
        if self.state == 'after':
            self.log.debug("Already notified stop (data)")
            return data
        self.log.debug(f"{self.__class__.__name__} {self.args[0]} stopped: {data}")

        self.stop_data = data
        self.state = 'after'
        self._log_output(data)
        for f in self.stop_callbacks:
            f(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')

    async def join(self, timeout=None):
        """Wait for the process to finish"""
        raise NotImplementedError('join must be implemented in a subclass')

    output_limit = Integer(
        100,
        config=True,
        help="""
        When a process exits, display up to this many lines of output
        """,
    )

    def get_output(self, remove=False):
        """Retrieve the output from the Launcher.

        If remove: remove the file, if any, where it was being stored.
        """
        # override in subclasses to retrieve output
        return ""

    def _log_output(self, stop_data=None):
        output = self.get_output(remove=True)
        if self.output_limit:
            output = "".join(output.splitlines(True)[-self.output_limit :])

        log = self.log.debug
        if stop_data and stop_data.get("exit_code", 0) != 0:
            log = self.log.warning
        if output:
            log("Output for %s:\n%s", self.identifier, output)


class ControllerLauncher(BaseLauncher):
    """Base class for launching ipcontroller"""

    name = Unicode("ipcontroller")

    controller_cmd = List(
        list(ipcontroller_cmd_argv),
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        Unicode(),
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )

    connection_info_timeout = Float(
        60,
        config=True,
        help="""
        Default timeout (in seconds) for get_connection_info

        .. versionadded:: 8.7
        """,
    )

    async def get_connection_info(self, timeout=None):
        """Retrieve connection info for the controller

        Default implementation assumes profile_dir and cluster_id are local.

        .. versionchanged:: 8.7
            Accept `timeout=None` (default) to use `.connection_info_timeout` config.
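
        Illustrative usage (a sketch, inside a coroutine)::

            info = await launcher.get_connection_info(timeout=30)
            client_info = info["client"]  # loaded from the client connection file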
        """
        if timeout is None:
            timeout = self.connection_info_timeout
        connection_files = self.connection_files
        paths = list(connection_files.values())
        start_time = time.monotonic()
        if timeout >= 0:
            deadline = start_time + timeout
        else:
            deadline = None

        if not all(os.path.exists(f) for f in paths):
            self.log.debug(f"Waiting for {paths}")
        while not all(os.path.exists(f) for f in paths):
            if deadline is not None and time.monotonic() > deadline:
                missing_files = [f for f in paths if not os.path.exists(f)]
                raise TimeoutError(
                    f"Connection files {missing_files} did not arrive in {timeout}s"
                )
            await asyncio.sleep(0.1)
            status = self.poll()
            if inspect.isawaitable(status):
                status = await status
            if status is not None:
                raise RuntimeError(
                    f"Controller stopped with {status} while waiting for {paths}"
                )
        self.log.debug(f"Loading {paths}")
        connection_info = {}
        for key, path in connection_files.items():
            try:
                with open(path) as f:
                    connection_info[key] = json.load(f)
            except ValueError:
                # possible race while controller is still writing the file
                # give it half a second before trying again
                time.sleep(0.5)
                with open(path) as f:
                    connection_info[key] = json.load(f)

        return connection_info


class EngineLauncher(BaseLauncher):
    """Base class for launching one engine"""

    name = Unicode("ipengine")

    engine_cmd = List(
        ipengine_cmd_argv, config=True, help="""command to launch the Engine."""
    )
    # Command line arguments for ipengine.
    engine_args = List(
        Unicode(),
        config=True,
        help="command-line arguments to pass to ipengine",
    )

    n = Integer(1).tag(to_dict=True)

    engine_set_id = Unicode()


# -----------------------------------------------------------------------------
# Local process launchers
# -----------------------------------------------------------------------------


class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
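
    Illustrative usage (a sketch, from a coroutine; ``echo`` stands in
    for a real command)::

        launcher = LocalProcessLauncher(work_dir=".")
        launcher.cmd_and_args = ["echo", "hello"]
        launcher.start()
        await launcher.join(timeout=10)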
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List(Unicode())

    poll_seconds = Integer(
        30,
        config=True,
        help="""Interval (in seconds) on which to poll processes.

        Note: process exit should be noticed immediately,
        due to use of Process.wait(),
        but this interval should ensure we aren't leaving threads running forever,
        as other signals/events are checked on this interval
        """,
    )

    pid = Integer(-1).tag(to_dict=True)

    output_file = Unicode().tag(to_dict=True)

    @default("output_file")
    def _default_output_file(self):
        log_dir = os.path.join(self.profile_dir, "log")
        os.makedirs(log_dir, exist_ok=True)
        return os.path.join(log_dir, f'{self.identifier}.log')

    stop_seconds_until_kill = Integer(
        5,
        config=True,
        help="""The number of seconds to wait for a process to exit after sending SIGTERM before sending SIGKILL""",
    )

    stdout = None
    stderr = None
    process = None
    _wait_thread = None
    _popen_process = None

    def find_args(self):
        return self.cmd_and_args

    @classmethod
    def from_dict(cls, d, **kwargs):
        self = super().from_dict(d, **kwargs)
        self._reconstruct_process(d)
        return self

    def _reconstruct_process(self, d):
        """Reconstruct our process"""
        if 'pid' in d and d['pid'] > 0:
            try:
                self.process = psutil.Process(d['pid'])
            except psutil.NoSuchProcess as e:
                raise NotRunning(f"Process {d['pid']}")
            self._start_waiting()

    def _wait(self):
        """Background thread waiting for a process to exit"""
        exit_code = None
        while not self._stop_waiting.is_set() and self.state == 'running':
            try:
                # use a timeout so we can check the _stop_waiting event
                exit_code = self.process.wait(timeout=self.poll_seconds)
            except psutil.TimeoutExpired:
                continue
            else:
                break
        stop_data = dict(exit_code=exit_code, pid=self.pid, identifier=self.identifier)
        self.loop.add_callback(lambda: self.notify_stop(stop_data))
        if self._popen_process:
            # wait avoids ResourceWarning if the process has exited
            self._popen_process.wait(0)

    def _start_waiting(self):
        """Start background thread waiting on the process to exit"""
        # ensure self.loop is accessed on the main thread before waiting
        self.loop
        self._stop_waiting = threading.Event()
        self._wait_thread = Thread(
            target=self._wait, daemon=True, name=f"wait(pid={self.pid})"
        )
        self._wait_thread.start()

    def start(self):
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            raise ProcessStateError(
                f'The process was already started and has state: {self.state}'
            )
        self.log.debug(f"Sending output for {self.identifier} to {self.output_file}")

        env = os.environ.copy()
        env.update(self.get_env())
        self.log.debug(f"Setting environment: {','.join(self.get_env())}")

        with open(self.output_file, "ab") as f, open(os.devnull, "rb") as stdin:
            proc = self._popen_process = Popen(
                self.args,
                stdout=f.fileno(),
                stderr=STDOUT,
                stdin=stdin,
                env=env,
                cwd=self.work_dir,
                start_new_session=True,  # don't forward signals
            )
        self.pid = proc.pid
        # use psutil API for self.process
        self.process = psutil.Process(proc.pid)

        self.notify_start(self.process.pid)
        self._start_waiting()
        if 1 <= self.log.getEffectiveLevel() <= logging.DEBUG:
            self._start_streaming()

    async def join(self, timeout=None):
        """Wait for the process to exit"""
        if self._wait_thread is not None:
            self._wait_thread.join(timeout=timeout)
            if self._wait_thread.is_alive():
                raise TimeoutError(
                    f"Process {self.process.pid} did not exit in {timeout} seconds."
                )

    def _stream_file(self, path):
        """Stream one file"""
        with open(path) as f:
            while self.state == 'running' and not self._stop_waiting.is_set():
                line = f.readline()
                # log prefix?
                # or stream directly to sys.stderr
                if line:
                    sys.stderr.write(line)
                else:
                    # pause while we are at the end of the file
                    time.sleep(0.1)

    def _start_streaming(self):
        self._stream_thread = t = Thread(
            target=partial(self._stream_file, self.output_file),
            name=f"Stream Output {self.identifier}",
            daemon=True,
        )
        t.start()

    _output = None

    def get_output(self, remove=False):
        if self._output is None:
            if self.output_file:
                try:
                    with open(self.output_file) as f:
                        self._output = f.read()
                except FileNotFoundError:
                    self.log.debug(f"Missing output file: {self.output_file}")
                    self._output = ""
            else:
                self._output = ""

        if remove and os.path.isfile(self.output_file):
            self.log.debug(f"Removing {self.output_file}")
            try:
                os.remove(self.output_file)
            except Exception as e:
                # don't crash on failure to remove a file,
                # e.g. due to another process having it open
                self.log.error(f"Failed to remove {self.output_file}: {e}")

        return self._output

    async def stop(self):
        try:
            self.signal(SIGTERM)
        except Exception as e:
            self.log.debug(f"TERM failed: {e!r}")

        try:
            await self.join(timeout=self.stop_seconds_until_kill)
        except TimeoutError:
            self.log.warning(
                f"Process {self.pid} did not exit in {self.stop_seconds_until_kill} seconds after TERM"
            )
        else:
            return

        try:
            self.signal(SIGKILL)
        except Exception as e:
            self.log.debug(f"KILL failed: {e!r}")

        await self.join(timeout=self.stop_timeout)

    def signal(self, sig):
        if self.state == 'running':
            if WINDOWS and sig in {SIGTERM, SIGKILL}:
                # use Windows tree-kill for better child cleanup
                cmd = ['taskkill', '/pid', str(self.process.pid), '/t', '/F']
                check_output(cmd)
            else:
                self.process.send_signal(sig)

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        if WINDOWS:
            line = self.stdout.recv().decode('utf8', 'replace')
        else:
            line = self.stdout.readline().decode('utf8', 'replace')
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line.rstrip())

    def handle_stderr(self, fd, events):
        if WINDOWS:
            line = self.stderr.recv().decode('utf8', 'replace')
        else:
            line = self.stderr.readline().decode('utf8', 'replace')
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line.rstrip())
        else:
            self.poll()

    def poll(self):
        if self.process.is_running():
            return None

        status = self.process.wait(0)
        if status is None:
            # return code cannot always be retrieved.
            # but we need to not return None if it's still running
            status = 'unknown'
        self.notify_stop(
            dict(exit_code=status, pid=self.process.pid, identifier=self.identifier)
        )
        return status


class LocalControllerLauncher(LocalProcessLauncher, ControllerLauncher):
    """Launch a controller as a regular external process."""

    def find_args(self):
        return self.controller_cmd + self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super().start()


class LocalEngineLauncher(LocalProcessLauncher, EngineLauncher):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        return self.engine_cmd + self.cluster_args + self.engine_args


class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = Float(
        0.1,
        config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines.""",
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()
    outputs = Dict()
    output_file = ""  # no output file for me

    def __init__(self, work_dir='.', config=None, **kwargs):
        super().__init__(work_dir=work_dir, config=config, **kwargs)

    def to_dict(self):
        d = super().to_dict()
        d['engines'] = {i: launcher.to_dict() for i, launcher in self.launchers.items()}
        return d

    @classmethod
    def from_dict(cls, d, **kwargs):
        self = super().from_dict(d, **kwargs)
        n = 0
        for i, engine_dict in d['engines'].items():
            try:
                self.launchers[i] = el = self.launcher_class.from_dict(
                    engine_dict, identifier=i, parent=self
                )
            except NotRunning as e:
                self.log.error(f"Engine {i} not running: {e}")
            else:
                n += 1
                el.on_stop(self._notice_engine_stopped)
        if n == 0:
            raise NotRunning("No engines left")
        else:
            self.n = n
        return self

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        dlist = []
        for i in range(n):
            identifier = str(i)
            if i > 0:
                time.sleep(self.delay)
            el = self.launchers[identifier] = self.launcher_class(
                work_dir=self.work_dir,
                parent=self,
                log=self.log,
                profile_dir=self.profile_dir,
                cluster_id=self.cluster_id,
                environment=self.environment,
                identifier=identifier,
                output_file=os.path.join(
                    self.profile_dir,
                    "log",
                    f"ipengine-{self.cluster_id}-{self.engine_set_id}-{i}.log",
                ),
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        for el in list(self.launchers.values()):
            el.signal(sig)

    async def stop(self):
        futures = []
        for el in list(self.launchers.values()):
            f = el.stop()
            if inspect.isawaitable(f):
                futures.append(asyncio.ensure_future(f))

        if futures:
            await asyncio.gather(*futures)

    def _notice_engine_stopped(self, data):
        identifier = data['identifier']
        launcher = self.launchers.pop(identifier)
        engines = self.stop_data.setdefault("engines", {})
        if launcher is not None:
            self.outputs[identifier] = launcher.get_output()
        engines[identifier] = data
        if not self.launchers:
            # get exit code from engine exit codes
            # set error code if any engine has an error
            self.stop_data["exit_code"] = None
            for engine in engines.values():
                if 'exit_code' in engine:
                    if self.stop_data['exit_code'] is None:
                        self.stop_data['exit_code'] = engine['exit_code']
                    if engine['exit_code']:
                        # save the first nonzero exit code
                        self.stop_data['exit_code'] = engine['exit_code']
                        break

            self.notify_stop(self.stop_data)

    def _log_output(self, stop_data=None):
        # avoid double-logging output, already logged by each engine
        # that will be a lot if all 100 engines fail!
        pass

    def get_output(self, remove=False):
        """Get the output of all my child Launchers"""
        for identifier, launcher in self.launchers.items():
            # remaining launchers
            self.outputs[identifier] = launcher.get_output(remove=remove)

        joined_output = []
        for identifier, engine_output in self.outputs.items():
            if engine_output:
                joined_output.append(f"Output for engine {identifier}")
                if self.output_limit:
                    engine_output = "".join(
                        engine_output.splitlines(True)[-self.output_limit :]
                    )
                joined_output.append(indent(engine_output, '  '))
        return '\n'.join(joined_output)


# -----------------------------------------------------------------------------
# MPI launchers
# -----------------------------------------------------------------------------


class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(
        ['mpiexec'],
        config=True,
        help="The mpiexec command to use in starting the process.",
    )
    mpi_args = List(
        [], config=True, help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'], help="The program to start via mpiexec.")
    program_args = List([], help="The command line arguments to the program.")

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        config = kwargs.get('config') or {}
        for oldname in (
            'MPIExecLauncher',
            'MPIExecControllerLauncher',
            'MPIExecEngineSetLauncher',
        ):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warning(
                    "WARNING: %s name has been deprecated, use %s", oldname, newname
                )

        super().__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
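        # For illustration, with n=4 and the default traits this yields
        # something like: ['mpiexec', '-n', '4', 'date']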
        return (
            self.mpi_cmd
            + ['-n', str(self.n)]
            + self.mpi_args
            + self.program
            + self.program_args
        )

    def start(self, n=1):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super().start()

    def _log_output(self, stop_data):
        """Try to log mpiexec error output, if any, at warning level"""
        super()._log_output(stop_data)

        if stop_data and stop_data.get("exit_code", 0) != 0:
            # if this is True, super()._log_output would have already logged the full output
            # no need to extract from MPI
            return

        output = self.get_output(remove=False)
        mpiexec_lines = []

        in_mpi = False
        after_mpi = False
        mpi_tail = 0
        for line in output.splitlines(True):
            if line.startswith("======="):
                # mpich output looks like one block,
                # with a few lines trailing after
                # =========
                # = message
                # =
                # =========
                # YOUR APPLICATION TERMINATED WITH...
                if in_mpi:
                    after_mpi = True
                    mpi_tail = 2
                    in_mpi = False
                else:
                    in_mpi = True
            elif not in_mpi and line.startswith("-----"):
                # openmpi has less clear boundaries;
                # potentially several blocks that start and end with `----`
                # and error messages can show up after one or more blocks
                # once we see one of these lines, capture everything after it
                # toggle on each such line
                if not in_mpi:
                    in_mpi = True
                # this would let us only capture messages inside blocks
                # but doing so would exclude most useful error output
                # else:
                #     # show the trailing delimiter line
                #     mpiexec_lines.append(line)
                #     in_mpi = False
                #     continue

            if in_mpi:
                mpiexec_lines.append(line)
            elif after_mpi:
                if mpi_tail <= 0:
                    break
                else:
                    mpi_tail -= 1
                    mpiexec_lines.append(line)

        if mpiexec_lines:
            self.log.warning("mpiexec error output:\n" + "".join(mpiexec_lines))


class MPIControllerLauncher(MPILauncher, ControllerLauncher):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args


class MPIEngineSetLauncher(MPILauncher, EngineLauncher):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.engine_cmd + ['--mpi']

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super().start(n)


# deprecated MPIExec names
class DeprecatedMPILauncher:
    def warn(self):
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)


class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


# -----------------------------------------------------------------------------
# SSH launchers
# -----------------------------------------------------------------------------

ssh_output_pattern = re.compile(r"__([a-z][a-z0-9_]+)=([a-z0-9\-\.]+)__", re.IGNORECASE)


def _ssh_outputs(out):
    """Extract ssh output variables from process output"""
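    # For illustration: "__remote_pid=1234__" -> {'remote_pid': '1234'}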
    return dict(ssh_output_pattern.findall(out))


def sshx(ssh_cmd, cmd, env, remote_output_file, log=None):
    """Launch a remote process, returning its remote pid

    Uses nohup and pipes to put it in the background
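
    For illustration, the script piped to ``sh -`` on the remote host
    looks roughly like (a sketch; values depend on ``cmd`` and ``env``)::

        set -eu
        export IPP_PROFILE_DIR=...
        exec nohup <cmd> > <remote_output_file> 2>&1 </dev/null & echo __remote_pid=$!__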
    """
    remote_cmd = shlex_join(cmd)

    nohup_start = f"nohup {remote_cmd} > {remote_output_file} 2>&1 </dev/null & echo __remote_pid=$!__"
    full_cmd = ssh_cmd + ["--", "sh -"]

    input_script = "\n".join(
        [
            "set -eu",
        ]
        + [f"export {key}={shlex.quote(value)}" for key, value in env.items()]
        + ["", f"exec {nohup_start}"]
    )
    if log:
        log.info(f"Running `{remote_cmd}`")
        log.debug("Running script via ssh:\n%s", input_script)
    out = check_output(full_cmd, input=input_script.encode("utf8")).decode(
        "utf8", "replace"
    )
    values = _ssh_outputs(out)
    if 'remote_pid' in values:
        return int(values['remote_pid'])
    else:
        raise RuntimeError(f"Failed to get pid for {full_cmd}: {out}")


def ssh_waitpid(pid, timeout=None):
    """To be called on a remote host, waiting on a pid"""
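    # Results are reported as __key=value__ markers on stdout, which the
    # caller (e.g. SSHLauncher.wait_one) parses with ssh_output_pattern:
    #   __process_running=0__
    #   __exit_code=0__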
    try:
        p = psutil.Process(pid)
        exit_code = p.wait(timeout)
    except psutil.NoSuchProcess:
        print("__process_running=0__")
        print("__exit_code=-1__")
    except psutil.TimeoutExpired:
        print("__process_running=1__")
    else:
        print("__process_running=0__")
        # report the actual exit code (None if the process was not our child)
        print(f"__exit_code={exit_code if exit_code is not None else -1}__")


class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
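
    Illustrative configuration (a sketch, via traitlets config)::

        c.SSHLauncher.hostname = 'remote-host'
        c.SSHLauncher.user = 'you'
        c.SSHLauncher.remote_python = '/usr/bin/python3'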
    """

    ssh_cmd = List(['ssh'], config=True, help="command for starting ssh").tag(
        to_dict=True
    )
    ssh_args = List([], config=True, help="args to pass to ssh").tag(to_dict=True)
    scp_cmd = List(['scp'], config=True, help="command for sending files").tag(
        to_dict=True
    )
    scp_args = List([], config=True, help="args to pass to scp").tag(to_dict=True)
    program = List([], help="Program to launch via ssh")
    program_args = List([], help="args to pass to remote program")
    hostname = Unicode(
        '', config=True, help="hostname on which to launch the program"
    ).tag(to_dict=True)
    user = Unicode('', config=True, help="username for ssh").tag(to_dict=True)
    location = Unicode(
        '', config=True, help="user@hostname location for ssh in one setting"
    )
    to_fetch = List(
        [], config=True, help="List of (remote, local) files to fetch after starting"
    )
    to_send = List(
        [], config=True, help="List of (local, remote) files to send before starting"
    )

    @default("poll_seconds")
    def _default_poll_seconds(self):
        # slower poll for ssh
        return 60

    @observe('hostname')
    def _hostname_changed(self, change):
        if self.user:
            self.location = '{}@{}'.format(self.user, change['new'])
        else:
            self.location = change['new']

    @observe('user')
    def _user_changed(self, change):
        self.location = '{}@{}'.format(change['new'], self.hostname)

    def find_args(self):
        # not really used except in logging
        return list(self.ssh_cmd)

    remote_output_file = Unicode(
        help="""The remote file to store output""",
    ).tag(to_dict=True)

    @default("remote_output_file")
    def _default_remote_output_file(self):
        full_program = ' '.join(self.program)
        if 'engine' in full_program:
            name = 'ipengine'
        elif 'controller' in full_program:
            name = 'ipcontroller'
        else:
            name = self.program[0]
        return os.path.join(
            self.remote_profile_dir,
            'log',
            os.path.basename(name) + f"-{time.time():.4f}.out",
        )

    remote_profile_dir = Unicode(
        '',
        config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """,
    ).tag(to_dict=True)

    @observe('profile_dir')
    def _profile_dir_changed(self, change):
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(change['new'])

    remote_python = Unicode(
        "python3", config=True, help="""Remote path to Python interpreter, if needed"""
    ).tag(to_dict=True)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home + '/'

        if path.startswith(home):
            return path[len(home) :]
        else:
            return path

    @default("remote_profile_dir")
    def _remote_profile_dir_default(self):
        return self._strip_home(self.profile_dir)

    @property
    def cluster_env(self):
        # use remote profile dir in env
        env = super().cluster_env
        env['IPP_PROFILE_DIR'] = self.remote_profile_dir
        return env

    _output = None

    _ssh_sender = None

    @property
    def ssh_sender(self):
        """instantiate ShellCommandSend object if needed"""
        if self._ssh_sender:
            return self._ssh_sender

        self.log.info(
            f'Create ShellCommandSend object ({self.ssh_cmd}, {self.ssh_args + [self.location]}, {self.remote_python})'
        )
        self._ssh_sender = ShellCommandSend(
            self.ssh_cmd,
            self.ssh_args + [self.location],
            self.remote_python,
            log=self.log,
        )
        return self._ssh_sender

    def _reconstruct_process(self, d):
        # called in from_dict
        # override from LocalProcessLauncher which invokes psutil.Process
        if 'pid' in d and d['pid'] > 0:
            self._start_waiting()

    def poll(self):
        """Override poll"""
        if self.state == 'running':
            return None
        else:
            return 0

    def get_output(self, remove=False):
        """Retrieve engine output from the remote file"""
        if self._output is None:
            with TemporaryDirectory() as td:
                output_file = os.path.join(
                    td, os.path.basename(self.remote_output_file)
                )
                try:
                    self._fetch_file(self.remote_output_file, output_file)
                except Exception as e:
                    self.log.error(
                        f"Failed to get output file {self.remote_output_file}: {e}"
                    )
                    self._output = ''
                else:
                    if remove:
                        # remove the file after we retrieve it
                        self.log.info(
                            f"Removing {self.location}:{self.remote_output_file}"
                        )
                        self.ssh_sender.cmd_remove(self.remote_output_file)
                    with open(output_file) as f:
                        self._output = f.read()
        return self._output

    def _send_file(self, local, remote, wait=True):
        """send a single file"""
        full_remote = f"{self.location}:{remote}".replace(os.path.sep, "/")
        for i in range(10 if wait else 0):
            if not os.path.exists(local):
                self.log.debug(f"waiting for {local}")
                time.sleep(1)
            else:
                break
        remote_dir = os.path.dirname(remote)
        self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
        if not self.ssh_sender.cmd_exists(remote_dir):
            self.ssh_sender.cmd_mkdir(remote_dir)
        self.log.info("sending %s to %s", local, full_remote)
        check_output(self.scp_cmd + self.scp_args + [local, full_remote], input=None)

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local, wait=True):
        """fetch a single file"""
        full_remote = f"{self.location}:{remote}".replace(os.path.sep, "/")
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10 if wait else 0):
            # wait up to 10s for remote file to exist
            check = self.ssh_sender.cmd_exists(remote)
            if check is False:
                time.sleep(1)
            elif check is True:
                break
            else:
                raise ValueError(f"cmd_exists expects bool, got {check!r}")
        local_dir = os.path.dirname(local)
        ensure_dir_exists(local_dir, 0o700)
        check_output(self.scp_cmd + self.scp_args + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None, port=None):
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        if port is not None:
            if '-p' not in self.ssh_args:
                self.ssh_args.append('-p')
                self.ssh_args.append(str(port))
            if '-P' not in self.scp_args:
                self.scp_args.append('-P')
                self.scp_args.append(str(port))

        # do some checks that settings are correct
        shell_info = self.ssh_sender.get_shell_info()
        python_ok = self.ssh_sender.has_python()
        self.log.debug(
            f"ssh sender object initiated (break_away_support={self.ssh_sender.breakaway_support})"
        )

        # create remote profile dir
        self.ssh_sender.check_output_python_module(
            ["IPython", "profile", "create", "--profile-dir", self.remote_profile_dir]
        )
        self.send_files()
        self.pid = self.ssh_sender.cmd_start(
            self.program + self.program_args,
            env=self.get_env(),
            output_file=self.remote_output_file,
        )
        remote_cmd = ' '.join(self.program + self.program_args)
        self.log.debug("Running `%s` (pid=%s)", remote_cmd, self.pid)
        self.notify_start({'host': self.location, 'pid': self.pid})
        self._start_waiting()
        self.fetch_files()

    def _wait(self):
        """Background thread waiting for a process to exit"""
        exit_code = None
        while not self._stop_waiting.is_set() and self.state == 'running':
            try:
                # use a timeout so we can check the _stop_waiting event
                exit_code = self.wait_one(timeout=self.poll_seconds)
            except TimeoutError:
                continue
            else:
                break
        stop_data = dict(exit_code=exit_code, pid=self.pid, identifier=self.identifier)
        self.loop.add_callback(lambda: self.notify_stop(stop_data))

    def _start_waiting(self):
        """Start background thread waiting on the process to exit"""
        # ensure self.loop is accessed on the main thread before waiting
        self.loop
        self._stop_waiting = threading.Event()
        self._wait_thread = Thread(
            target=self._wait,
            daemon=True,
            name=f"wait(host={self.location}, pid={self.pid})",
        )
        self._wait_thread.start()

    def wait_one(self, timeout):
        python_code = f"from ipyparallel.cluster.launcher import ssh_waitpid; ssh_waitpid({self.pid}, timeout={timeout})"
        out = self.ssh_sender.check_output_python_code(python_code)
        values = _ssh_outputs(out)
        if 'process_running' not in values:
            raise RuntimeError(out)
        running = int(values.get("process_running", 0))
        if running:
            raise TimeoutError("still running")
        return int(values.get("exit_code", -1))

    async def join(self, timeout=None):
        with ThreadPoolExecutor(1) as pool:
            wait = partial(self.wait_one, timeout=timeout)
            try:
                future = pool.submit(wait)
            except RuntimeError:
                # e.g. called during process shutdown,
                # which raises
                # RuntimeError: cannot schedule new futures after interpreter shutdown
                # Instead, do the blocking call
                wait()
            else:
                await asyncio.wrap_future(future)
        if getattr(self, '_stop_waiting', None) and self._wait_thread:
            self._stop_waiting.set()
            # got here, should be done
            # wait for wait_thread to cleanup
            self._wait_thread.join()

    def signal(self, sig):
        if self.state == 'running':
            self.ssh_sender.cmd_kill(self.pid, sig)

    @property
    def remote_connection_files(self):
        """Return remote paths for connection files"""
        return {
            key: self.remote_profile_dir + local_path[len(self.profile_dir) :]
            for key, local_path in self.connection_files.items()
        }


class SSHControllerLauncher(SSHLauncher, ControllerLauncher):
    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        return [self.remote_python, "-m", "ipyparallel.controller"]

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    @default("to_fetch")
    def _to_fetch_default(self):
        return [
            (self.remote_connection_files[key], local_path)
            for key, local_path in self.connection_files.items()
        ]


class SSHEngineLauncher(SSHLauncher, EngineLauncher):
    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        return [self.remote_python, "-m", "ipyparallel.engine"]

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    @default("to_send")
    def _to_send_default(self):
        return [
            (local_path, self.remote_connection_files[key])
            for key, local_path in self.connection_files.items()
        ]


class SSHEngineSetLauncher(LocalEngineSetLauncher, SSHLauncher):
    launcher_class = SSHEngineLauncher
    engines = Dict(
        config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""",
    ).tag(to_dict=True)
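    # For illustration (a sketch): values may be plain engine counts,
    # or dicts of per-host overrides handled in start() below:
    #
    #   c.SSHEngineSetLauncher.engines = {
    #       'host1': 2,
    #       'you@host2:2222': {'n': 4, 'remote_python': '/opt/py/bin/python3'},
    #   }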

    def _engine_cmd_default(self):
        return [self.remote_python, "-m", "ipyparallel.engine"]

    # unset some traits we inherit but don't use
    remote_output_file = ""

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is an *upper limit* of engines.
        The `engines` config property is used to assign slots to hosts.
        """

        dlist = []
        # traits to inherit:
        # + all common config traits
        # - traits set per-engine via engines dict
        # + some non-configurable traits such as cluster_id
        engine_traits = self.launcher_class.class_traits(config=True)
        my_traits = self.traits(config=True)
        shared_traits = set(my_traits).intersection(engine_traits)
        # in addition to shared traits, pass some derived traits
        # and exclude some composite traits
        inherited_traits = shared_traits.difference(
            {"location", "user", "hostname", "to_send", "to_fetch"}
        ).union({"profile_dir", "cluster_id"})

        requested_n = n
        started_n = 0
        for host, n_or_config in self.engines.items():
            if isinstance(n_or_config, dict):
                overrides = n_or_config
                n = overrides.pop("n", 1)
            else:
                overrides = {}
                n = n_or_config

            full_host = host

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            if ':' in host:
                host, port = host.split(':', 1)
            else:
                port = None

            for i in range(min(n, requested_n - started_n)):
                if i > 0:
                    time.sleep(self.delay)
                # pass all common traits to the launcher
                kwargs = {attr: getattr(self, attr) for attr in inherited_traits}
                # overrides from engine config
                kwargs.update(overrides)
                # explicit per-engine values
                kwargs['parent'] = self
                kwargs['identifier'] = key = f"{full_host}/{i}"
                el = self.launchers[key] = self.launcher_class(**kwargs)
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host, port=port)
                dlist.append(key)
                started_n += 1
                if started_n >= requested_n:
                    break
        self.notify_start(dlist)
        self.n = started_n
        return dlist


class SSHProxyEngineSetLauncher(SSHLauncher, EngineLauncher):
    """Launcher for calling `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer().tag(to_dict=True)
    ipcluster_cmd = List(Unicode(), config=True)

    @default("ipcluster_cmd")
    def _default_ipcluster_cmd(self):
        return [self.remote_python, "-m", "ipyparallel.cluster"]

    ipcluster_args = List(
        Unicode(),
        config=True,
        help="""Extra CLI arguments to pass to ipcluster engines""",
    )

    @property
    def program(self):
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return [
            '-n',
            str(self.n),
            '--profile-dir',
            self.remote_profile_dir,
            '--cluster-id',
            self.cluster_id,
        ] + self.ipcluster_args

    @default("to_send")
    def _to_send_default(self):
        return [
            (local_path, self.remote_connection_files[key])
            for key, local_path in self.connection_files.items()
        ]

    def start(self, n):
        self.n = n
        super().start()


# -----------------------------------------------------------------------------
# Windows HPC Server 2008 scheduler launchers
# -----------------------------------------------------------------------------


class WindowsHPCLauncher(BaseLauncher):
    job_id_regexp = CRegExp(
        r'\d+',
        config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """,
    )
    job_file_name = Unicode(
        'ipython_job.xml',
        config=True,
        help="The filename of the instantiated job script.",
    )
    scheduler = Unicode(
        '', config=True, help="The hostname of the scheduler to submit the job to."
    )
    job_cmd = Unicode(config=True, help="The command for submitting jobs.")

    @default("job_cmd")
    def _default_job(self):
        return shutil.which("job") or "job"

    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError(f"Job id couldn't be determined: {output}")
        self.job_id = job_id
        self.log.info(f'Job started with id: {job_id}')
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            f'/jobfile:{self.job_file}',
            f'/scheduler:{self.scheduler}',
        ]
        self.log.debug(
            "Starting Win HPC Job: {}".format(self.job_cmd + ' ' + ' '.join(args))
        )

        output = check_output(
            [self.job_cmd] + args, env=os.environ, cwd=self.work_dir, stderr=STDOUT
        )
        output = output.decode("utf8", 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        args = ['cancel', self.job_id, f'/scheduler:{self.scheduler}']
        self.log.info(
            "Stopping Win HPC Job: {}".format(self.job_cmd + ' ' + ' '.join(args))
        )
        try:
            output = check_output(
                [self.job_cmd] + args, env=os.environ, cwd=self.work_dir, stderr=STDOUT
            )
            output = output.decode("utf8", 'replace')
        except Exception:
            output = f'The job already appears to be stopped: {self.job_id}'
        self.notify_stop(
            dict(job_id=self.job_id, output=output)
        )  # Pass the output of the kill cmd
        return output


class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    job_file_name = Unicode(
        'ipcontroller_job.xml', config=True, help="WinHPC xml job file."
    )
    controller_args = List([], config=False, help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        job = IPControllerJob(parent=self)

        t = IPControllerTask(parent=self)
        # The task's work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the cluster args (profile_dir, cluster_id) and any extra controller args.
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)


class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    job_file_name = Unicode(
        'ipengineset_job.xml', config=True, help="jobfile for ipengines job"
    )
    engine_args = List(Unicode(), config=False, help="extra args to pass to ipengine")

    def write_job_file(self, n):
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The task's work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the cluster args (profile_dir, cluster_id) and any extra engine args.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super().start(n)


# -----------------------------------------------------------------------------
# Batch (PBS) system launchers
# -----------------------------------------------------------------------------


class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
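
    For illustration, a minimal PBS-style template might look like this
    (a sketch; actual directives vary by batch system)::

        #!/bin/sh
        #PBS -q {queue}
        #PBS -N ipyparallel
        {program_and_args}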
1736 """
1737
1738 # load cluster args into context instead of cli
1739
1740 output_file = Unicode(
1741 config=True, help="File in which to store stdout/err of processes"
1742 ).tag(to_dict=True)
1743
1744 @default("output_file")
1745 def _default_output_file(self):
1746 log_dir = os.path.join(self.profile_dir, "log")
1747 os.makedirs(log_dir, exist_ok=True)
1748 return os.path.join(log_dir, f'{self.identifier}.log')
1749
1750 # Subclasses must fill these in. See PBSEngineSet
1751 submit_command = List(
1752 [''],
1753 config=True,
1754 help="The name of the command line program used to submit jobs.",
1755 )
1756 delete_command = List(
1757 [''],
1758 config=True,
1759 help="The name of the command line program used to delete jobs.",
1760 )
1761
1762 signal_command = List(
1763 [''],
1764 config=True,
1765 help="The name of the command line program used to send signals to jobs.",
1766 )
1767
1768 job_id = Unicode().tag(to_dict=True)
1769
1770 job_id_regexp = CRegExp(
1771 '',
1772 config=True,
1773 help="""A regular expression used to get the job id from the output of the
1774 submit_command.""",
1775 )
1776 job_id_regexp_group = Integer(
1777 0,
1778 config=True,
1779 help="""The group we wish to match in job_id_regexp (0 to match all)""",
1780 )
1781 batch_template = Unicode(
1782 '', config=True, help="The string that is the batch script template itself."
1783 ).tag(to_dict=True)
1784 batch_template_file = Unicode(
1785 '', config=True, help="The file that contains the batch template."
1786 )
1787 batch_file_name = Unicode(
1788 'batch_script',
1789 config=True,
1790 help="The filename of the instantiated batch script.",
1791 ).tag(to_dict=True)
1792 queue = Unicode('', config=True, help="The batch queue.").tag(to_dict=True)
1793
1794 n = Integer(1).tag(to_dict=True)
1795
1796 @observe('queue', 'n', 'cluster_id', 'profile_dir', 'output_file')
1797 def _context_field_changed(self, change):
1798 self._update_context(change)
1799
1800 # not configurable, override in subclasses
1801 # Job Array regex
1802 job_array_regexp = CRegExp('')
1803 job_array_template = Unicode('')
1804
1805 # Queue regex
1806 queue_regexp = CRegExp('')
1807 queue_template = Unicode('')
1808
1809 # Output file
1810 output_regexp = CRegExp('')
1811 output_template = Unicode('')
1812
1813 # The default batch template, override in subclasses
1814 default_template = Unicode('')
1815 # The full path to the instantiated batch script.
1816 batch_file = Unicode('')
1817 # the format dict used with batch_template:
1818 context = Dict()
1819
1820 namespace = Dict(
1821 config=True,
1822 help="""Extra variables to pass to the template.
1823
1824 This lets you parameterize additional options,
1825 such as wall_time with a custom template.
1826 """,
1827 ).tag(to_dict=True)
1828
1829 @default("context")
1830 def _context_default(self):
1831 """load the default context with the default values for the basic keys
1832
1833 because the _trait_changed methods only load the context if they
1834 are set to something other than the default value.
1835 """
1836 return dict(
1837 n=self.n,
1838 queue=self.queue,
1839 profile_dir=self.profile_dir,
1840 cluster_id=self.cluster_id,
1841 output_file=self.output_file,
1842 )
1843
1844 program = List(Unicode())
1845 program_args = List(Unicode())
1846
1847 @observe("program", "program_args")
1848 def _program_changed(self, change=None):
1849 self.context['program'] = shlex_join(self.program)
1850 self.context['program_args'] = shlex_join(self.program_args)
1851 self.context['program_and_args'] = shlex_join(self.program + self.program_args)
1852
1853 @observe("n", "queue")
1854 def _update_context(self, change):
1855 self.context[change['name']] = change['new']
1856
1857 # the Formatter instance for rendering the templates:
1858 formatter = Instance(EvalFormatter, (), {})
1859
1860 def find_args(self):
1861 return self.submit_command + [self.batch_file]
1862
1863 def __init__(self, work_dir='.', config=None, **kwargs):
1864 super().__init__(work_dir=work_dir, config=config, **kwargs)
1865 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1866 # trigger program_changed to populate default context arguments
1867 self._program_changed()
1868
1869 def _run_command(self, command, **kwargs):
1870 joined_command = shlex_join(command)
1871 self.log.debug("Running command: %s", joined_command)
1872 output = check_output(
1873 command,
1874 stdin=None,
1875 **kwargs,
1876 ).decode("utf8", "replace")
1877 self.log.debug("Command %s output: %s", command[0], output)
1878 return output
1879
1880 def parse_job_id(self, output):
1881 """Take the output of the submit command and return the job id."""
1882 m = self.job_id_regexp.search(output)
1883 if m is not None:
1884 job_id = m.group(self.job_id_regexp_group)
1885 else:
1886 raise LauncherError(f"Job id couldn't be determined: {output}")
1887 self.job_id = job_id
1888 self.log.info('Job submitted with job id: %r', job_id)
1889 return job_id
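
    # Illustrative sketch (the sample output is assumed, not from a real
    # scheduler):
    #
    #     import re
    #     m = re.search(r'\d+', "12345.headnode")  # PBS-style default regexp
    #     assert m.group(0) == "12345"             # group 0 == whole match
    #
    # HTCondor instead captures group 1 of r'(\d+)\.$' from its submit output.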
1890
1891 def write_batch_script(self, n=1):
1892 """Instantiate and write the batch script to the work_dir."""
1893 self.n = n
1894 self.context['environment_json'] = json.dumps(self.get_env())
1895
1896 # first priority is batch_template if set
1897 if self.batch_template_file and not self.batch_template:
1898 # second priority is batch_template_file
1899 with open(self.batch_template_file) as f:
1900 self.batch_template = f.read()
1901 if not self.batch_template:
1902 # third (last) priority is default_template
1903 self.batch_template = self.default_template
        # add job-array or queue lines to the template;
        # each is inserted only if the template does not already set it.
1906 self._insert_options_in_script()
1907 self._insert_job_array_in_script()
1908 ns = {}
1909 # internally generated
1910 ns.update(self.context)
1911 # from user config
1912 ns.update(self.namespace)
1913 script_as_string = self.formatter.format(self.batch_template, **ns)
1914 self.log.debug(f'Writing batch script: {self.batch_file}\n{script_as_string}')
1915 with open(self.batch_file, 'w') as f:
1916 f.write(script_as_string)
1917 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1918
1919 def _insert_options_in_script(self):
1920 """Inserts a queue if required into the batch script."""
1921 inserts = []
1922 if self.queue and not self.queue_regexp.search(self.batch_template):
1923 self.log.debug(f"Adding queue={self.queue} to {self.batch_file}")
1924 inserts.append(self.queue_template)
1925
1926 if (
1927 self.output_file
1928 and self.output_template
1929 and not self.output_regexp.search(self.batch_template)
1930 ):
1931 self.log.debug(f"Adding output={self.output_file} to {self.batch_file}")
1932 inserts.append(self.output_template)
1933
1934 if inserts:
1935 firstline, rest = self.batch_template.split('\n', 1)
1936 self.batch_template = '\n'.join([firstline] + inserts + [rest])
1937
1938 def _insert_job_array_in_script(self):
1939 """Inserts a job array if required into the batch script."""
1940 if not self.job_array_regexp.search(self.batch_template):
1941 self.log.debug("adding job array settings to batch script")
1942 firstline, rest = self.batch_template.split('\n', 1)
1943 self.batch_template = '\n'.join([firstline, self.job_array_template, rest])
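
    # Illustrative sketch: starting from a minimal template (values assumed),
    #
    #     #!/bin/sh
    #     {program_and_args}
    #
    # the two helpers above splice directives in after the shebang, so a
    # Slurm engine script would come out roughly as:
    #
    #     #!/bin/sh
    #     #SBATCH --ntasks={n}
    #     #SBATCH --partition={queue}
    #     #SBATCH --output={output_file}
    #     {program_and_args}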
1944
1945 def start(self, n=1):
1946 """Start n copies of the process using a batch system."""
1947 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so it
        # can be used in the batch script template as {profile_dir}
1950 self.write_batch_script(n)
1951
1952 env = os.environ.copy()
1953 env.update(self.get_env())
1954 output = self._run_command(self.args, env=env)
1955
1956 job_id = self.parse_job_id(output)
1957 self.notify_start(job_id)
1958 return job_id
1959
1960 def stop(self):
1961 command = self.delete_command + [self.job_id]
1962 output = self._run_command(command)
1963
1964 self.notify_stop(
1965 dict(job_id=self.job_id, output=output)
1966 ) # Pass the output of the kill cmd
1967 return output
1968
1969 def signal(self, sig):
1970 command = self.signal_command + [str(sig), self.job_id]
1971 self._run_command(command)
1972
1973 # same local-file implementation as LocalProcess
1974 # should this be on the base class?
1975 _output = None
1976
1977 def get_output(self, remove=True):
1978 return LocalProcessLauncher.get_output(self, remove=remove)
1979
1980 def poll(self):
1981 """Poll not implemented
1982
        Need to use the batch system's status command (e.g. `squeue`, `qstat`) to check job status
1984 """
1985 return None
1986
1987
1988class BatchControllerLauncher(BatchSystemLauncher, ControllerLauncher):
1989 @default("program")
1990 def _default_program(self):
1991 return self.controller_cmd
1992
1993 @observe("controller_cmd")
1994 def _controller_cmd_changed(self, change):
1995 self.program = self._default_program()
1996
1997 @default("program_args")
1998 def _default_program_args(self):
1999 return self.cluster_args + self.controller_args
2000
2001 @observe("controller_args")
2002 def _controller_args_changed(self, change):
2003 self.program_args = self._default_program_args()
2004
2005 def start(self):
2006 return super().start(n=1)
2007
2008
2009class BatchEngineSetLauncher(BatchSystemLauncher, EngineLauncher):
2010 @default("program")
2011 def _default_program(self):
2012 return self.engine_cmd
2013
2014 @observe("engine_cmd")
2015 def _engine_cmd_changed(self, change):
2016 self.program = self._default_program()
2017
2018 @default("program_args")
2019 def _default_program_args(self):
2020 return self.cluster_args + self.engine_args
2021
2022 @observe("engine_args")
2023 def _engine_args_changed(self, change):
2024 self.program_args = self._default_program_args()
2025
2026
2027class PBSLauncher(BatchSystemLauncher):
2028 """A BatchSystemLauncher subclass for PBS."""
2029
2030 submit_command = List(['qsub'], config=True, help="The PBS submit command ['qsub']")
2031 delete_command = List(['qdel'], config=True, help="The PBS delete command ['qdel']")
2032 signal_command = List(
        ['qsig', '-s'], config=True, help="The PBS signal command ['qsig', '-s']"
2034 )
2035 job_id_regexp = CRegExp(
2036 r'\d+',
2037 config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']",
2039 )
2040
2041 batch_file = Unicode('')
2042 job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
2043 job_array_template = Unicode('#PBS -t 1-{n}')
2044 queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
2045 queue_template = Unicode('#PBS -q {queue}')
2046 output_regexp = CRegExp(r'#PBS\W+(?:-o)\W+\$?\w+')
2047 output_template = Unicode('#PBS -j oe\n#PBS -o {output_file}')
2048
2049
2050class PBSControllerLauncher(PBSLauncher, BatchControllerLauncher):
2051 """Launch a controller using PBS."""
2052
2053 batch_file_name = Unicode(
2054 'pbs_controller', config=True, help="batch file name for the controller job."
2055 )
2056 default_template = Unicode(
2057 """#!/bin/sh
2058#PBS -V
2059#PBS -N ipcontroller
2060{program_and_args}
2061"""
2062 )
2063
2064
2065class PBSEngineSetLauncher(PBSLauncher, BatchEngineSetLauncher):
2066 """Launch Engines using PBS"""
2067
2068 batch_file_name = Unicode(
2069 'pbs_engines', config=True, help="batch file name for the engine(s) job."
2070 )
2071 default_template = Unicode(
2072 """#!/bin/sh
2073#PBS -V
2074#PBS -N ipengine
2075{program_and_args}
2076"""
2077 )
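
    # Illustrative rendering for n=4 on queue 'batch' (values assumed, paths
    # elided): job-array, queue, and output directives are spliced in after
    # the shebang, giving roughly:
    #
    #     #!/bin/sh
    #     #PBS -t 1-4
    #     #PBS -q batch
    #     #PBS -j oe
    #     #PBS -o /path/to/profile/log/ipengine-....log
    #     #PBS -V
    #     #PBS -N ipengine
    #     <program and args>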
2078
2079
2080# Slurm is very similar to PBS
2081
2082
2083class SlurmLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for Slurm."""
2085
2086 submit_command = List(
2087 ['sbatch'], config=True, help="The slurm submit command ['sbatch']"
2088 )
2089 delete_command = List(
2090 ['scancel'], config=True, help="The slurm delete command ['scancel']"
2091 )
2092 signal_command = List(
2093 ['scancel', '-s'],
2094 config=True,
2095 help="The slurm signal command ['scancel', '-s']",
2096 )
2097 job_id_regexp = CRegExp(
2098 r'\d+',
2099 config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']",
2101 )
2102
2103 account = Unicode("", config=True, help="Slurm account to be used")
2104
2105 qos = Unicode("", config=True, help="Slurm QoS to be used")
2106
2107 # Note: from the man page:
2108 #'Acceptable time formats include "minutes", "minutes:seconds",
2109 # "hours:minutes:seconds", "days-hours", "days-hours:minutes"
2110 # and "days-hours:minutes:seconds".
2111 timelimit = Any("", config=True, help="Slurm timelimit to be used")
2112
2113 options = Unicode("", config=True, help="Extra Slurm options")
2114
2115 @observe('account')
2116 def _account_changed(self, change):
2117 self._update_context(change)
2118
2119 @observe('qos')
2120 def _qos_changed(self, change):
2121 self._update_context(change)
2122
2123 @observe('timelimit')
2124 def _timelimit_changed(self, change):
2125 self._update_context(change)
2126
2127 @observe('options')
2128 def _options_changed(self, change):
2129 self._update_context(change)
2130
2131 batch_file = Unicode('')
2132
2133 job_array_regexp = CRegExp(r'#SBATCH\W+(?:--ntasks|-n)[\w\d\-\$]+')
2134 job_array_template = Unicode('''#SBATCH --ntasks={n}''')
2135
2136 queue_regexp = CRegExp(r'#SBATCH\W+(?:--partition|-p)\W+\$?\w+')
2137 queue_template = Unicode('#SBATCH --partition={queue}')
2138
2139 account_regexp = CRegExp(r'#SBATCH\W+(?:--account|-A)\W+\$?\w+')
2140 account_template = Unicode('#SBATCH --account={account}')
2141
2142 qos_regexp = CRegExp(r'#SBATCH\W+--qos\W+\$?\w+')
2143 qos_template = Unicode('#SBATCH --qos={qos}')
2144
2145 timelimit_regexp = CRegExp(r'#SBATCH\W+(?:--time|-t)\W+\$?\w+')
2146 timelimit_template = Unicode('#SBATCH --time={timelimit}')
2147
2148 output_regexp = CRegExp(r'#SBATCH\W+(?:--output)\W+\$?\w+')
2149 output_template = Unicode('#SBATCH --output={output_file}')
2150
2151 def _insert_options_in_script(self):
2152 """Insert 'partition' (slurm name for queue), 'account', 'time' and other options if necessary"""
2153 super()._insert_options_in_script()
2154 inserts = []
        if self.account and not self.account_regexp.search(self.batch_template):
            self.log.debug("adding Slurm account settings to batch script")
            inserts.append(self.account_template)

        if self.qos and not self.qos_regexp.search(self.batch_template):
            self.log.debug("adding Slurm qos settings to batch script")
            inserts.append(self.qos_template)

        if self.timelimit and not self.timelimit_regexp.search(self.batch_template):
            self.log.debug("adding Slurm time limit settings to batch script")
            inserts.append(self.timelimit_template)
2167
2168 if inserts:
2169 firstline, rest = self.batch_template.split('\n', 1)
2170 self.batch_template = '\n'.join([firstline] + inserts + [rest])
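
    # e.g. with account='proj1', qos='normal', timelimit='01:00:00' (values
    # assumed), the rendered script gains, right after the shebang:
    #
    #     #SBATCH --account=proj1
    #     #SBATCH --qos=normal
    #     #SBATCH --time=01:00:00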
2171
2172
2173class SlurmControllerLauncher(SlurmLauncher, BatchControllerLauncher):
2174 """Launch a controller using Slurm."""
2175
2176 batch_file_name = Unicode(
2177 'slurm_controller.sbatch',
2178 config=True,
2179 help="batch file name for the controller job.",
2180 )
2181 default_template = Unicode(
2182 """#!/bin/sh
2183#SBATCH --export=ALL
2184#SBATCH --job-name=ipcontroller-{cluster_id}
2185#SBATCH --ntasks=1
2186{program_and_args}
2187"""
2188 )
2189
2190
2191class SlurmEngineSetLauncher(SlurmLauncher, BatchEngineSetLauncher):
2192 """Launch Engines using Slurm"""
2193
2194 batch_file_name = Unicode(
2195 'slurm_engine.sbatch',
2196 config=True,
2197 help="batch file name for the engine(s) job.",
2198 )
2199 default_template = Unicode(
2200 """#!/bin/sh
2201#SBATCH --export=ALL
2202#SBATCH --job-name=ipengine-{cluster_id}
2203srun {program_and_args}
2204"""
2205 )
2206
2207
2208# SGE is very similar to PBS
2209
2210
2211class SGELauncher(PBSLauncher):
2212 """Sun GridEngine is a PBS clone with slightly different syntax"""
2213
2214 job_array_regexp = CRegExp(r'#\$\W+\-t')
2215 job_array_template = Unicode('#$ -t 1-{n}')
2216 queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
2217 queue_template = Unicode('#$ -q {queue}')
2218
2219
2220class SGEControllerLauncher(SGELauncher, BatchControllerLauncher):
2221 """Launch a controller using SGE."""
2222
2223 batch_file_name = Unicode(
        'sge_controller', config=True, help="batch file name for the controller job."
2225 )
2226 default_template = Unicode(
2227 """#$ -V
2228#$ -S /bin/sh
2229#$ -N ipcontroller
2230{program_and_args}
2231"""
2232 )
2233
2234
2235class SGEEngineSetLauncher(SGELauncher, BatchEngineSetLauncher):
2236 """Launch Engines with SGE"""
2237
2238 batch_file_name = Unicode(
2239 'sge_engines', config=True, help="batch file name for the engine(s) job."
2240 )
2241 default_template = Unicode(
2242 """#$ -V
2243#$ -S /bin/sh
2244#$ -N ipengine
2245{program_and_args}
2246"""
2247 )
2248
2249
2250# LSF launchers
2251
2252
2253class LSFLauncher(BatchSystemLauncher):
2254 """A BatchSystemLauncher subclass for LSF."""
2255
2256 submit_command = List(['bsub'], config=True, help="The LSF submit command ['bsub']")
2257 delete_command = List(
2258 ['bkill'], config=True, help="The LSF delete command ['bkill']"
2259 )
2260 signal_command = List(
2261 ['bkill', '-s'], config=True, help="The LSF signal command ['bkill', '-s']"
2262 )
2263 job_id_regexp = CRegExp(
2264 r'\d+',
2265 config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']",
2267 )
2268
2269 batch_file = Unicode('')
2270 job_array_regexp = CRegExp(r'#BSUB\s+-J+\w+\[\d+-\d+\]')
2271 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
2272 queue_regexp = CRegExp(r'#BSUB\s+-q\s+\w+')
2273 queue_template = Unicode('#BSUB -q {queue}')
2274 output_regexp = CRegExp(r'#BSUB\s+-oo?\s+\w+')
2275 output_template = Unicode('#BSUB -o {output_file}\n#BSUB -e {output_file}\n')
2276
2277 def start(self, n=1):
        """Start n copies of the process using the LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
        bsub < script
        """
        # Here we save profile_dir in the context so it
        # can be used in the batch script template as {profile_dir}
2285 self.write_batch_script(n)
2286 piped_cmd = self.args[0] + '<"' + self.args[1] + '"'
2287 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
2288 p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
2289 output, err = p.communicate()
2290 output = output.decode("utf8", 'replace')
2291 job_id = self.parse_job_id(output)
2292 self.notify_start(job_id)
2293 return job_id
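
    # e.g. the piped command ends up as something like (path assumed):
    #
    #     bsub<"/path/to/profile/lsf_engines"
    #
    # so bsub reads the #BSUB directives from the script on stdin.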
2294
2295
2296class LSFControllerLauncher(LSFLauncher, BatchControllerLauncher):
2297 """Launch a controller using LSF."""
2298
2299 batch_file_name = Unicode(
2300 'lsf_controller', config=True, help="batch file name for the controller job."
2301 )
2302 default_template = Unicode(
2303 """#!/bin/sh
2304 #BSUB -env all
2305 #BSUB -J ipcontroller-{cluster_id}
2306 {program_and_args}
2307 """
2308 )
2309
2310
2311class LSFEngineSetLauncher(LSFLauncher, BatchEngineSetLauncher):
2312 """Launch Engines using LSF"""
2313
2314 batch_file_name = Unicode(
2315 'lsf_engines', config=True, help="batch file name for the engine(s) job."
2316 )
2317 default_template = Unicode(
2318 """#!/bin/sh
2319 #BSUB -J ipengine-{cluster_id}
2320 #BSUB -env all
2321 {program_and_args}
2322 """
2323 )
2324
2325 def get_env(self):
2326 # write directly to output files
2327 # otherwise, will copy and clobber merged stdout/err
2328 env = {"LSB_STDOUT_DIRECT": "Y"}
2329 env.update(super().get_env())
2330 return env
2331
2332
2333class HTCondorLauncher(BatchSystemLauncher):
2334 """A BatchSystemLauncher subclass for HTCondor.
2335
    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
    than the python instance, but otherwise is very similar to PBS. This is
    because HTCondor destroys sys.executable when launching remote processes -
    a launched python process depends on sys.executable to effectively evaluate
    its module search paths. Without it, regardless of which python interpreter
    you launch, you will get only the built-in module search paths.
2342
2343 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
2344 this - the mechanism of shebanged scripts means that the python binary will be
2345 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
2346 scripts on the remote node*. This means you need to take care that:
2347
2348 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
2349 of the python environment you wish to execute code in having top precedence.
2350 b. This functionality is untested on Windows.
2351
    If you need different behavior, consider making your own template.
2353 """
2354
2355 submit_command = List(
2356 ['condor_submit'],
2357 config=True,
2358 help="The HTCondor submit command ['condor_submit']",
2359 )
2360 delete_command = List(
2361 ['condor_rm'], config=True, help="The HTCondor delete command ['condor_rm']"
2362 )
2363 job_id_regexp = CRegExp(
2364 r'(\d+)\.$',
2365 config=True,
2366 help=r"Regular expression for identifying the job ID [r'(\d+)\.$']",
2367 )
2368 job_id_regexp_group = Integer(
2369 1, config=True, help="""The group we wish to match in job_id_regexp [1]"""
2370 )
2371
2372 job_array_regexp = CRegExp(r'queue\W+\$')
2373 job_array_template = Unicode('queue {n}')
2374
2375 def _insert_job_array_in_script(self):
2376 """Inserts a job array if required into the batch script."""
2377 if not self.job_array_regexp.search(self.batch_template):
2378 self.log.debug("adding job array settings to batch script")
2379 # HTCondor requires that the job array goes at the bottom of the script
2380 self.batch_template = '\n'.join(
2381 [self.batch_template, self.job_array_template]
2382 )
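
    # Illustrative sketch: for n=8 (value assumed), a template ending in
    #
    #     arguments = "{program_args}"
    #
    # gains 'queue 8' as its final line, since HTCondor expects the queue
    # statement at the bottom of the submit file.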
2383
2384 def _insert_options_in_script(self):
2385 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
2386 specified in the script.
2387 """
2388 super()._insert_options_in_script()
2389
2390
2391class HTCondorControllerLauncher(HTCondorLauncher, BatchControllerLauncher):
2392 """Launch a controller using HTCondor."""
2393
2394 batch_file_name = Unicode(
2395 'htcondor_controller',
2396 config=True,
2397 help="batch file name for the controller job.",
2398 )
2399 default_template = Unicode(
2400 r"""
2401universe = vanilla
2402executable = ipcontroller
2403# by default we expect a shared file system
2404transfer_executable = False
2405arguments = {program_args}
2406"""
2407 )
2408
2409
2410class HTCondorEngineSetLauncher(HTCondorLauncher, BatchEngineSetLauncher):
2411 """Launch Engines using HTCondor"""
2412
2413 batch_file_name = Unicode(
2414 'htcondor_engines', config=True, help="batch file name for the engine(s) job."
2415 )
2416 default_template = Unicode(
2417 """
2418universe = vanilla
2419executable = ipengine
2420# by default we expect a shared file system
2421transfer_executable = False
2422arguments = "{program_args}"
2423"""
2424 )
2425
2426
2427# -----------------------------------------------------------------------------
2428# Collections of launchers
2429# -----------------------------------------------------------------------------
2430
2431local_launchers = [
2432 LocalControllerLauncher,
2433 LocalEngineLauncher,
2434 LocalEngineSetLauncher,
2435]
2436mpi_launchers = [
2437 MPILauncher,
2438 MPIControllerLauncher,
2439 MPIEngineSetLauncher,
2440]
2441ssh_launchers = [
2442 SSHLauncher,
2443 SSHControllerLauncher,
2444 SSHEngineLauncher,
2445 SSHEngineSetLauncher,
2446 SSHProxyEngineSetLauncher,
2447]
2448winhpc_launchers = [
2449 WindowsHPCLauncher,
2450 WindowsHPCControllerLauncher,
2451 WindowsHPCEngineSetLauncher,
2452]
2453pbs_launchers = [
2454 PBSLauncher,
2455 PBSControllerLauncher,
2456 PBSEngineSetLauncher,
2457]
2458slurm_launchers = [
2459 SlurmLauncher,
2460 SlurmControllerLauncher,
2461 SlurmEngineSetLauncher,
2462]
2463sge_launchers = [
2464 SGELauncher,
2465 SGEControllerLauncher,
2466 SGEEngineSetLauncher,
2467]
2468lsf_launchers = [
2469 LSFLauncher,
2470 LSFControllerLauncher,
2471 LSFEngineSetLauncher,
2472]
2473htcondor_launchers = [
2474 HTCondorLauncher,
2475 HTCondorControllerLauncher,
2476 HTCondorEngineSetLauncher,
2477]
2478all_launchers = (
2479 local_launchers
2480 + mpi_launchers
2481 + ssh_launchers
2482 + winhpc_launchers
2483 + pbs_launchers
2484 + slurm_launchers
2485 + sge_launchers
2486 + lsf_launchers
2487 + htcondor_launchers
2488)
2489
2490
2491def find_launcher_class(name, kind):
2492 """Return a launcher class for a given name and kind.
2493
2494 Parameters
2495 ----------
    name : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor,
        Slurm, WindowsHPC).
    kind : str
        Either 'engine' or 'controller'.
2502 """
2503 if kind == 'engine':
2504 group_name = 'ipyparallel.engine_launchers'
2505 elif kind == 'controller':
2506 group_name = 'ipyparallel.controller_launchers'
2507 else:
2508 raise ValueError(f"kind must be 'engine' or 'controller', not {kind!r}")
2509 group = entry_points(group=group_name)
2510 # make it case-insensitive
2511 registry = {entrypoint.name.lower(): entrypoint for entrypoint in group}
2512 return registry[name.lower()].load()
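
# For example (assuming the standard entrypoints that ipyparallel registers):
#
#     cls = find_launcher_class("SSH", "engine")  # case-insensitive lookup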
2513
2514
2515@lru_cache
2516def abbreviate_launcher_class(cls):
2517 """Abbreviate a launcher class back to its entrypoint name"""
2518 cls_key = f"{cls.__module__}:{cls.__name__}"
2519 # allow entrypoint_name attribute in case the definition module
2520 # is not the same as the 'import' module
    if getattr(cls, 'entrypoint_name', None):
        return cls.entrypoint_name
2523
2524 for kind in ('controller', 'engine'):
2525 group_name = f'ipyparallel.{kind}_launchers'
2526 group = entry_points(group=group_name)
2527 for entrypoint in group:
2528 if entrypoint.value == cls_key:
2529 return entrypoint.name.lower()
2530 return cls_key
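
# For example (result depends on the installed entrypoints):
#
#     abbreviate_launcher_class(SlurmEngineSetLauncher)  # -> 'slurm'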