Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/cluster/shellcmd.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

226 statements  

1#!/usr/bin/env python 

2""" 

3Shell helper application for OS independent shell commands 

4 

5Currently, the following commands are supported: 

6* start: starts a process into background and returns its pid 

7* running: check if a process with a given pid is running 

8* kill: kill a process with a given pid 

9* mkdir: creates directory recursively (no error if directory already exists) 

10* rmdir: remove directory recursively 

11* exists: checks if a file/directory exists 

12* remove: removes a file 

13""" 

14 

15import base64 

16import inspect 

17import json 

18import logging 

19import pathlib 

20import re 

21import sys 

22import time 

23import warnings 

24from subprocess import CalledProcessError, Popen, TimeoutExpired, check_output 

25from tempfile import NamedTemporaryFile 

26 

27from . import shellcmd_receive 

28 

# Python script template that ShellCommandSend._cmd_send pipes into the python
# interpreter on the other side of the shell.  Placeholders:
#   {define_receive}  - either the complete receiver source (send receiver mode) or
#                       an import line that provides ShellCommandReceive
#   {inputs_encoded}  - output of _encode(): base64 of the JSON parameters; the base64
#                       alphabet has no quotes/backslashes, so it can be embedded in a
#                       plain string literal without escaping
#   {method}          - name of the ShellCommandReceive method to call
# Values inserted by str.format are not re-parsed, so braces inside the receiver
# source are harmless.
_shell_cmd_receive_tpl = '''
import base64, json

{define_receive}

def _decode(b64_json):
    return json.loads(base64.b64decode(b64_json).decode("utf8"))

inputs_encoded = "{inputs_encoded}"
inputs = _decode(inputs_encoded)
receive_parameters=inputs["receive_parameters"]
command_parameters=inputs["command_parameters"]

with ShellCommandReceive(**receive_parameters) as r:
    r.{method}(**command_parameters)
'''

45 

# Python script template for the detached local proxy process started by
# ShellCommandSend._cmd_start_windows_no_breakaway.  It is filled in with
# ``_py_detached_tpl.format(input_json=...)`` where ``input_json`` is a JSON object
# with the keys ``input_filename``, ``output_filename`` and ``cmd_args``.
# The proxy feeds the contents of ``input_filename`` to ``cmd_args`` via stdin and
# redirects stdout/stderr of that command into ``output_filename``.
#
# The JSON is embedded in a *raw* triple-quoted string literal: json.dumps writes a
# backslash (e.g. in Windows temp paths) as two backslashes, which a regular string
# literal would collapse into one, turning the JSON into invalid escape sequences.
# json.dumps also escapes every double quote, so the JSON can never terminate the
# raw literal early.
_py_detached_tpl = '''
import json
input_json = r"""{input_json}"""
inputs = json.loads(input_json)

from subprocess import run, PIPE, STDOUT

with open(inputs['input_filename'], 'r') as f:
    stdin = f.read()

with open(inputs['output_filename'], 'w') as f:
    run(inputs["cmd_args"], input=stdin, stdout=f, stderr=STDOUT, check=True, text=True)
'''

59 

60 

def _encode(inputs):
    """Serialize *inputs* as JSON and return it as an ASCII base64 string.

    The base64 alphabet contains neither quotes, backslashes nor whitespace, so
    the result can be placed in shell arguments or Python string literals
    without any escaping. The inverse (_decode) is defined inside
    _shell_cmd_receive_tpl.
    """
    json_bytes = json.dumps(inputs).encode("utf8")
    encoded_bytes = base64.b64encode(json_bytes)
    return encoded_bytes.decode("ascii")

68 

69 

70class ShellCommandSend: 

