1"""Facilities for launching IPython Parallel processes asynchronously.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import asyncio 

6import copy 

7import inspect 

8import json 

9import logging 

10import os 

11import re 

12import shlex 

13import shutil 

14import signal 

15import stat 

16import sys 

17import threading 

18import time 

19from concurrent.futures import ThreadPoolExecutor 

20from functools import lru_cache, partial 

21from signal import SIGTERM 

22from subprocess import PIPE, STDOUT, Popen, check_output 

23from tempfile import TemporaryDirectory 

24from textwrap import indent 

25 

26import psutil 

27from IPython.utils.path import ensure_dir_exists, get_home_dir 

28from IPython.utils.text import EvalFormatter 

29from tornado import ioloop 

30from traitlets import ( 

31 Any, 

32 CRegExp, 

33 Dict, 

34 Float, 

35 Instance, 

36 Integer, 

37 List, 

38 Unicode, 

39 default, 

40 observe, 

41) 

42from traitlets.config.configurable import LoggingConfigurable 

43 

44from ..traitlets import entry_points 

45from ..util import _OutputProducingThread as Thread 

46from ..util import shlex_join 

47from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask 

48from .shellcmd import ShellCommandSend 

49 

50WINDOWS = os.name == 'nt' 

51 

52SIGKILL = getattr(signal, "SIGKILL", -1) 

53# ----------------------------------------------------------------------------- 

54# Paths to the kernel apps 

55# ----------------------------------------------------------------------------- 

56 

57ipcluster_cmd_argv = [sys.executable, "-m", "ipyparallel.cluster"] 

58 

59ipengine_cmd_argv = [sys.executable, "-m", "ipyparallel.engine"] 

60 

61ipcontroller_cmd_argv = [sys.executable, "-m", "ipyparallel.controller"] 

62 

63# ----------------------------------------------------------------------------- 

64# Base launchers and errors 

65# ----------------------------------------------------------------------------- 

66 

67 

class LauncherError(Exception):
    pass


class ProcessStateError(LauncherError):
    pass


class UnknownStatus(LauncherError):
    pass


class NotRunning(LauncherError):
    """Raised when a launcher is no longer running"""

    pass


class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    stop_timeout = Integer(
        60,
        config=True,
        help="The number of seconds to wait for a process to exit before raising a TimeoutError in stop",
    )

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode('.')

    # used in various places for labeling. often 'ipengine' or 'ipcontroller'
    name = Unicode("process")

    start_data = Any()
    stop_data = Any()

    identifier = Unicode(
        help="Used for lookup in e.g. EngineSetLauncher during notify_stop and default log files"
    )

    @default("identifier")
    def _default_identifier(self):
        identifier = f"{self.name}"
        if self.cluster_id:
            identifier = f"{identifier}-{self.cluster_id}"
        if getattr(self, 'engine_set_id', None):
            identifier = f"{identifier}-{self.engine_set_id}"
        identifier = f"{identifier}-{os.getpid()}"
        return identifier

    loop = Instance(ioloop.IOLoop, allow_none=True)

    def _loop_default(self):
        return ioloop.IOLoop.current()

    profile_dir = Unicode('').tag(to_dict=True)
    cluster_id = Unicode('').tag(to_dict=True)

    state = Unicode("before").tag(to_dict=True)

    stop_callbacks = List()

    def to_dict(self):
        """Serialize a Launcher to a dict, for later restoration"""
        d = {}
        for attr in self.traits(to_dict=True):
            d[attr] = getattr(self, attr)

        return d

    @classmethod
    def from_dict(cls, d, *, config=None, parent=None, **kwargs):
        """Restore a Launcher from a dict

        Subclasses should always call `launcher = super().from_dict(*args, **kwargs)`
        and finish initialization after that.

        After calling from_dict(),
        the launcher should be in the same state as after `.start()`
        (i.e. monitoring for exit, etc.)

        Returns: Launcher
            The instantiated and fully configured Launcher.

        Raises: NotRunning
            e.g. if the process has stopped and is no longer running.
        """
        launcher = cls(config=config, parent=parent, **kwargs)
        for attr in launcher.traits(to_dict=True):
            if attr in d:
                setattr(launcher, attr, d[attr])
        return launcher
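
    # A minimal sketch of the intended round-trip (hypothetical usage);
    # only traits tagged with ``to_dict=True`` survive serialization:
    #
    #     d = launcher.to_dict()
    #     restored = SomeLauncherClass.from_dict(d, parent=cluster)
    #     # restored.state matches d['state'], and exit-monitoring has resumed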

    @property
    def cluster_args(self):
        """Common cluster arguments"""
        return []

    @property
    def connection_files(self):
        """Dict of connection file paths"""
        security_dir = os.path.join(self.profile_dir, 'security')
        name_prefix = "ipcontroller"
        if self.cluster_id:
            name_prefix = f"{name_prefix}-{self.cluster_id}"
        return {
            kind: os.path.join(security_dir, f"{name_prefix}-{kind}.json")
            for kind in ("client", "engine")
        }
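
    # For example (illustrative paths), with profile_dir=~/.ipython/profile_default
    # and cluster_id="abc", connection_files would be:
    #   {'client': '~/.ipython/profile_default/security/ipcontroller-abc-client.json',
    #    'engine': '~/.ipython/profile_default/security/ipcontroller-abc-engine.json'}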

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def cluster_env(self):
        """Cluster-related env variables"""
        return {
            "IPP_CLUSTER_ID": self.cluster_id,
            "IPP_PROFILE_DIR": self.profile_dir,
        }

    environment = Dict(
        help="""Set environment variables for the launched process

        .. versionadded:: 8.0
        """,
        config=True,
    )

    def get_env(self):
        """Get the full environment for the process

        merges different sources for environment variables
        """
        env = {}
        env.update(self.cluster_env)
        env.update(self.environment)
        return env
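
    # Later updates win, so user-configured ``environment`` entries override
    # the implicit cluster variables. A hypothetical example:
    #
    #     launcher.environment = {"IPP_CLUSTER_ID": "custom"}
    #     launcher.get_env()["IPP_CLUSTER_ID"]  # -> "custom"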

    @property
    def running(self):
        """Am I running."""
        if self.state == 'running':
            return True
        else:
            return False

    async def start(self):
        """Start the process.

        Should be an `async def` coroutine.

        When start completes,
        the process should be requested (it need not be running yet),
        and waiting should begin in the background such that :meth:`.notify_stop`
        will be called when the process finishes.
        """
        raise NotImplementedError('start must be implemented in a subclass')

    async def stop(self):
        """Stop the process and notify observers of stopping.

        This method should be an `async def` coroutine,
        and return only after the process has stopped.

        All resources should be cleaned up by the time this returns.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.
        """
        if self.state == 'after':
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug(f"{self.__class__.__name__} {self.args[0]} started: {data}")
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""
        if self.state == 'after':
            self.log.debug(f"Already notified stop ({data})")
            return data
        self.log.debug(f"{self.__class__.__name__} {self.args[0]} stopped: {data}")

        self.stop_data = data
        self.state = 'after'
        self._log_output(data)
        for f in self.stop_callbacks:
            f(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')

    async def join(self, timeout=None):
        """Wait for the process to finish"""
        raise NotImplementedError('join must be implemented in a subclass')

    output_limit = Integer(
        100,
        config=True,
        help="""
        When a process exits, display up to this many lines of output
        """,
    )

    def get_output(self, remove=False):
        """Retrieve the output from the Launcher.

        If remove: remove the file, if any, where it was being stored.
        """
        # override in subclasses to retrieve output
        return ""

    def _log_output(self, stop_data=None):
        output = self.get_output(remove=True)
        if self.output_limit:
            output = "".join(output.splitlines(True)[-self.output_limit :])

        log = self.log.debug
        if stop_data and stop_data.get("exit_code", 0) != 0:
            log = self.log.warning
        if output:
            log("Output for %s:\n%s", self.identifier, output)


class ControllerLauncher(BaseLauncher):
    """Base class for launching ipcontroller"""

    name = Unicode("ipcontroller")

    controller_cmd = List(
        list(ipcontroller_cmd_argv),
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        Unicode(),
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )

    connection_info_timeout = Float(
        60,
        config=True,
        help="""
        Default timeout (in seconds) for get_connection_info

        .. versionadded:: 8.7
        """,
    )

    async def get_connection_info(self, timeout=None):
        """Retrieve connection info for the controller

        Default implementation assumes profile_dir and cluster_id are local.

        .. versionchanged:: 8.7
            Accept `timeout=None` (default) to use `.connection_info_timeout` config.
        """
        if timeout is None:
            timeout = self.connection_info_timeout
        connection_files = self.connection_files
        paths = list(connection_files.values())
        start_time = time.monotonic()
        if timeout >= 0:
            deadline = start_time + timeout
        else:
            deadline = None

        if not all(os.path.exists(f) for f in paths):
            self.log.debug(f"Waiting for {paths}")
        while not all(os.path.exists(f) for f in paths):
            if deadline is not None and time.monotonic() > deadline:
                missing_files = [f for f in paths if not os.path.exists(f)]
                raise TimeoutError(
                    f"Connection files {missing_files} did not arrive in {timeout}s"
                )
            await asyncio.sleep(0.1)
            status = self.poll()
            if inspect.isawaitable(status):
                status = await status
            if status is not None:
                raise RuntimeError(
                    f"Controller stopped with {status} while waiting for {paths}"
                )
        self.log.debug(f"Loading {paths}")
        connection_info = {}
        for key, path in connection_files.items():
            try:
                with open(path) as f:
                    connection_info[key] = json.load(f)
            except ValueError:
                # possible race while controller is still writing the file
                # give it half a second before trying again
                time.sleep(0.5)
                with open(path) as f:
                    connection_info[key] = json.load(f)

        return connection_info
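
# A sketch of how get_connection_info is typically awaited (hypothetical usage):
#
#     info = await controller_launcher.get_connection_info(timeout=30)
#     client_info = info['client']  # parsed ipcontroller-client.json contents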

class EngineLauncher(BaseLauncher):
    """Base class for launching one engine"""

    name = Unicode("ipengine")

    engine_cmd = List(
        ipengine_cmd_argv, config=True, help="""command to launch the Engine."""
    )
    # Command line arguments for ipengine.
    engine_args = List(
        Unicode(),
        config=True,
        help="command-line arguments to pass to ipengine",
    )

    n = Integer(1).tag(to_dict=True)

    engine_set_id = Unicode()


# -----------------------------------------------------------------------------
# Local process launchers
# -----------------------------------------------------------------------------


class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List(Unicode())

    poll_seconds = Integer(
        30,
        config=True,
        help="""Interval on which to poll processes.

        Note: process exit should be noticed immediately,
        due to use of Process.wait(),
        but this interval should ensure we aren't leaving threads running forever,
        as other signals/events are checked on this interval
        """,
    )

    pid = Integer(-1).tag(to_dict=True)

    output_file = Unicode().tag(to_dict=True)

    @default("output_file")
    def _default_output_file(self):
        log_dir = os.path.join(self.profile_dir, "log")
        os.makedirs(log_dir, exist_ok=True)
        return os.path.join(log_dir, f'{self.identifier}.log')

    stop_seconds_until_kill = Integer(
        5,
        config=True,
        help="""The number of seconds to wait for a process to exit after sending SIGTERM before sending SIGKILL""",
    )

    stdout = None
    stderr = None
    process = None
    _wait_thread = None
    _popen_process = None

    def find_args(self):
        return self.cmd_and_args

    @classmethod
    def from_dict(cls, d, **kwargs):
        self = super().from_dict(d, **kwargs)
        self._reconstruct_process(d)
        return self

    def _reconstruct_process(self, d):
        """Reconstruct our process"""
        if 'pid' in d and d['pid'] > 0:
            try:
                self.process = psutil.Process(d['pid'])
            except psutil.NoSuchProcess as e:
                raise NotRunning(f"Process {d['pid']}") from e
            self._start_waiting()

    def _wait(self):
        """Background thread waiting for a process to exit"""
        exit_code = None
        while not self._stop_waiting.is_set() and self.state == 'running':
            try:
                # use a timeout so we can check the _stop_waiting event
                exit_code = self.process.wait(timeout=self.poll_seconds)
            except psutil.TimeoutExpired:
                continue
            else:
                break
        stop_data = dict(exit_code=exit_code, pid=self.pid, identifier=self.identifier)
        self.loop.add_callback(lambda: self.notify_stop(stop_data))
        if self._popen_process:
            # wait avoids ResourceWarning if the process has exited
            self._popen_process.wait(0)

    def _start_waiting(self):
        """Start background thread waiting on the process to exit"""
        # ensure self.loop is accessed on the main thread before waiting
        self.loop
        self._stop_waiting = threading.Event()
        self._wait_thread = Thread(
            target=self._wait, daemon=True, name=f"wait(pid={self.pid})"
        )
        self._wait_thread.start()

    def start(self):
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            raise ProcessStateError(
                f'The process was already started and has state: {self.state}'
            )
        self.log.debug(f"Sending output for {self.identifier} to {self.output_file}")

        env = os.environ.copy()
        env.update(self.get_env())
        self.log.debug(f"Setting environment: {','.join(self.get_env())}")

        with open(self.output_file, "ab") as f, open(os.devnull, "rb") as stdin:
            proc = self._popen_process = Popen(
                self.args,
                stdout=f.fileno(),
                stderr=STDOUT,
                stdin=stdin,
                env=env,
                cwd=self.work_dir,
                start_new_session=True,  # don't forward signals
            )
        self.pid = proc.pid
        # use psutil API for self.process
        self.process = psutil.Process(proc.pid)

        self.notify_start(self.process.pid)
        self._start_waiting()
        if 1 <= self.log.getEffectiveLevel() <= logging.DEBUG:
            self._start_streaming()

    async def join(self, timeout=None):
        """Wait for the process to exit"""
        if self._wait_thread is not None:
            self._wait_thread.join(timeout=timeout)
            if self._wait_thread.is_alive():
                raise TimeoutError(
                    f"Process {self.process.pid} did not exit in {timeout} seconds."
                )

    def _stream_file(self, path):
        """Stream one file"""
        with open(path) as f:
            while self.state == 'running' and not self._stop_waiting.is_set():
                line = f.readline()
                # log prefix?
                # or stream directly to sys.stderr
                if line:
                    sys.stderr.write(line)
                else:
                    # pause while we are at the end of the file
                    time.sleep(0.1)

    def _start_streaming(self):
        self._stream_thread = t = Thread(
            target=partial(self._stream_file, self.output_file),
            name=f"Stream Output {self.identifier}",
            daemon=True,
        )
        t.start()

    _output = None

    def get_output(self, remove=False):
        if self._output is None:
            if self.output_file:
                try:
                    with open(self.output_file) as f:
                        self._output = f.read()
                except FileNotFoundError:
                    self.log.debug(f"Missing output file: {self.output_file}")
                    self._output = ""
            else:
                self._output = ""

        if remove and os.path.isfile(self.output_file):
            self.log.debug(f"Removing {self.output_file}")
            try:
                os.remove(self.output_file)
            except Exception as e:
                # don't crash on failure to remove a file,
                # e.g. due to another process having it open
                self.log.error(f"Failed to remove {self.output_file}: {e}")

        return self._output

    async def stop(self):
        try:
            self.signal(SIGTERM)
        except Exception as e:
            self.log.debug(f"TERM failed: {e!r}")

        try:
            await self.join(timeout=self.stop_seconds_until_kill)
        except TimeoutError:
            self.log.warning(
                f"Process {self.pid} did not exit in {self.stop_seconds_until_kill} seconds after TERM"
            )
        else:
            return

        try:
            self.signal(SIGKILL)
        except Exception as e:
            self.log.debug(f"KILL failed: {e!r}")

        await self.join(timeout=self.stop_timeout)

    def signal(self, sig):
        if self.state == 'running':
            if WINDOWS and sig in {SIGTERM, SIGKILL}:
                # use Windows tree-kill for better child cleanup
                cmd = ['taskkill', '/pid', str(self.process.pid), '/t', '/F']
                check_output(cmd)
            else:
                self.process.send_signal(sig)

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        if WINDOWS:
            line = self.stdout.recv().decode('utf8', 'replace')
        else:
            line = self.stdout.readline().decode('utf8', 'replace')
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line.rstrip())

    def handle_stderr(self, fd, events):
        if WINDOWS:
            line = self.stderr.recv().decode('utf8', 'replace')
        else:
            line = self.stderr.readline().decode('utf8', 'replace')
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line.rstrip())
        else:
            self.poll()

    def poll(self):
        if self.process.is_running():
            return None

        status = self.process.wait(0)
        if status is None:
            # return code cannot always be retrieved.
            # but we need to not return None if it's still running
            status = 'unknown'
        self.notify_stop(
            dict(exit_code=status, pid=self.process.pid, identifier=self.identifier)
        )
        return status
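
# A minimal usage sketch (hypothetical), tying start/on_stop/stop together:
#
#     launcher = LocalProcessLauncher(cmd_and_args=['sleep', '10'], work_dir='.')
#     launcher.start()            # Popen + background wait thread
#     launcher.on_stop(print)     # callback invoked with stop_data on exit
#     await launcher.stop()       # SIGTERM, then SIGKILL after stop_seconds_until_kill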

class LocalControllerLauncher(LocalProcessLauncher, ControllerLauncher):
    """Launch a controller as a regular external process."""

    def find_args(self):
        return self.controller_cmd + self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super().start()


class LocalEngineLauncher(LocalProcessLauncher, EngineLauncher):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        return self.engine_cmd + self.cluster_args + self.engine_args


class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = Float(
        0.1,
        config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines.""",
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()
    outputs = Dict()
    output_file = ""  # no output file for me

    def __init__(self, work_dir='.', config=None, **kwargs):
        super().__init__(work_dir=work_dir, config=config, **kwargs)

    def to_dict(self):
        d = super().to_dict()
        d['engines'] = {i: launcher.to_dict() for i, launcher in self.launchers.items()}
        return d

    @classmethod
    def from_dict(cls, d, **kwargs):
        self = super().from_dict(d, **kwargs)
        n = 0
        for i, engine_dict in d['engines'].items():
            try:
                self.launchers[i] = el = self.launcher_class.from_dict(
                    engine_dict, identifier=i, parent=self
                )
            except NotRunning as e:
                self.log.error(f"Engine {i} not running: {e}")
            else:
                n += 1
                el.on_stop(self._notice_engine_stopped)
        if n == 0:
            raise NotRunning("No engines left")
        else:
            self.n = n
        return self

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        dlist = []
        for i in range(n):
            identifier = str(i)
            if i > 0:
                time.sleep(self.delay)
            el = self.launchers[identifier] = self.launcher_class(
                work_dir=self.work_dir,
                parent=self,
                log=self.log,
                profile_dir=self.profile_dir,
                cluster_id=self.cluster_id,
                environment=self.environment,
                identifier=identifier,
                output_file=os.path.join(
                    self.profile_dir,
                    "log",
                    f"ipengine-{self.cluster_id}-{self.engine_set_id}-{i}.log",
                ),
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        for el in list(self.launchers.values()):
            el.signal(sig)

    async def stop(self):
        futures = []
        for el in list(self.launchers.values()):
            f = el.stop()
            if inspect.isawaitable(f):
                futures.append(asyncio.ensure_future(f))

        if futures:
            await asyncio.gather(*futures)

    def _notice_engine_stopped(self, data):
        identifier = data['identifier']
        launcher = self.launchers.pop(identifier)
        engines = self.stop_data.setdefault("engines", {})
        if launcher is not None:
            self.outputs[identifier] = launcher.get_output()
        engines[identifier] = data
        if not self.launchers:
            # get exit code from engine exit codes
            # set error code if any engine has an error
            self.stop_data["exit_code"] = None
            for engine in engines.values():
                if 'exit_code' in engine:
                    if self.stop_data['exit_code'] is None:
                        self.stop_data['exit_code'] = engine['exit_code']
                    if engine['exit_code']:
                        # save the first nonzero exit code
                        self.stop_data['exit_code'] = engine['exit_code']
                        break

            self.notify_stop(self.stop_data)

    def _log_output(self, stop_data=None):
        # avoid double-logging output, already logged by each engine
        # that will be a lot if all 100 engines fail!
        pass

    def get_output(self, remove=False):
        """Get the output of all my child Launchers"""
        for identifier, launcher in self.launchers.items():
            # remaining launchers
            self.outputs[identifier] = launcher.get_output(remove=remove)

        joined_output = []
        for identifier, engine_output in self.outputs.items():
            if engine_output:
                joined_output.append(f"Output for engine {identifier}")
                if self.output_limit:
                    engine_output = "".join(
                        engine_output.splitlines(True)[-self.output_limit :]
                    )
                joined_output.append(indent(engine_output, '  '))
        return '\n'.join(joined_output)


# -----------------------------------------------------------------------------
# MPI launchers
# -----------------------------------------------------------------------------


class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(
        ['mpiexec'],
        config=True,
        help="The mpiexec command to use in starting the process.",
    )
    mpi_args = List(
        [], config=True, help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'], help="The program to start via mpiexec.")
    program_args = List([], help="The command line arguments to the program.")

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        config = kwargs.get('config') or {}
        for oldname in (
            'MPIExecLauncher',
            'MPIExecControllerLauncher',
            'MPIExecEngineSetLauncher',
        ):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warning(
                    "WARNING: %s name has been deprecated, use %s", oldname, newname
                )

        super().__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return (
            self.mpi_cmd
            + ['-n', str(self.n)]
            + self.mpi_args
            + self.program
            + self.program_args
        )

    def start(self, n=1):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super().start()

    def _log_output(self, stop_data):
        """Try to log mpiexec error output, if any, at warning level"""
        super()._log_output(stop_data)

        if stop_data and self.stop_data.get("exit_code", 0) != 0:
            # if this is True, super()._log_output would have already logged the full output
            # no need to extract from MPI
            return

        output = self.get_output(remove=False)
        mpiexec_lines = []

        in_mpi = False
        after_mpi = False
        mpi_tail = 0
        for line in output.splitlines(True):
            if line.startswith("======="):
                # mpich output looks like one block,
                # with a few lines trailing after
                # =========
                # = message
                # =
                # =========
                # YOUR APPLICATION TERMINATED WITH...
                if in_mpi:
                    after_mpi = True
                    mpi_tail = 2
                    in_mpi = False
                else:
                    in_mpi = True
            elif not in_mpi and line.startswith("-----"):
                # openmpi has less clear boundaries;
                # potentially several blocks that start and end with `----`
                # and error messages can show up after one or more blocks
                # once we see one of these lines, capture everything after it
                in_mpi = True
                # this would let us only capture messages inside blocks
                # but doing so would exclude most useful error output
                # else:
                #     # show the trailing delimiter line
                #     mpiexec_lines.append(line)
                #     in_mpi = False
                #     continue

            if in_mpi:
                mpiexec_lines.append(line)
            elif after_mpi:
                if mpi_tail <= 0:
                    break
                else:
                    mpi_tail -= 1
                    mpiexec_lines.append(line)

        if mpiexec_lines:
            self.log.warning("mpiexec error output:\n" + "".join(mpiexec_lines))
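
# For illustration, with the defaults and n=4, MPILauncher.find_args() yields:
#   ['mpiexec', '-n', '4', 'date']
# and MPIEngineSetLauncher would instead produce something like:
#   ['mpiexec', '-n', '4', sys.executable, '-m', 'ipyparallel.engine', '--mpi', ...]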

class MPIControllerLauncher(MPILauncher, ControllerLauncher):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args


class MPIEngineSetLauncher(MPILauncher, EngineLauncher):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.engine_cmd + ['--mpi']

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super().start(n)


# deprecated MPIExec names
class DeprecatedMPILauncher:
    def warn(self):
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)


class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.warn()


# -----------------------------------------------------------------------------
# SSH launchers
# -----------------------------------------------------------------------------

ssh_output_pattern = re.compile(r"__([a-z][a-z0-9_]+)=([a-z0-9\-\.]+)__", re.IGNORECASE)


def _ssh_outputs(out):
    """Extract ssh output variables from process output"""
    return dict(ssh_output_pattern.findall(out))


def sshx(ssh_cmd, cmd, env, remote_output_file, log=None):
    """Launch a remote process, returning its remote pid

    Uses nohup and pipes to put it in the background
    """
    remote_cmd = shlex_join(cmd)

    nohup_start = f"nohup {remote_cmd} > {remote_output_file} 2>&1 </dev/null & echo __remote_pid=$!__"
    full_cmd = ssh_cmd + ["--", "sh -"]

    input_script = "\n".join(
        [
            "set -eu",
        ]
        + [f"export {key}={shlex.quote(value)}" for key, value in env.items()]
        + ["", f"exec {nohup_start}"]
    )
    if log:
        log.info(f"Running `{remote_cmd}`")
        log.debug("Running script via ssh:\n%s", input_script)
    out = check_output(full_cmd, input=input_script.encode("utf8")).decode(
        "utf8", "replace"
    )
    values = _ssh_outputs(out)
    if 'remote_pid' in values:
        return int(values['remote_pid'])
    else:
        raise RuntimeError(f"Failed to get pid for {full_cmd}: {out}")


def ssh_waitpid(pid, timeout=None):
    """To be called on a remote host, waiting on a pid"""
    try:
        p = psutil.Process(pid)
        exit_code = p.wait(timeout)
    except psutil.NoSuchProcess:
        print("__process_running=0__")
        print("__exit_code=-1__")
    except psutil.TimeoutExpired:
        print("__process_running=1__")
    else:
        print("__process_running=0__")
        print(f"__exit_code={exit_code}__")
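
# These helpers communicate over stdout with `__key=value__` markers, so values
# survive ssh's merged output. For example, parsing a successful remote wait:
#
#     _ssh_outputs("noise __process_running=0__ __exit_code=0__")
#     # -> {'process_running': '0', 'exit_code': '0'}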

class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True, help="command for starting ssh").tag(
        to_dict=True
    )
    ssh_args = List([], config=True, help="args to pass to ssh").tag(to_dict=True)
    scp_cmd = List(['scp'], config=True, help="command for sending files").tag(
        to_dict=True
    )
    scp_args = List([], config=True, help="args to pass to scp").tag(to_dict=True)
    program = List([], help="Program to launch via ssh")
    program_args = List([], help="args to pass to remote program")
    hostname = Unicode(
        '', config=True, help="hostname on which to launch the program"
    ).tag(to_dict=True)
    user = Unicode('', config=True, help="username for ssh").tag(to_dict=True)
    location = Unicode(
        '', config=True, help="user@hostname location for ssh in one setting"
    )
    to_fetch = List(
        [], config=True, help="List of (remote, local) files to fetch after starting"
    )
    to_send = List(
        [], config=True, help="List of (local, remote) files to send before starting"
    )

    @default("poll_seconds")
    def _default_poll_seconds(self):
        # slower poll for ssh
        return 60

    @observe('hostname')
    def _hostname_changed(self, change):
        if self.user:
            self.location = '{}@{}'.format(self.user, change['new'])
        else:
            self.location = change['new']

    @observe('user')
    def _user_changed(self, change):
        self.location = '{}@{}'.format(change['new'], self.hostname)

    def find_args(self):
        # not really used except in logging
        return list(self.ssh_cmd)

    remote_output_file = Unicode(
        help="""The remote file to store output""",
    ).tag(to_dict=True)

    @default("remote_output_file")
    def _default_remote_output_file(self):
        full_program = ' '.join(self.program)
        if 'engine' in full_program:
            name = 'ipengine'
        elif 'controller' in full_program:
            name = 'ipcontroller'
        else:
            name = self.program[0]
        return os.path.join(
            self.remote_profile_dir,
            'log',
            os.path.basename(name) + f"-{time.time():.4f}.out",
        )

    remote_profile_dir = Unicode(
        '',
        config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """,
    ).tag(to_dict=True)

    @observe('profile_dir')
    def _profile_dir_changed(self, change):
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(change['new'])

    remote_python = Unicode(
        "python3", config=True, help="""Remote path to Python interpreter, if needed"""
    ).tag(to_dict=True)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home + '/'

        if path.startswith(home):
            return path[len(home) :]
        else:
            return path

    @default("remote_profile_dir")
    def _remote_profile_dir_default(self):
        return self._strip_home(self.profile_dir)

    @property
    def cluster_env(self):
        # use remote profile dir in env
        env = super().cluster_env
        env['IPP_PROFILE_DIR'] = self.remote_profile_dir
        return env

    _output = None

    _ssh_sender = None

    @property
    def ssh_sender(self):
        """instantiate ShellCommandSend object if needed"""
        if self._ssh_sender:
            return self._ssh_sender

        self.log.info(
            f'Create ShellCommandSend object ({self.ssh_cmd}, {self.ssh_args + [self.location]}, {self.remote_python})'
        )
        self._ssh_sender = ShellCommandSend(
            self.ssh_cmd,
            self.ssh_args + [self.location],
            self.remote_python,
            log=self.log,
        )
        return self._ssh_sender

    def _reconstruct_process(self, d):
        # called in from_dict
        # override from LocalProcessLauncher which invokes psutil.Process
        if 'pid' in d and d['pid'] > 0:
            self._start_waiting()

    def poll(self):
        """Override poll"""
        if self.state == 'running':
            return None
        else:
            return 0

    def get_output(self, remove=False):
        """Retrieve engine output from the remote file"""
        if self._output is None:
            with TemporaryDirectory() as td:
                output_file = os.path.join(
                    td, os.path.basename(self.remote_output_file)
                )
                try:
                    self._fetch_file(self.remote_output_file, output_file)
                except Exception as e:
                    self.log.error(
                        f"Failed to get output file {self.remote_output_file}: {e}"
                    )
                    self._output = ''
                else:
                    if remove:
                        # remove the file after we retrieve it
                        self.log.info(
                            f"Removing {self.location}:{self.remote_output_file}"
                        )
                        self.ssh_sender.cmd_remove(self.remote_output_file)
                    with open(output_file) as f:
                        self._output = f.read()
        return self._output

    def _send_file(self, local, remote, wait=True):
        """send a single file"""
        full_remote = f"{self.location}:{remote}".replace(os.path.sep, "/")
        for i in range(10 if wait else 0):
            if not os.path.exists(local):
                self.log.debug(f"waiting for {local}")
                time.sleep(1)
            else:
                break
        remote_dir = os.path.dirname(remote)
        self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
        if not self.ssh_sender.cmd_exists(remote_dir):
            self.ssh_sender.cmd_mkdir(remote_dir)
        self.log.info("sending %s to %s", local, full_remote)
        check_output(self.scp_cmd + self.scp_args + [local, full_remote], input=None)

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local, wait=True):
        """fetch a single file"""
        full_remote = f"{self.location}:{remote}".replace(os.path.sep, "/")
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10 if wait else 0):
            # wait up to 10s for remote file to exist
            check = self.ssh_sender.cmd_exists(remote)
            if check is False:
                time.sleep(1)
            elif check is True:
                break
            else:
                raise ValueError(f"cmd_exists expects bool, got {check!r}")
        local_dir = os.path.dirname(local)
        ensure_dir_exists(local_dir, 0o700)
        check_output(self.scp_cmd + self.scp_args + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None, port=None):
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        if port is not None:
            if '-p' not in self.ssh_args:
                self.ssh_args.append('-p')
                self.ssh_args.append(str(port))
            if '-P' not in self.scp_args:
                self.scp_args.append('-P')
                self.scp_args.append(str(port))

        # do some checks that settings are correct
        shell_info = self.ssh_sender.get_shell_info()
        python_ok = self.ssh_sender.has_python()
        self.log.debug(
            f"ssh sender object initiated (break_away_support={self.ssh_sender.breakaway_support})"
        )

        # create remote profile dir
        self.ssh_sender.check_output_python_module(
            ["IPython", "profile", "create", "--profile-dir", self.remote_profile_dir]
        )
        self.send_files()
        self.pid = self.ssh_sender.cmd_start(
            self.program + self.program_args,
            env=self.get_env(),
            output_file=self.remote_output_file,
        )
        remote_cmd = ' '.join(self.program + self.program_args)
        self.log.debug("Running `%s` (pid=%s)", remote_cmd, self.pid)
        self.notify_start({'host': self.location, 'pid': self.pid})
        self._start_waiting()
        self.fetch_files()

    def _wait(self):
        """Background thread waiting for a process to exit"""
        exit_code = None
        while not self._stop_waiting.is_set() and self.state == 'running':
            try:
                # use a timeout so we can check the _stop_waiting event
                exit_code = self.wait_one(timeout=self.poll_seconds)
            except TimeoutError:
                continue
            else:
                break
        stop_data = dict(exit_code=exit_code, pid=self.pid, identifier=self.identifier)
        self.loop.add_callback(lambda: self.notify_stop(stop_data))

    def _start_waiting(self):
        """Start background thread waiting on the process to exit"""
        # ensure self.loop is accessed on the main thread before waiting
        self.loop
        self._stop_waiting = threading.Event()
        self._wait_thread = Thread(
            target=self._wait,
            daemon=True,
            name=f"wait(host={self.location}, pid={self.pid})",
        )
        self._wait_thread.start()

    def wait_one(self, timeout):
        python_code = f"from ipyparallel.cluster.launcher import ssh_waitpid; ssh_waitpid({self.pid}, timeout={timeout})"
        out = self.ssh_sender.check_output_python_code(python_code)
        values = _ssh_outputs(out)
        if 'process_running' not in values:
            raise RuntimeError(out)
        running = int(values.get("process_running", 0))
        if running:
            raise TimeoutError("still running")
        return int(values.get("exit_code", -1))

    async def join(self, timeout=None):
        with ThreadPoolExecutor(1) as pool:
            wait = partial(self.wait_one, timeout=timeout)
            try:
                future = pool.submit(wait)
            except RuntimeError:
                # e.g. called during process shutdown,
                # which raises
                # RuntimeError: cannot schedule new futures after interpreter shutdown
                # Instead, do the blocking call
                wait()
            else:
                await asyncio.wrap_future(future)
        if getattr(self, '_stop_waiting', None) and self._wait_thread:
            self._stop_waiting.set()
            # got here, should be done
            # wait for wait_thread to cleanup
            self._wait_thread.join()

    def signal(self, sig):
        if self.state == 'running':
            self.ssh_sender.cmd_kill(self.pid, sig)

    @property
    def remote_connection_files(self):
        """Return remote paths for connection files"""
        return {
            key: self.remote_profile_dir + local_path[len(self.profile_dir) :]
            for key, local_path in self.connection_files.items()
        }
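
# Illustrative remote_connection_files mapping, assuming
# profile_dir=/home/you/.ipython/profile_default and the default
# remote_profile_dir of '.ipython/profile_default':
#   '/home/you/.ipython/profile_default/security/ipcontroller-client.json'
#     -> '.ipython/profile_default/security/ipcontroller-client.json'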

class SSHControllerLauncher(SSHLauncher, ControllerLauncher):
    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        return [self.remote_python, "-m", 'ipyparallel.controller']

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    @default("to_fetch")
    def _to_fetch_default(self):
        return [
            (self.remote_connection_files[key], local_path)
            for key, local_path in self.connection_files.items()
        ]


class SSHEngineLauncher(SSHLauncher, EngineLauncher):
    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        return [self.remote_python, "-m", "ipyparallel.engine"]

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    @default("to_send")
    def _to_send_default(self):
        return [
            (local_path, self.remote_connection_files[key])
            for key, local_path in self.connection_files.items()
        ]


class SSHEngineSetLauncher(LocalEngineSetLauncher, SSHLauncher):
    launcher_class = SSHEngineLauncher
    engines = Dict(
        config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""",
    ).tag(to_dict=True)
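
    # A hypothetical config example mixing both accepted forms:
    #
    #     c.SSHEngineSetLauncher.engines = {
    #         'host1.example.com': 2,
    #         'user@host2.example.com:2222': {'n': 4},
    #     }
    #
    # where a dict value supplies per-host trait overrides in addition to 'n'.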

    def _engine_cmd_default(self):
        return [self.remote_python, "-m", "ipyparallel.engine"]

    # unset some traits we inherit but don't use
    remote_output_file = ""

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is an *upper limit* of engines.
        The `engines` config property is used to assign slots to hosts.
        """

        dlist = []
        # traits to inherit:
        # + all common config traits
        # - traits set per-engine via engines dict
        # + some non-configurable traits such as cluster_id
        engine_traits = self.launcher_class.class_traits(config=True)
        my_traits = self.traits(config=True)
        shared_traits = set(my_traits).intersection(engine_traits)
        # in addition to shared traits, pass some derived traits
        # and exclude some composite traits
        inherited_traits = shared_traits.difference(
            {"location", "user", "hostname", "to_send", "to_fetch"}
        ).union({"profile_dir", "cluster_id"})

        requested_n = n
        started_n = 0
        for host, n_or_config in self.engines.items():
            if isinstance(n_or_config, dict):
                overrides = n_or_config
                n = overrides.pop("n", 1)
            else:
                overrides = {}
                n = n_or_config

            full_host = host

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            if ':' in host:
                host, port = host.split(':', 1)
            else:
                port = None

            for i in range(min(n, requested_n - started_n)):
                if i > 0:
                    time.sleep(self.delay)
                # pass all common traits to the launcher
                kwargs = {attr: getattr(self, attr) for attr in inherited_traits}
                # overrides from engine config
                kwargs.update(overrides)
                # explicit per-engine values
                kwargs['parent'] = self
                kwargs['identifier'] = key = f"{full_host}/{i}"
                el = self.launchers[key] = self.launcher_class(**kwargs)
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host, port=port)
                dlist.append(key)
                started_n += 1
                if started_n >= requested_n:
                    break
        self.notify_start(dlist)
        self.n = started_n
        return dlist


class SSHProxyEngineSetLauncher(SSHLauncher, EngineLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer().tag(to_dict=True)
    ipcluster_cmd = List(Unicode(), config=True)

    @default("ipcluster_cmd")
    def _default_ipcluster_cmd(self):
        return [self.remote_python, "-m", "ipyparallel.cluster"]

    ipcluster_args = List(
        Unicode(),
        config=True,
        help="""Extra CLI arguments to pass to ipcluster engines""",
    )

    @property
    def program(self):
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return [
            '-n',
            str(self.n),
            '--profile-dir',
            self.remote_profile_dir,
            '--cluster-id',
            self.cluster_id,
        ] + self.ipcluster_args

    @default("to_send")
    def _to_send_default(self):
        return [
            (local_path, self.remote_connection_files[key])
            for key, local_path in self.connection_files.items()
        ]

    def start(self, n):
        self.n = n
        super().start()
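
# For illustration, SSHProxyEngineSetLauncher.start(4) runs (roughly) this
# remote command, assuming the default remote_python and remote_profile_dir:
#   python3 -m ipyparallel.cluster engines -n 4 \
#       --profile-dir .ipython/profile_default --cluster-id <cluster_id>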

# -----------------------------------------------------------------------------
# Windows HPC Server 2008 scheduler launchers
# -----------------------------------------------------------------------------


class WindowsHPCLauncher(BaseLauncher):
    job_id_regexp = CRegExp(
        r'\d+',
        config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""",
    )
    job_file_name = Unicode(
        'ipython_job.xml',
        config=True,
        help="The filename of the instantiated job script.",
    )
    scheduler = Unicode(
        '', config=True, help="The hostname of the scheduler to submit the job to."
    )
    job_cmd = Unicode(config=True, help="The command for submitting jobs.")

    @default("job_cmd")
    def _default_job(self):
        return shutil.which("job") or "job"

    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError(f"Job id couldn't be determined: {output}")
        self.job_id = job_id
        self.log.info(f'Job started with id: {job_id}')
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            f'/jobfile:{self.job_file}',
            f'/scheduler:{self.scheduler}',
        ]
        self.log.debug(
            "Starting Win HPC Job: {}".format(self.job_cmd + ' ' + ' '.join(args))
        )

        output = check_output(
            [self.job_cmd] + args, env=os.environ, cwd=self.work_dir, stderr=STDOUT
        )
        output = output.decode("utf8", 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        args = ['cancel', self.job_id, f'/scheduler:{self.scheduler}']
        self.log.info(
            "Stopping Win HPC Job: {}".format(self.job_cmd + ' ' + ' '.join(args))
        )
        try:
            output = check_output(
                [self.job_cmd] + args, env=os.environ, cwd=self.work_dir, stderr=STDOUT
            )
            output = output.decode("utf8", 'replace')
        except Exception:
            output = f'The job already appears to be stopped: {self.job_id}'
        self.notify_stop(
            dict(job_id=self.job_id, output=output)
        )  # Pass the output of the kill cmd
        return output


class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    job_file_name = Unicode(
        'ipcontroller_job.xml', config=True, help="WinHPC xml job file."
    )
    controller_args = List([], config=False, help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        job = IPControllerJob(parent=self)

        t = IPControllerTask(parent=self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the cluster args (profile_dir, cluster_id) and any extra args.
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)


class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    job_file_name = Unicode(
        'ipengineset_job.xml', config=True, help="jobfile for ipengines job"
    )
    engine_args = List(Unicode(), config=False, help="extra args to pass to ipengine")

    def write_job_file(self, n):
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the cluster args (profile_dir, cluster_id) and any extra args.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    def start(self, n):
        """Start the engine set by profile_dir."""
        return super().start(n)


1719# ----------------------------------------------------------------------------- 

1720# Batch (PBS) system launchers 

1721# ----------------------------------------------------------------------------- 

1722 

1723 

1724class BatchSystemLauncher(BaseLauncher): 

1725 """Launch an external process using a batch system. 

1726 

1727 This class is designed to work with UNIX batch systems like PBS, LSF, 

1728 GridEngine, etc. The overall model is that there are different commands 

1729 like qsub, qdel, etc. that handle the starting and stopping of the process. 

1730 

1731 This class also has the notion of a batch script. The ``batch_template`` 

1732 attribute can be set to a string that is a template for the batch script. 

1733 This template is instantiated using string formatting. Thus the template can 

1734 use {n} for the number of instances. Subclasses can add additional variables 

1735 to the template dict. 

1736 """ 

1737 
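As an illustration of the template model described above (a sketch, not taken from this file; the placeholders shown are defined by the PBS subclass further down), a user could override the template from ipcluster_config.py and let the formatter fill in {n}, {queue}, and {program_and_args}:

# in ipcluster_config.py, where `c` is the config object
c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
#PBS -q {queue}
#PBS -t 1-{n}
{program_and_args}
"""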

1738 # load cluster args into context instead of cli 

1739 

1740 output_file = Unicode( 

1741 config=True, help="File in which to store stdout/err of processes" 

1742 ).tag(to_dict=True) 

1743 

1744 @default("output_file") 

1745 def _default_output_file(self): 

1746 log_dir = os.path.join(self.profile_dir, "log") 

1747 os.makedirs(log_dir, exist_ok=True) 

1748 return os.path.join(log_dir, f'{self.identifier}.log') 

1749 

1750 # Subclasses must fill these in. See PBSEngineSetLauncher 

1751 submit_command = List( 

1752 [''], 

1753 config=True, 

1754 help="The name of the command line program used to submit jobs.", 

1755 ) 

1756 delete_command = List( 

1757 [''], 

1758 config=True, 

1759 help="The name of the command line program used to delete jobs.", 

1760 ) 

1761 

1762 signal_command = List( 

1763 [''], 

1764 config=True, 

1765 help="The name of the command line program used to send signals to jobs.", 

1766 ) 

1767 

1768 job_id = Unicode().tag(to_dict=True) 

1769 

1770 job_id_regexp = CRegExp( 

1771 '', 

1772 config=True, 

1773 help="""A regular expression used to get the job id from the output of the 

1774 submit_command.""", 

1775 ) 

1776 job_id_regexp_group = Integer( 

1777 0, 

1778 config=True, 

1779 help="""The group we wish to match in job_id_regexp (0 to match all)""", 

1780 ) 

1781 batch_template = Unicode( 

1782 '', config=True, help="The string that is the batch script template itself." 

1783 ).tag(to_dict=True) 

1784 batch_template_file = Unicode( 

1785 '', config=True, help="The file that contains the batch template." 

1786 ) 

1787 batch_file_name = Unicode( 

1788 'batch_script', 

1789 config=True, 

1790 help="The filename of the instantiated batch script.", 

1791 ).tag(to_dict=True) 

1792 queue = Unicode('', config=True, help="The batch queue.").tag(to_dict=True) 

1793 

1794 n = Integer(1).tag(to_dict=True) 

1795 

1796 @observe('queue', 'n', 'cluster_id', 'profile_dir', 'output_file') 

1797 def _context_field_changed(self, change): 

1798 self._update_context(change) 

1799 

1800 # not configurable, override in subclasses 

1801 # Job Array regex 

1802 job_array_regexp = CRegExp('') 

1803 job_array_template = Unicode('') 

1804 

1805 # Queue regex 

1806 queue_regexp = CRegExp('') 

1807 queue_template = Unicode('') 

1808 

1809 # Output file 

1810 output_regexp = CRegExp('') 

1811 output_template = Unicode('') 

1812 

1813 # The default batch template, override in subclasses 

1814 default_template = Unicode('') 

1815 # The full path to the instantiated batch script. 

1816 batch_file = Unicode('') 

1817 # the format dict used with batch_template: 

1818 context = Dict() 

1819 

1820 namespace = Dict( 

1821 config=True, 

1822 help="""Extra variables to pass to the template. 

1823 

1824 This lets you parameterize additional options, 

1825 such as wall_time with a custom template. 

1826 """, 

1827 ).tag(to_dict=True) 

1828 

1829 @default("context") 

1830 def _context_default(self): 

1831 """load the default context with the default values for the basic keys 

1832 

1833 because the _trait_changed methods only load the context if they 

1834 are set to something other than the default value. 

1835 """ 

1836 return dict( 

1837 n=self.n, 

1838 queue=self.queue, 

1839 profile_dir=self.profile_dir, 

1840 cluster_id=self.cluster_id, 

1841 output_file=self.output_file, 

1842 ) 

1843 

1844 program = List(Unicode()) 

1845 program_args = List(Unicode()) 

1846 

1847 @observe("program", "program_args") 

1848 def _program_changed(self, change=None): 

1849 self.context['program'] = shlex_join(self.program) 

1850 self.context['program_args'] = shlex_join(self.program_args) 

1851 self.context['program_and_args'] = shlex_join(self.program + self.program_args) 

1852 

1853 @observe("n", "queue") 

1854 def _update_context(self, change): 

1855 self.context[change['name']] = change['new'] 

1856 

1857 # the Formatter instance for rendering the templates: 

1858 formatter = Instance(EvalFormatter, (), {}) 

1859 

1860 def find_args(self): 

1861 return self.submit_command + [self.batch_file] 

1862 

1863 def __init__(self, work_dir='.', config=None, **kwargs): 

1864 super().__init__(work_dir=work_dir, config=config, **kwargs) 

1865 self.batch_file = os.path.join(self.work_dir, self.batch_file_name) 

1866 # trigger program_changed to populate default context arguments 

1867 self._program_changed() 

1868 

1869 def _run_command(self, command, **kwargs): 

1870 joined_command = shlex_join(command) 

1871 self.log.debug("Running command: %s", joined_command) 

1872 output = check_output( 

1873 command, 

1874 stdin=None, 

1875 **kwargs, 

1876 ).decode("utf8", "replace") 

1877 self.log.debug("Command %s output: %s", command[0], output) 

1878 return output 

1879 

1880 def parse_job_id(self, output): 

1881 """Take the output of the submit command and return the job id.""" 

1882 m = self.job_id_regexp.search(output) 

1883 if m is not None: 

1884 job_id = m.group(self.job_id_regexp_group) 

1885 else: 

1886 raise LauncherError(f"Job id couldn't be determined: {output}") 

1887 self.job_id = job_id 

1888 self.log.info('Job submitted with job id: %r', job_id) 

1889 return job_id 

1890 
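For example (hypothetical scheduler output, not from the source), with the PBS-style defaults defined later in this file — job_id_regexp = r'\d+' and job_id_regexp_group = 0 — parse_job_id extracts the first run of digits from the submit command's output:

import re

job_id_regexp = re.compile(r'\d+')
output = "12345.head-node.example.com"  # hypothetical qsub output
m = job_id_regexp.search(output)
assert m is not None
print(m.group(0))  # -> '12345'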

1891 def write_batch_script(self, n=1): 

1892 """Instantiate and write the batch script to the work_dir.""" 

1893 self.n = n 

1894 self.context['environment_json'] = json.dumps(self.get_env()) 

1895 

1896 # first priority is batch_template if set 

1897 if self.batch_template_file and not self.batch_template: 

1898 # second priority is batch_template_file 

1899 with open(self.batch_template_file) as f: 

1900 self.batch_template = f.read() 

1901 if not self.batch_template: 

1902 # third (last) priority is default_template 

1903 self.batch_template = self.default_template 

1904 # add jobarray or queue lines to user-specified template 

1905 # note that this is *only* when user did not specify a template. 

1906 self._insert_options_in_script() 

1907 self._insert_job_array_in_script() 

1908 ns = {} 

1909 # internally generated 

1910 ns.update(self.context) 

1911 # from user config 

1912 ns.update(self.namespace) 

1913 script_as_string = self.formatter.format(self.batch_template, **ns) 

1914 self.log.debug(f'Writing batch script: {self.batch_file}\n{script_as_string}') 

1915 with open(self.batch_file, 'w') as f: 

1916 f.write(script_as_string) 

1917 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) 

1918 
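Since formatter is IPython's EvalFormatter, template fields may be Python expressions, not just plain names; a minimal sketch with invented values:

from IPython.utils.text import EvalFormatter

formatter = EvalFormatter()
# derive a node count from the engine count inside the template itself
print(formatter.format("#PBS -l nodes={n//8}:ppn=8", n=32))
# -> '#PBS -l nodes=4:ppn=8'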

1919 def _insert_options_in_script(self): 

1920 """Inserts a queue if required into the batch script.""" 

1921 inserts = [] 

1922 if self.queue and not self.queue_regexp.search(self.batch_template): 

1923 self.log.debug(f"Adding queue={self.queue} to {self.batch_file}") 

1924 inserts.append(self.queue_template) 

1925 

1926 if ( 

1927 self.output_file 

1928 and self.output_template 

1929 and not self.output_regexp.search(self.batch_template) 

1930 ): 

1931 self.log.debug(f"Adding output={self.output_file} to {self.batch_file}") 

1932 inserts.append(self.output_template) 

1933 

1934 if inserts: 

1935 firstline, rest = self.batch_template.split('\n', 1) 

1936 self.batch_template = '\n'.join([firstline] + inserts + [rest]) 

1937 

1938 def _insert_job_array_in_script(self): 

1939 """Inserts a job array if required into the batch script.""" 

1940 if not self.job_array_regexp.search(self.batch_template): 

1941 self.log.debug("adding job array settings to batch script") 

1942 firstline, rest = self.batch_template.split('\n', 1) 

1943 self.batch_template = '\n'.join([firstline, self.job_array_template, rest]) 

1944 
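To make the two insertion helpers concrete (they run only when falling back to the default template), here is a hypothetical PBS-flavored before/after. Note the order of calls in write_batch_script: options are inserted first, then the job-array line is inserted directly under the first line, so it lands above the queue line:

template = "#!/bin/sh\n{program_and_args}\n"
# after _insert_options_in_script() (assuming queue is set):
#   #!/bin/sh
#   #PBS -q {queue}
#   {program_and_args}
# after _insert_job_array_in_script():
#   #!/bin/sh
#   #PBS -t 1-{n}
#   #PBS -q {queue}
#   {program_and_args}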

1945 def start(self, n=1): 

1946 """Start n copies of the process using a batch system.""" 

1947 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args) 

1948 # Here we save profile_dir in the context so it 

1949 # can be used in the batch script template as {profile_dir} 

1950 self.write_batch_script(n) 

1951 

1952 env = os.environ.copy() 

1953 env.update(self.get_env()) 

1954 output = self._run_command(self.args, env=env) 

1955 

1956 job_id = self.parse_job_id(output) 

1957 self.notify_start(job_id) 

1958 return job_id 

1959 

1960 def stop(self): 

1961 command = self.delete_command + [self.job_id] 

1962 output = self._run_command(command) 

1963 

1964 self.notify_stop( 

1965 dict(job_id=self.job_id, output=output) 

1966 ) # Pass the output of the kill cmd 

1967 return output 

1968 

1969 def signal(self, sig): 

1970 command = self.signal_command + [str(sig), self.job_id] 

1971 self._run_command(command) 

1972 

1973 # same local-file implementation as LocalProcess 

1974 # should this be on the base class? 

1975 _output = None 

1976 

1977 def get_output(self, remove=True): 

1978 return LocalProcessLauncher.get_output(self, remove=remove) 

1979 

1980 def poll(self): 

1981 """Poll not implemented 

1982 

1983 Need to use `squeue` and friends to check job status 

1984 """ 

1985 return None 

1986 

1987 

1988class BatchControllerLauncher(BatchSystemLauncher, ControllerLauncher): 

1989 @default("program") 

1990 def _default_program(self): 

1991 return self.controller_cmd 

1992 

1993 @observe("controller_cmd") 

1994 def _controller_cmd_changed(self, change): 

1995 self.program = self._default_program() 

1996 

1997 @default("program_args") 

1998 def _default_program_args(self): 

1999 return self.cluster_args + self.controller_args 

2000 

2001 @observe("controller_args") 

2002 def _controller_args_changed(self, change): 

2003 self.program_args = self._default_program_args() 

2004 

2005 def start(self): 

2006 return super().start(n=1) 

2007 

2008 

2009class BatchEngineSetLauncher(BatchSystemLauncher, EngineLauncher): 

2010 @default("program") 

2011 def _default_program(self): 

2012 return self.engine_cmd 

2013 

2014 @observe("engine_cmd") 

2015 def _engine_cmd_changed(self, change): 

2016 self.program = self._default_program() 

2017 

2018 @default("program_args") 

2019 def _default_program_args(self): 

2020 return self.cluster_args + self.engine_args 

2021 

2022 @observe("engine_args") 

2023 def _engine_args_changed(self, change): 

2024 self.program_args = self._default_program_args() 

2025 

2026 

2027class PBSLauncher(BatchSystemLauncher): 

2028 """A BatchSystemLauncher subclass for PBS.""" 

2029 

2030 submit_command = List(['qsub'], config=True, help="The PBS submit command ['qsub']") 

2031 delete_command = List(['qdel'], config=True, help="The PBS delete command ['qdel']") 

2032 signal_command = List( 

2033 ['qsig', '-s'], config=True, help="The PBS signal command ['qsig', '-s']" 

2034 ) 

2035 job_id_regexp = CRegExp( 

2036 r'\d+', 

2037 config=True, 

2038 help=r"Regular expresion for identifying the job ID [r'\d+']", 

2039 ) 

2040 

2041 batch_file = Unicode('') 

2042 job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+') 

2043 job_array_template = Unicode('#PBS -t 1-{n}') 

2044 queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+') 

2045 queue_template = Unicode('#PBS -q {queue}') 

2046 output_regexp = CRegExp(r'#PBS\W+(?:-o)\W+\$?\w+') 

2047 output_template = Unicode('#PBS -j oe\n#PBS -o {output_file}') 

2048 

2049 

2050class PBSControllerLauncher(PBSLauncher, BatchControllerLauncher): 

2051 """Launch a controller using PBS.""" 

2052 

2053 batch_file_name = Unicode( 

2054 'pbs_controller', config=True, help="batch file name for the controller job." 

2055 ) 

2056 default_template = Unicode( 

2057 """#!/bin/sh 

2058#PBS -V 

2059#PBS -N ipcontroller 

2060{program_and_args} 

2061""" 

2062 ) 

2063 

2064 

2065class PBSEngineSetLauncher(PBSLauncher, BatchEngineSetLauncher): 

2066 """Launch Engines using PBS""" 

2067 

2068 batch_file_name = Unicode( 

2069 'pbs_engines', config=True, help="batch file name for the engine(s) job." 

2070 ) 

2071 default_template = Unicode( 

2072 """#!/bin/sh 

2073#PBS -V 

2074#PBS -N ipengine 

2075{program_and_args} 

2076""" 

2077 ) 

2078 
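A hedged configuration sketch tying the PBS launchers together (the option values are illustrative; 'PBS' resolves through the launcher entry-point registry described at the end of this file):

# ipcluster_config.py
c.Cluster.controller_launcher_class = 'PBS'
c.Cluster.engine_launcher_class = 'PBS'
c.PBSEngineSetLauncher.queue = 'batch'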

2079 

2080# Slurm is very similar to PBS 

2081 

2082 

2083class SlurmLauncher(BatchSystemLauncher): 

2084 """A BatchSystemLauncher subclass for slurm.""" 

2085 

2086 submit_command = List( 

2087 ['sbatch'], config=True, help="The slurm submit command ['sbatch']" 

2088 ) 

2089 delete_command = List( 

2090 ['scancel'], config=True, help="The slurm delete command ['scancel']" 

2091 ) 

2092 signal_command = List( 

2093 ['scancel', '-s'], 

2094 config=True, 

2095 help="The slurm signal command ['scancel', '-s']", 

2096 ) 

2097 job_id_regexp = CRegExp( 

2098 r'\d+', 

2099 config=True, 

2100 help=r"Regular expresion for identifying the job ID [r'\d+']", 

2101 ) 

2102 

2103 account = Unicode("", config=True, help="Slurm account to be used") 

2104 

2105 qos = Unicode("", config=True, help="Slurm QoS to be used") 

2106 

2107 # Note: from the man page: 

2108 # 'Acceptable time formats include "minutes", "minutes:seconds", 

2109 # "hours:minutes:seconds", "days-hours", "days-hours:minutes" 

2110 # and "days-hours:minutes:seconds". 

2111 timelimit = Any("", config=True, help="Slurm timelimit to be used") 

2112 

2113 options = Unicode("", config=True, help="Extra Slurm options") 

2114 

2115 @observe('account') 

2116 def _account_changed(self, change): 

2117 self._update_context(change) 

2118 

2119 @observe('qos') 

2120 def _qos_changed(self, change): 

2121 self._update_context(change) 

2122 

2123 @observe('timelimit') 

2124 def _timelimit_changed(self, change): 

2125 self._update_context(change) 

2126 

2127 @observe('options') 

2128 def _options_changed(self, change): 

2129 self._update_context(change) 

2130 

2131 batch_file = Unicode('') 

2132 

2133 job_array_regexp = CRegExp(r'#SBATCH\W+(?:--ntasks|-n)[\w\d\-\$]+') 

2134 job_array_template = Unicode('''#SBATCH --ntasks={n}''') 

2135 

2136 queue_regexp = CRegExp(r'#SBATCH\W+(?:--partition|-p)\W+\$?\w+') 

2137 queue_template = Unicode('#SBATCH --partition={queue}') 

2138 

2139 account_regexp = CRegExp(r'#SBATCH\W+(?:--account|-A)\W+\$?\w+') 

2140 account_template = Unicode('#SBATCH --account={account}') 

2141 

2142 qos_regexp = CRegExp(r'#SBATCH\W+--qos\W+\$?\w+') 

2143 qos_template = Unicode('#SBATCH --qos={qos}') 

2144 

2145 timelimit_regexp = CRegExp(r'#SBATCH\W+(?:--time|-t)\W+\$?\w+') 

2146 timelimit_template = Unicode('#SBATCH --time={timelimit}') 

2147 

2148 output_regexp = CRegExp(r'#SBATCH\W+(?:--output)\W+\$?\w+') 

2149 output_template = Unicode('#SBATCH --output={output_file}') 

2150 

2151 def _insert_options_in_script(self): 

2152 """Insert 'partition' (slurm name for queue), 'account', 'time' and other options if necessary""" 

2153 super()._insert_options_in_script() 

2154 inserts = [] 

2155 if self.account and not self.account_regexp.search(self.batch_template): 

2156 self.log.debug("adding slurm account settings to batch script") 

2157 inserts.append(self.account_template) 

2158 

2159 if self.qos and not self.qos_regexp.search(self.batch_template): 

2160 self.log.debug("adding Slurm qos settings to batch script") 


2162 inserts.append(self.qos_template) 

2163 

2164 if self.timelimit and not self.timelimit_regexp.search(self.batch_template): 

2165 self.log.debug("adding slurm time limit settings to batch script") 

2166 inserts.append(self.timelimit_template) 

2167 

2168 if inserts: 

2169 firstline, rest = self.batch_template.split('\n', 1) 

2170 self.batch_template = '\n'.join([firstline] + inserts + [rest]) 

2171 

2172 

2173class SlurmControllerLauncher(SlurmLauncher, BatchControllerLauncher): 

2174 """Launch a controller using Slurm.""" 

2175 

2176 batch_file_name = Unicode( 

2177 'slurm_controller.sbatch', 

2178 config=True, 

2179 help="batch file name for the controller job.", 

2180 ) 

2181 default_template = Unicode( 

2182 """#!/bin/sh 

2183#SBATCH --export=ALL 

2184#SBATCH --job-name=ipcontroller-{cluster_id} 

2185#SBATCH --ntasks=1 

2186{program_and_args} 

2187""" 

2188 ) 

2189 

2190 

2191class SlurmEngineSetLauncher(SlurmLauncher, BatchEngineSetLauncher): 

2192 """Launch Engines using Slurm""" 

2193 

2194 batch_file_name = Unicode( 

2195 'slurm_engine.sbatch', 

2196 config=True, 

2197 help="batch file name for the engine(s) job.", 

2198 ) 

2199 default_template = Unicode( 

2200 """#!/bin/sh 

2201#SBATCH --export=ALL 

2202#SBATCH --job-name=ipengine-{cluster_id} 

2203srun {program_and_args} 

2204""" 

2205 ) 

2206 
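For example (a sketch with invented values), the Slurm-specific traits above become #SBATCH directives whenever the template does not already contain matching lines:

c.Cluster.engine_launcher_class = 'Slurm'
c.SlurmEngineSetLauncher.account = 'proj0001'
c.SlurmEngineSetLauncher.qos = 'normal'
c.SlurmEngineSetLauncher.timelimit = '1:00:00'  # hours:minutes:seconds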

2207 

2208# SGE is very similar to PBS 

2209 

2210 

2211class SGELauncher(PBSLauncher): 

2212 """Sun GridEngine is a PBS clone with slightly different syntax""" 

2213 

2214 job_array_regexp = CRegExp(r'#\$\W+\-t') 

2215 job_array_template = Unicode('#$ -t 1-{n}') 

2216 queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+') 

2217 queue_template = Unicode('#$ -q {queue}') 

2218 

2219 

2220class SGEControllerLauncher(SGELauncher, BatchControllerLauncher): 

2221 """Launch a controller using SGE.""" 

2222 

2223 batch_file_name = Unicode( 

2224 'sge_controller', config=True, help="batch file name for the controller job." 

2225 ) 

2226 default_template = Unicode( 

2227 """#$ -V 

2228#$ -S /bin/sh 

2229#$ -N ipcontroller 

2230{program_and_args} 

2231""" 

2232 ) 

2233 

2234 

2235class SGEEngineSetLauncher(SGELauncher, BatchEngineSetLauncher): 

2236 """Launch Engines with SGE""" 

2237 

2238 batch_file_name = Unicode( 

2239 'sge_engines', config=True, help="batch file name for the engine(s) job." 

2240 ) 

2241 default_template = Unicode( 

2242 """#$ -V 

2243#$ -S /bin/sh 

2244#$ -N ipengine 

2245{program_and_args} 

2246""" 

2247 ) 

2248 

2249 

2250# LSF launchers 

2251 

2252 

2253class LSFLauncher(BatchSystemLauncher): 

2254 """A BatchSystemLauncher subclass for LSF.""" 

2255 

2256 submit_command = List(['bsub'], config=True, help="The LSF submit command ['bsub']") 

2257 delete_command = List( 

2258 ['bkill'], config=True, help="The LSF delete command ['bkill']" 

2259 ) 

2260 signal_command = List( 

2261 ['bkill', '-s'], config=True, help="The LSF signal command ['bkill', '-s']" 

2262 ) 

2263 job_id_regexp = CRegExp( 

2264 r'\d+', 

2265 config=True, 

2266 help=r"Regular expresion for identifying the job ID [r'\d+']", 

2267 ) 

2268 

2269 batch_file = Unicode('') 

2270 job_array_regexp = CRegExp(r'#BSUB\s+-J+\w+\[\d+-\d+\]') 

2271 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]') 

2272 queue_regexp = CRegExp(r'#BSUB\s+-q\s+\w+') 

2273 queue_template = Unicode('#BSUB -q {queue}') 

2274 output_regexp = CRegExp(r'#BSUB\s+-oo?\s+\w+') 

2275 output_template = Unicode('#BSUB -o {output_file}\n#BSUB -e {output_file}\n') 

2276 

2277 def start(self, n=1): 

2278 """Start n copies of the process using LSF batch system. 

2279 This cant inherit from the base class because bsub expects 

2280 to be piped a shell script in order to honor the #BSUB directives : 

2281 bsub < script 

2282 """ 

2283 # Here we save profile_dir in the context so it 

2284 # can be used in the batch script template as {profile_dir} 

2285 self.write_batch_script(n) 

2286 piped_cmd = self.args[0] + '<"' + self.args[1] + '"' 

2287 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd) 

2288 p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE) 

2289 output, err = p.communicate() 

2290 output = output.decode("utf8", 'replace') 

2291 job_id = self.parse_job_id(output) 

2292 self.notify_start(job_id) 

2293 return job_id 

2294 
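Concretely, this reproduces the shell idiom bsub < script; a tiny sketch of the string built above (file name invented):

args = ['bsub', 'lsf_engines']  # submit_command + [batch_file]
piped_cmd = args[0] + '<"' + args[1] + '"'
print(piped_cmd)  # -> bsub<"lsf_engines"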

2295 

2296class LSFControllerLauncher(LSFLauncher, BatchControllerLauncher): 

2297 """Launch a controller using LSF.""" 

2298 

2299 batch_file_name = Unicode( 

2300 'lsf_controller', config=True, help="batch file name for the controller job." 

2301 ) 

2302 default_template = Unicode( 

2303 """#!/bin/sh 

2304 #BSUB -env all 

2305 #BSUB -J ipcontroller-{cluster_id} 

2306 {program_and_args} 

2307 """ 

2308 ) 

2309 

2310 

2311class LSFEngineSetLauncher(LSFLauncher, BatchEngineSetLauncher): 

2312 """Launch Engines using LSF""" 

2313 

2314 batch_file_name = Unicode( 

2315 'lsf_engines', config=True, help="batch file name for the engine(s) job." 

2316 ) 

2317 default_template = Unicode( 

2318 """#!/bin/sh 

2319 #BSUB -J ipengine-{cluster_id} 

2320 #BSUB -env all 

2321 {program_and_args} 

2322 """ 

2323 ) 

2324 

2325 def get_env(self): 

2326 # write directly to output files 

2327 # otherwise, will copy and clobber merged stdout/err 

2328 env = {"LSB_STDOUT_DIRECT": "Y"} 

2329 env.update(super().get_env()) 

2330 return env 

2331 

2332 

2333class HTCondorLauncher(BatchSystemLauncher): 

2334 """A BatchSystemLauncher subclass for HTCondor. 

2335 

2336 HTCondor requires that we launch the ipengine/ipcontroller scripts rather 

2337 than the python instance, but otherwise is very similar to PBS. This is because 

2338 HTCondor destroys sys.executable when launching remote processes - a launched 

2339 python process depends on sys.executable to effectively evaluate its 

2340 module search paths. Without it, regardless of which python interpreter you launch, 

2341 you will get only the built-in module search paths. 

2342 

2343 We use the ip{cluster, engine, controller} scripts as our executable to circumvent 

2344 this - the mechanism of shebanged scripts means that the python binary will be 

2345 launched with argv[0] set to the *location of the ip{cluster, engine, controller} 

2346 scripts on the remote node*. This means you need to take care that: 

2347 

2348 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller 

2349 of the python environment you wish to execute code in having top precedence. 

2350 b. This functionality is untested on Windows. 

2351 

2352 If you need different behavior, consider making your own template. 

2353 """ 

2354 

2355 submit_command = List( 

2356 ['condor_submit'], 

2357 config=True, 

2358 help="The HTCondor submit command ['condor_submit']", 

2359 ) 

2360 delete_command = List( 

2361 ['condor_rm'], config=True, help="The HTCondor delete command ['condor_rm']" 

2362 ) 

2363 job_id_regexp = CRegExp( 

2364 r'(\d+)\.$', 

2365 config=True, 

2366 help=r"Regular expression for identifying the job ID [r'(\d+)\.$']", 

2367 ) 

2368 job_id_regexp_group = Integer( 

2369 1, config=True, help="""The group we wish to match in job_id_regexp [1]""" 

2370 ) 

2371 

2372 job_array_regexp = CRegExp(r'queue\W+\$') 

2373 job_array_template = Unicode('queue {n}') 

2374 

2375 def _insert_job_array_in_script(self): 

2376 """Inserts a job array if required into the batch script.""" 

2377 if not self.job_array_regexp.search(self.batch_template): 

2378 self.log.debug("adding job array settings to batch script") 

2379 # HTCondor requires that the job array goes at the bottom of the script 

2380 self.batch_template = '\n'.join( 

2381 [self.batch_template, self.job_array_template] 

2382 ) 

2383 

2384 def _insert_options_in_script(self): 

2385 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be 

2386 specified in the script. 

2387 """ 

2388 super()._insert_options_in_script() 

2389 

2390 

2391class HTCondorControllerLauncher(HTCondorLauncher, BatchControllerLauncher): 

2392 """Launch a controller using HTCondor.""" 

2393 

2394 batch_file_name = Unicode( 

2395 'htcondor_controller', 

2396 config=True, 

2397 help="batch file name for the controller job.", 

2398 ) 

2399 default_template = Unicode( 

2400 r""" 

2401universe = vanilla 

2402executable = ipcontroller 

2403# by default we expect a shared file system 

2404transfer_executable = False 

2405arguments = {program_args} 

2406""" 

2407 ) 

2408 

2409 

2410class HTCondorEngineSetLauncher(HTCondorLauncher, BatchEngineSetLauncher): 

2411 """Launch Engines using HTCondor""" 

2412 

2413 batch_file_name = Unicode( 

2414 'htcondor_engines', config=True, help="batch file name for the engine(s) job." 

2415 ) 

2416 default_template = Unicode( 

2417 """ 

2418universe = vanilla 

2419executable = ipengine 

2420# by default we expect a shared file system 

2421transfer_executable = False 

2422arguments = "{program_args}" 

2423""" 

2424 ) 

2425 
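Rendered, the engine submit description for (say) n=4 would look roughly like this — a sketch only; the arguments line depends on the profile in use, and the queue statement is appended at the bottom as HTCondor requires:

universe = vanilla
executable = ipengine
transfer_executable = False
arguments = "--profile-dir=... --cluster-id=..."
queue 4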

2426 

2427# ----------------------------------------------------------------------------- 

2428# Collections of launchers 

2429# ----------------------------------------------------------------------------- 

2430 

2431local_launchers = [ 

2432 LocalControllerLauncher, 

2433 LocalEngineLauncher, 

2434 LocalEngineSetLauncher, 

2435] 

2436mpi_launchers = [ 

2437 MPILauncher, 

2438 MPIControllerLauncher, 

2439 MPIEngineSetLauncher, 

2440] 

2441ssh_launchers = [ 

2442 SSHLauncher, 

2443 SSHControllerLauncher, 

2444 SSHEngineLauncher, 

2445 SSHEngineSetLauncher, 

2446 SSHProxyEngineSetLauncher, 

2447] 

2448winhpc_launchers = [ 

2449 WindowsHPCLauncher, 

2450 WindowsHPCControllerLauncher, 

2451 WindowsHPCEngineSetLauncher, 

2452] 

2453pbs_launchers = [ 

2454 PBSLauncher, 

2455 PBSControllerLauncher, 

2456 PBSEngineSetLauncher, 

2457] 

2458slurm_launchers = [ 

2459 SlurmLauncher, 

2460 SlurmControllerLauncher, 

2461 SlurmEngineSetLauncher, 

2462] 

2463sge_launchers = [ 

2464 SGELauncher, 

2465 SGEControllerLauncher, 

2466 SGEEngineSetLauncher, 

2467] 

2468lsf_launchers = [ 

2469 LSFLauncher, 

2470 LSFControllerLauncher, 

2471 LSFEngineSetLauncher, 

2472] 

2473htcondor_launchers = [ 

2474 HTCondorLauncher, 

2475 HTCondorControllerLauncher, 

2476 HTCondorEngineSetLauncher, 

2477] 

2478all_launchers = ( 

2479 local_launchers 

2480 + mpi_launchers 

2481 + ssh_launchers 

2482 + winhpc_launchers 

2483 + pbs_launchers 

2484 + slurm_launchers 

2485 + sge_launchers 

2486 + lsf_launchers 

2487 + htcondor_launchers 

2488) 

2489 

2490 

2491def find_launcher_class(name, kind): 

2492 """Return a launcher class for a given name and kind. 

2493 

2494 Parameters 

2495 ---------- 

2496 name : str 

2497 The full name of the launcher class, either with or without the 

2498 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor, 

2499 Slurm, WindowsHPC). 

2500 kind : str 

2501 Either 'engine' or 'controller'. 

2502 """ 

2503 if kind == 'engine': 

2504 group_name = 'ipyparallel.engine_launchers' 

2505 elif kind == 'controller': 

2506 group_name = 'ipyparallel.controller_launchers' 

2507 else: 

2508 raise ValueError(f"kind must be 'engine' or 'controller', not {kind!r}") 

2509 group = entry_points(group=group_name) 

2510 # make it case-insensitive 

2511 registry = {entrypoint.name.lower(): entrypoint for entrypoint in group} 

2512 return registry[name.lower()].load() 

2513 
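A usage sketch (assuming ipyparallel's own entry points are installed, where e.g. 'ssh' maps to SSHEngineSetLauncher for the engine kind):

from ipyparallel.cluster.launcher import find_launcher_class

cls = find_launcher_class('SSH', 'engine')  # lookup is case-insensitive
print(cls.__name__)  # e.g. 'SSHEngineSetLauncher'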

2514 

2515@lru_cache 

2516def abbreviate_launcher_class(cls): 

2517 """Abbreviate a launcher class back to its entrypoint name""" 

2518 cls_key = f"{cls.__module__}:{cls.__name__}" 

2519 # allow entrypoint_name attribute in case the definition module 

2520 # is not the same as the 'import' module 

2521 if getattr(cls, 'entrypoint_name', None): 

2522 return getattr(cls, 'entrypoint_name') 

2523 

2524 for kind in ('controller', 'engine'): 

2525 group_name = f'ipyparallel.{kind}_launchers' 

2526 group = entry_points(group=group_name) 

2527 for entrypoint in group: 

2528 if entrypoint.value == cls_key: 

2529 return entrypoint.name.lower() 

2530 return cls_key
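And the reverse direction (a sketch under the same entry-point assumption):

from ipyparallel.cluster.launcher import (
    SlurmEngineSetLauncher,
    abbreviate_launcher_class,
)

print(abbreviate_launcher_class(SlurmEngineSetLauncher))  # e.g. 'slurm'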