71 """ 

72 Helper class for sending shell commands in generic and OS independent form 

73 

74 The class is designed to send shell commands to an ssh connection. Nevertheless, it can be 

75 used to send commands to a different local shell as well, which is useful for testing. Since 

76 the concept uses the ipyparallel package (class ipyparallel.cluster.ShellCommandReceive) for 

77 executing the commands, it is necessary that a valid python installation (python_path) is provided. 

78 By calling has_python this can be evaluated. Furthermore, get_shell_info can be called to retrieve OS and 

79 shell information independent of a valid python installation. 

80 

81 Since some operations require different handling for different platforms/shells, the class gathers 

82 necessary information during initialization. This is done by default at object instantiation. 

83 Nevertheless, it can be postponed (initialize=False) but before any member function is called, the 

84 initialize function has to be called. 

85 

86 Beside generic check_output[...] functions (equivalent to subprocess.check_output), the class provides 

87 specific shell commands which have a cmd_ prefix. When adding new functions make sure that an 

88 equivalent is added in the ShellCommandReceive class as well. 

89 

90 To start processes through an ssh connection on a Windows server that stay alive after the ssh connection 

91 is closed, the process must be started with the breakaway creation flag set. This works fine on 'normal' 

92 machines and VMs, but is denied on Windows GitHub runners (presumably for security reasons). Hence, it was 

93 necessary to implement a workaround to enable CI on GitHub. In case breakaway is not supported, a detached 

94 ssh connection is started (by the ShellCommandSend object) which stays open until the started process has 

95 finished. See _cmd_start_windows_no_breakaway for further details. 

96 

97 The class supports a special send receiver mode (see self.send_receiver_code), which allows command sending 

98 without ipyparallel being installed on the 'other side'. In this mode basically the shellcmd_receive.py file 

99 is also transferred, which makes development and testing much easier. 

100 """ 

101 

    # package name for sending the command (not referenced in the visible methods)
    package_name = "ipyparallel.shellcmd"
    # matches "__<key>=<value>__" markers printed by the receiver,
    # e.g. "__remote_pid=1234__" (see _get_pid)
    output_template = re.compile(
        r"__([a-z][a-z0-9_]+)=([a-z0-9\-\.]+)__", re.IGNORECASE
    )
    # full source of the receiver module, read once when the class body executes;
    # embedded into every command script in send receiver mode
    receiver_code = pathlib.Path(
        inspect.getfile(shellcmd_receive)
    ).read_text()  # get full code of receiver side
    # used instead of receiver_code when send_receiver_code is False
    receiver_import = (
        "from ipyparallel.cluster.shellcmd_receive import ShellCommandReceive"
    )

112 

113 def __init__( 

114 self, 

115 shell, 

116 args, 

117 python_path, 

118 initialize=True, 

119 send_receiver_code=False, 

120 log=None, 

121 ): 

122 self.shell = shell 

123 self.args = args 

124 self.python_path = python_path 

125 

126 # shell dependent values. Those values are determined in the initialize function 

127 self.shell_info = None 

128 self._win = sys.platform.lower().startswith("win") 

129 self.is_powershell = None # flag if shell is windows powershell (requires special parameter quoting) 

130 self.breakaway_support = None # flag if process creation support the break_away flag (relevant for windows only; None under linux) 

131 self.join_params = True # join all cmd params into a single param. does NOT work with windows cmd 

132 # equivalent to os.pathsep (will be changed during initialization) 

133 self.pathsep = "/" 

134 

135 # should be activated when developing... 

136 self.send_receiver_code = send_receiver_code 

137 self.debugging = False # if activated an output log (~/ipp_shellcmd.log) is created on receiver side 

138 self.log = log or logging.getLogger(__name__) 

139 

140 if initialize: 

141 self.initialize() 

142 

143 def _check_output(self, cmd, **kwargs): 

144 kwargs.setdefault("text", True) 

145 self.log.debug("check_output %s", cmd) 

146 return check_output(cmd, **kwargs) 

147 

148 def _runs_successful(self, cmd): 

149 self.log.debug("Checking if %s runs successfully", cmd) 

150 try: 

151 check_output(cmd, input="") 

152 except CalledProcessError as e: 

153 return False 

154 return True 

155 

156 @staticmethod 

157 def _as_list(cmd): 

158 if isinstance(cmd, str): 

159 return [cmd] 

160 elif isinstance(cmd, list): 

161 return cmd 

162 else: 

163 raise TypeError(f"Unknown command type: {cmd!r}") 

164 

165 def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd): 

166 # if windows platform doesn't support breakaway flag (e.g. Github Runner) 

167 # we need to start a detached process (as work-a-round), the runs until the 

168 # 'remote' process has finished. But we cannot directly start the command as detached 

169 # process, since redirection (for retrieving the pid) doesn't work. We need a detached 

170 # proxy process that redirects output the to file, that can be read by current process 

171 # to retrieve the pid. 

172 

173 assert self._win 

174 from subprocess import DETACHED_PROCESS 

175 

176 tmp = NamedTemporaryFile( 

177 mode="w", delete=False 

178 ) # use python to generate a tempfile name 

179 fo_name = tmp.name 

180 tmp.close() 

181 fi_name = ( 

182 fo_name + "_stdin.py" 

183 ) # use tempfile name as basis for python script input 

184 with open(fi_name, "w") as f: 

185 f.write(py_cmd) 

186 

187 # simple python code that starts the actual cmd in a detached process 

188 inputs = dict( 

189 input_filename=fi_name, 

190 output_filename=fo_name, 

191 cmd_args=cmd_args, 

192 ) 

193 self.log.debug("Starting detached process with inputs %s", inputs) 

194 input_json = json.dumps(inputs) 

195 py_detached = _py_detached_tpl.format(input_json=input_json) 

196 

197 # now start proxy process detached 

198 self.log.debug("[ShellCommandSend._cmd_send] starting detached process...") 

199 self.log.debug("[ShellCommandSend._cmd_send] python command: \n%s", py_cmd) 

200 try: 

201 p = Popen( 

202 [sys.executable, '-c', py_detached], 

203 close_fds=True, 

204 creationflags=DETACHED_PROCESS, 

205 ) 

206 except Exception as e: 

207 self.log.error(f"[ShellCommandSend._cmd_send] detached process failed: {e}") 

208 raise e 

209 self.log.debug( 

210 "[ShellCommandSend._cmd_send] detached process started successful. Waiting for redirected output (pid)..." 

211 ) 

212 

213 # retrieve (remote) pid from output file 

214 output = "" 

215 while True: 

216 with open(fo_name) as f: 

217 output = f.read() 

218 if len(output) > 0: 

219 break 

220 if p.poll() is not None: 

221 if p.returncode != 0: 

222 raise CalledProcessError(p.returncode, "cmd_start") 

223 else: 

224 raise Exception( 

225 "internal error: no pid returned, although exit code of process was 0" 

226 ) 

227 

228 time.sleep(0.1) # wait a 0.1s and repeat 

229 

230 if not output: 

231 self.log.error("[ShellCommandSend._cmd_send] no output received!") 

232 else: 

233 self.log.debug("[ShellCommandSend._cmd_send] output received: %s", output) 

234 

235 return output 

236 

237 def _cmd_send(self, method, **kwargs): 

238 # in send receiver mode it is not required that the ipyparallel.cluster.shellcmd 

239 # exists (or is update to date) on the 'other' side of the shell. This is particular 

240 # useful when doing further development without copying the adapted file before each 

241 # test run. Furthermore, the calls are much faster. 

242 receive_params = {} 

243 

244 # make sure that env is a dictionary with only str entries (for key and value; value can be null as well) 

245 

246 if self.debugging: 

247 receive_params["debugging"] = True 

248 receive_params["log"] = '~/ipp_shellcmd.log' 

249 if self.breakaway_support is False: 

250 receive_params["use_breakaway"] = False 

251 

252 py_cmd = _shell_cmd_receive_tpl.format( 

253 define_receive=self.receiver_code 

254 if self.send_receiver_code 

255 else self.receiver_import, 

256 inputs_encoded=_encode( 

257 { 

258 "receive_parameters": receive_params, 

259 "command_parameters": kwargs, 

260 } 

261 ), 

262 method=method, 

263 ) 

264 cmd_args = self.shell + self.args + [self.python_path] 

265 if method == 'start' and self.breakaway_support is False: 

266 return self._cmd_start_windows_no_breakaway(cmd_args, py_cmd) 

267 else: 

268 return self._check_output(cmd_args, universal_newlines=True, input=py_cmd) 

269 

270 def _get_pid(self, output): 

271 # need to extract pid value 

272 values = dict(self.output_template.findall(output)) 

273 if 'remote_pid' in values: 

274 return int(values['remote_pid']) 

275 else: 

276 raise RuntimeError(f"Failed to get pid from output: {output}") 

277 

278 def _check_for_break_away_flag(self): 

279 assert self._win 

280 assert self.python_path is not None 

281 py_code = "import subprocess; subprocess.Popen(['cmd.exe', '/C'], close_fds=True, creationflags=subprocess.CREATE_BREAKAWAY_FROM_JOB); print('successful')" 

282 cmd = ( 

283 self.shell 

284 + self.args 

285 + [ 

286 self.python_path, 

287 '-c', 

288 f'"{py_code}"', 

289 ] 

290 ) 

291 try: 

292 # non-zero return code (in pwsh) or empty output (in cmd), if break_away test fails 

293 output = self._check_output(cmd).strip() 

294 except Exception as e: 

295 # just to see which exception was thrown 

296 warnings.warn( 

297 f"Break away test exception: {e!r}", 

298 UserWarning, 

299 stacklevel=4, 

300 ) 

301 output = "" 

302 

303 return output == "successful" 

304 

305 def initialize(self): 

306 """initialize necessary variables by sending an echo command that works on all OS and shells""" 

307 if self.shell_info: 

308 return 

309 # example outputs on 

310 # windows-powershell: OS-WIN-CMD=%OS%;OS-WIN-PW=Windows_NT;OS-LINUX=;SHELL= 

311 # windows-cmd : "OS-WIN-CMD=Windows_NT;OS-WIN-PW=$env:OS;OS-LINUX=$OSTYPE;SHELL=$SHELL" 

312 # ubuntu-bash : OS-WIN-CMD=Windows_NT;OS-WIN-PW=:OS;OS-LINUX=linux-gnu;SHELL=/bin/bash 

313 # macos 11 : OS-WIN-CMD=%OS%;OS-WIN-PW=:OS;OS-LINUX=darwin20;SHELL=/bin/bash 

314 cmd = ( 

315 self.shell 

316 + self.args 

317 + ['echo "OS-WIN-CMD=%OS%;OS-WIN-PW=$env:OS;OS-LINUX=$OSTYPE;SHELL=$SHELL"'] 

318 ) 

319 timeout = 10 

320 try: 

321 output = self._check_output( 

322 cmd, timeout=timeout 

323 ) # timeout for command is set to 10s 

324 except CalledProcessError as e: 

325 raise Exception( 

326 "Unable to get remote shell information. Are the ssh connection data correct?" 

327 ) 

328 except TimeoutExpired as e: 

329 raise Exception( 

330 f"Timeout of {timeout}s reached while retrieving remote shell information. Are the ssh connection data correct?" 

331 ) 

332 

333 entries = output.strip().strip('\\"').strip('"').split(";") 

334 # filter output non valid entries: contains =$ or =% or has no value assign (.....=) 

335 valid_entries = list( 

336 filter( 

337 lambda e: not ("=$" in e or "=%" in e or "=:" in e or e[-1] == '='), 

338 entries, 

339 ) 

340 ) 

341 system = shell = None 

342 

343 # currently we do not check if double entries are found 

344 for e in valid_entries: 

345 key, val = e.split("=") 

346 if key == "OS-WIN-CMD": 

347 system = val 

348 shell = "cmd.exe" 

349 self.is_powershell = False 

350 self._win = True 

351 self.join_params = ( 

352 False # disable joining, since it does not work for windows cmd.exe 

353 ) 

354 self.pathsep = "\\" 

355 elif key == "OS-WIN-PW": 

356 system = val 

357 shell = "powershell.exe" 

358 self.is_powershell = True 

359 self._win = True 

360 self.pathsep = "\\" 

361 elif key == "OS-LINUX": 

362 system = val 

363 self.is_powershell = False 

364 self._win = False 

365 elif key == "SHELL": 

366 shell = val 

367 

368 if self._win and self.python_path is not None: 

369 self.breakaway_support = self._check_for_break_away_flag() # check if break away flag is available (its not in windows github runners) 

370 

371 self.shell_info = (system, shell) 

372 

373 def get_shell_info(self): 

374 """ 

375 get shell information 

376 :return: (str, str): string of system and shell 

377 """ 

378 assert self.shell_info # make sure that initialize was called already 

379 return self.shell_info 

380 

381 def has_python(self, python_path=None): 

382 """Check if remote python can be started ('python --version') 

383 :return: bool: flag if python start was found 

384 """ 

385 assert self.shell_info # make sure that initialize was called already 

386 if not python_path: 

387 python_path = self.python_path 

388 cmd = self.shell + self.args + [python_path, '--version'] 

389 return self._runs_successful(cmd) 

390 

391 def check_output(self, cmd, **kwargs): 

392 """subprocess.check_output call using the shell connection 

393 :param cmd: command (str or list of strs) that should be executed 

394 :param kwargs: additional parameters that are passed to subprocess.check_output 

395 :return: output of executed command""" 

396 assert self.shell_info # make sure that initialize was called already 

397 full_cmd = self.shell + self.args + self._as_list(cmd) 

398 return self._check_output(full_cmd, **kwargs) 

399 

400 def check_output_python_module(self, module_params, **kwargs): 

401 """subprocess.check_output call based on python module call 

402 :param module_params: python module and parameters (str or list of strs) that should be executed 

403 :param kwargs: additional parameters that are passed to subprocess.check_output 

404 :return: output of executed command 

405 """ 

406 assert self.shell_info # make sure that initialize was called already 

407 cmd = ( 

408 self.shell 

409 + self.args 

410 + [self.python_path, "-m"] 

411 + self._as_list(module_params) 

412 ) 

413 return self._check_output(cmd, **kwargs) 

414 

415 def check_output_python_code(self, python_code, **kwargs): 

416 """subprocess.check_output call running the provided python code 

417 :param python_code: code that should be executed 

418 :param kwargs: additional parameters that are passed to subprocess.check_output 

419 :return: output of executed command 

420 """ 

421 assert self.shell_info # make sure that initialize was called already 

422 assert "input" not in kwargs # must not be specified 

423 assert "universal_newlines" not in kwargs # must not be specified 

424 cmd = self.shell + self.args + [self.python_path] 

425 return self._check_output( 

426 cmd, universal_newlines=True, input=python_code, **kwargs 

427 ) 

428 

429 def cmd_start(self, cmd, env=None, output_file=None): 

430 """starts command into background and return remote pid 

431 :param cmd: command (str or list of strs) that should be started 

432 :param env: dictionary of environment variable that should be set/unset before starting the process 

433 :param output_file: stdout and stderr will be redirected to the (remote) file 

434 :return: pid of started process 

435 """ 

436 # join commands into a single parameter. otherwise 

437 assert self.shell_info # make sure that initialize was called already 

438 return self._get_pid( 

439 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file) 

440 ) 

441 

442 def cmd_start_python_module(self, module_params, env=None, output_file=None): 

443 """start python module into background and return remote pid 

444 :param module_params: python module and parameters (str or list of strs) that should be executed 

445 :param env: dictionary of environment variable that should be set before starting the process 

446 :param output_file: stdout and stderr will be redirected to the (remote) file 

447 :return: pid of started process 

448 """ 

449 assert self.shell_info # make sure that initialize was called already 

450 cmd = [self.python_path, "-m"] + self._as_list(module_params) 

451 return self._get_pid( 

452 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file) 

453 ) 

454 

455 def cmd_start_python_code(self, python_code, env=None, output_file=None): 

456 """start python with provided code into background and return remote pid 

457 :param python_code: code that should be executed 

458 :param env: dictionary of environment variable that should be set before starting the process 

459 :param output_file: stdout and stderr will be redirected to the (remote) file 

460 :return: pid of started process 

461 """ 

462 assert self.shell_info # make sure that initialize was called already 

463 # encoding the python code as base64 stream and decoding on the other side, remove any complicated 

464 # quoting strategy. We do not check the length of the code, which could exceed the shell parameter 

465 # string limit. Since the limit is typically > 2k, little code snippets should not cause any problems. 

466 encoded = base64.b64encode(python_code.encode()) 

467 py_cmd = f'import base64;exec(base64.b64decode({encoded}).decode())' 

468 if self._win: 

469 py_cmd = f'"{py_cmd}"' 

470 cmd = [self.python_path, "-c", py_cmd] 

471 return self._get_pid( 

472 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file) 

473 ) 

474 

475 def cmd_running(self, pid): 

476 """check if given (remote) pid is running""" 

477 assert self.shell_info # make sure that initialize was called already 

478 output = self._cmd_send("cmd_running", pid=pid) 

479 # check output 

480 if "__running=1__" in output: 

481 return True 

482 elif "__running=0__" in output: 

483 return False 

484 else: 

485 raise Exception( 

486 f"Unexpected output ({output}) returned from by the running shell command" 

487 ) 

488 

489 def cmd_kill(self, pid, sig=None): 

490 """kill (remote) process with the given pid""" 

491 assert self.shell_info # make sure that initialize was called already 

492 if sig: 

493 self._cmd_send("cmd_kill", pid=pid, sig=int(sig)) 

494 else: 

495 self._cmd_send("cmd_kill", pid=pid) 

496 

497 def cmd_mkdir(self, path): 

498 """make directory recursively""" 

499 assert self.shell_info # make sure that initialize was called already 

500 self._cmd_send("cmd_mkdir", path=path) 

501 

502 def cmd_rmdir(self, path): 

503 """remove directory recursively""" 

504 assert self.shell_info # make sure that initialize was called already 

505 self._cmd_send("cmd_rmdir", path=path) 

506 

507 def cmd_exists(self, path): 

508 """check if file/path exists""" 

509 assert self.shell_info # make sure that initialize was called already 

510 output = self._cmd_send("cmd_exists", path=path) 

511 # check output 

512 if "__exists=1__" in output: 

513 return True 

514 elif "__exists=0__" in output: 

515 return False 

516 else: 

517 raise Exception( 

518 f"Unexpected output ({output}) returned from by the exists shell command" 

519 ) 

520 

521 def cmd_remove(self, path): 

522 """delete remote file""" 

523 assert self.shell_info # make sure that initialize was called already 

524 output = self._cmd_send("cmd_remove", path=path)