1#!/usr/bin/env python
2"""
3Shell helper application for OS independent shell commands
4
Currently, the following commands are supported:
* start: starts a process in the background and returns its pid
* running: checks if a process with a given pid is running
* kill: kills a process with a given pid
* mkdir: creates a directory recursively (no error if the directory already exists)
* rmdir: removes a directory recursively
* exists: checks if a file/directory exists
* remove: removes a file
13"""
14
15import base64
16import inspect
17import json
18import logging
19import pathlib
20import re
21import sys
22import time
23import warnings
24from subprocess import CalledProcessError, Popen, TimeoutExpired, check_output
25from tempfile import NamedTemporaryFile
26
27from . import shellcmd_receive
28
# Python script that ShellCommandSend._cmd_send pipes (via stdin) into the python
# interpreter on the receiving side. It is filled in with str.format using:
#   define_receive - the import of ShellCommandReceive, or its complete source code
#                    (send_receiver_code mode)
#   inputs_encoded - base64 encoded JSON (see _encode) holding "receive_parameters"
#                    and "command_parameters"
#   method         - name of the ShellCommandReceive method to call (e.g. cmd_start)
# Base64 keeps the embedded parameters free of any quoting/escaping issues.
# Because of str.format, literal braces added to this template must be doubled.
_shell_cmd_receive_tpl = '''
import base64, json

{define_receive}

def _decode(b64_json):
    return json.loads(base64.b64decode(b64_json).decode("utf8"))

inputs_encoded = "{inputs_encoded}"
inputs = _decode(inputs_encoded)
receive_parameters=inputs["receive_parameters"]
command_parameters=inputs["command_parameters"]

with ShellCommandReceive(**receive_parameters) as r:
    r.{method}(**command_parameters)
'''
45
# Python script executed (locally) as a detached proxy process by
# ShellCommandSend._cmd_start_windows_no_breakaway. It is filled in with
# str.format(input_json=<JSON text>), where the JSON object provides
# "input_filename" (script fed to the command via stdin), "output_filename"
# (receives stdout and stderr of the command) and "cmd_args" (command line to run).
# The JSON is embedded via repr() ({input_json!r}), which always yields a valid
# Python string literal. Backslashes (e.g. in Windows temp-file paths) and quotes
# therefore reach json.loads unchanged.
_py_detached_tpl = '''
import json
input_json = {input_json!r}
inputs = json.loads(input_json)

from subprocess import run, PIPE, STDOUT

with open(inputs['input_filename'], 'r') as f:
    stdin = f.read()

with open(inputs['output_filename'], 'w') as f:
    run(inputs["cmd_args"], input=stdin, stdout=f, stderr=STDOUT, check=True, text=True)
'''
59
60
def _encode(inputs):
    """Serialize *inputs* to JSON and return it base64 encoded as an ASCII str.

    The result consists of base64 characters only, so it can be embedded in generated
    code or shell arguments without any escaping. The matching decoder (_decode) is
    part of the _shell_cmd_receive_tpl template.
    """
    json_bytes = json.dumps(inputs).encode("utf8")
    encoded = base64.b64encode(json_bytes)
    return encoded.decode("ascii")
68
69
class ShellCommandSend:
    """
    Helper class for sending shell commands in a generic and OS independent form.

    The class is designed to send shell commands through an ssh connection. Nevertheless, it can
    also be used to send commands to a different local shell, which is useful for testing. Since
    the concept uses the ipyparallel package (class ipyparallel.cluster.ShellCommandReceive) for
    executing the commands, a valid python installation (python_path) must be available on the
    receiving side. Calling has_python checks this. Furthermore, get_shell_info can be called to
    retrieve OS and shell information independent of a valid python installation.

    Since some operations require different handling for different platforms/shells, the class
    gathers the necessary information during initialization. By default this is done at object
    instantiation. It can be postponed (initialize=False), but then the initialize method has to
    be called before any other member function is used.

    Besides the generic check_output[...] functions (equivalent to subprocess.check_output), the
    class provides specific shell commands which have a cmd_ prefix. When adding new functions,
    make sure that an equivalent is added to the ShellCommandReceive class as well.

    To start processes through an ssh connection on a Windows server that stay alive after the
    ssh connection is closed, the process must be started with the breakaway creation flag set.
    This works fine on 'normal' machines and VMs, but is denied on Windows GitHub runners
    (presumably for security reasons). Hence, a workaround was necessary to enable CI on GitHub:
    if breakaway is not supported, a detached ssh connection is started (by the ShellCommandSend
    object) which stays open until the started process has finished. See _cmd_send and
    _cmd_start_windows_no_breakaway for further details.

    The class supports a special send-receiver mode (see self.send_receiver_code), which allows
    sending commands without ipyparallel being installed on the 'other side'. In this mode the
    content of the shellcmd_receive.py file is transferred as well, which makes development and
    testing much easier.
    """

    # package name for sending the command (NOTE(review): not referenced by the code visible here)
    package_name = "ipyparallel.shellcmd"
    # matches "__key=value__" markers printed by the receiving side,
    # e.g. __remote_pid=1234__ (see _get_pid) or __running=1__ (see cmd_running)
    output_template = re.compile(
        r"__([a-z][a-z0-9_]+)=([a-z0-9\-\.]+)__", re.IGNORECASE
    )
    # full source code of the receiver side; embedded into each command in send_receiver_code mode
    receiver_code = pathlib.Path(
        inspect.getfile(shellcmd_receive)
    ).read_text()  # get full code of receiver side
    # import used instead of receiver_code when the receiver module is installed on the other side
    receiver_import = (
        "from ipyparallel.cluster.shellcmd_receive import ShellCommandReceive"
    )
112
113 def __init__(
114 self,
115 shell,
116 args,
117 python_path,
118 initialize=True,
119 send_receiver_code=False,
120 log=None,
121 ):
122 self.shell = shell
123 self.args = args
124 self.python_path = python_path
125
126 # shell dependent values. Those values are determined in the initialize function
127 self.shell_info = None
128 self._win = sys.platform.lower().startswith("win")
129 self.is_powershell = None # flag if shell is windows powershell (requires special parameter quoting)
130 self.breakaway_support = None # flag if process creation support the break_away flag (relevant for windows only; None under linux)
131 self.join_params = True # join all cmd params into a single param. does NOT work with windows cmd
132 # equivalent to os.pathsep (will be changed during initialization)
133 self.pathsep = "/"
134
135 # should be activated when developing...
136 self.send_receiver_code = send_receiver_code
137 self.debugging = False # if activated an output log (~/ipp_shellcmd.log) is created on receiver side
138 self.log = log or logging.getLogger(__name__)
139
140 if initialize:
141 self.initialize()
142
143 def _check_output(self, cmd, **kwargs):
144 kwargs.setdefault("text", True)
145 self.log.debug("check_output %s", cmd)
146 return check_output(cmd, **kwargs)
147
148 def _runs_successful(self, cmd):
149 self.log.debug("Checking if %s runs successfully", cmd)
150 try:
151 check_output(cmd, input="")
152 except CalledProcessError as e:
153 return False
154 return True
155
156 @staticmethod
157 def _as_list(cmd):
158 if isinstance(cmd, str):
159 return [cmd]
160 elif isinstance(cmd, list):
161 return cmd
162 else:
163 raise TypeError(f"Unknown command type: {cmd!r}")
164
165 def _cmd_start_windows_no_breakaway(self, cmd_args, py_cmd):
166 # if windows platform doesn't support breakaway flag (e.g. Github Runner)
167 # we need to start a detached process (as work-a-round), the runs until the
168 # 'remote' process has finished. But we cannot directly start the command as detached
169 # process, since redirection (for retrieving the pid) doesn't work. We need a detached
170 # proxy process that redirects output the to file, that can be read by current process
171 # to retrieve the pid.
172
173 assert self._win
174 from subprocess import DETACHED_PROCESS
175
176 tmp = NamedTemporaryFile(
177 mode="w", delete=False
178 ) # use python to generate a tempfile name
179 fo_name = tmp.name
180 tmp.close()
181 fi_name = (
182 fo_name + "_stdin.py"
183 ) # use tempfile name as basis for python script input
184 with open(fi_name, "w") as f:
185 f.write(py_cmd)
186
187 # simple python code that starts the actual cmd in a detached process
188 inputs = dict(
189 input_filename=fi_name,
190 output_filename=fo_name,
191 cmd_args=cmd_args,
192 )
193 self.log.debug("Starting detached process with inputs %s", inputs)
194 input_json = json.dumps(inputs)
195 py_detached = _py_detached_tpl.format(input_json=input_json)
196
197 # now start proxy process detached
198 self.log.debug("[ShellCommandSend._cmd_send] starting detached process...")
199 self.log.debug("[ShellCommandSend._cmd_send] python command: \n%s", py_cmd)
200 try:
201 p = Popen(
202 [sys.executable, '-c', py_detached],
203 close_fds=True,
204 creationflags=DETACHED_PROCESS,
205 )
206 except Exception as e:
207 self.log.error(f"[ShellCommandSend._cmd_send] detached process failed: {e}")
208 raise e
209 self.log.debug(
210 "[ShellCommandSend._cmd_send] detached process started successful. Waiting for redirected output (pid)..."
211 )
212
213 # retrieve (remote) pid from output file
214 output = ""
215 while True:
216 with open(fo_name) as f:
217 output = f.read()
218 if len(output) > 0:
219 break
220 if p.poll() is not None:
221 if p.returncode != 0:
222 raise CalledProcessError(p.returncode, "cmd_start")
223 else:
224 raise Exception(
225 "internal error: no pid returned, although exit code of process was 0"
226 )
227
228 time.sleep(0.1) # wait a 0.1s and repeat
229
230 if not output:
231 self.log.error("[ShellCommandSend._cmd_send] no output received!")
232 else:
233 self.log.debug("[ShellCommandSend._cmd_send] output received: %s", output)
234
235 return output
236
237 def _cmd_send(self, method, **kwargs):
238 # in send receiver mode it is not required that the ipyparallel.cluster.shellcmd
239 # exists (or is update to date) on the 'other' side of the shell. This is particular
240 # useful when doing further development without copying the adapted file before each
241 # test run. Furthermore, the calls are much faster.
242 receive_params = {}
243
244 # make sure that env is a dictionary with only str entries (for key and value; value can be null as well)
245
246 if self.debugging:
247 receive_params["debugging"] = True
248 receive_params["log"] = '~/ipp_shellcmd.log'
249 if self.breakaway_support is False:
250 receive_params["use_breakaway"] = False
251
252 py_cmd = _shell_cmd_receive_tpl.format(
253 define_receive=self.receiver_code
254 if self.send_receiver_code
255 else self.receiver_import,
256 inputs_encoded=_encode(
257 {
258 "receive_parameters": receive_params,
259 "command_parameters": kwargs,
260 }
261 ),
262 method=method,
263 )
264 cmd_args = self.shell + self.args + [self.python_path]
265 if method == 'start' and self.breakaway_support is False:
266 return self._cmd_start_windows_no_breakaway(cmd_args, py_cmd)
267 else:
268 return self._check_output(cmd_args, universal_newlines=True, input=py_cmd)
269
270 def _get_pid(self, output):
271 # need to extract pid value
272 values = dict(self.output_template.findall(output))
273 if 'remote_pid' in values:
274 return int(values['remote_pid'])
275 else:
276 raise RuntimeError(f"Failed to get pid from output: {output}")
277
278 def _check_for_break_away_flag(self):
279 assert self._win
280 assert self.python_path is not None
281 py_code = "import subprocess; subprocess.Popen(['cmd.exe', '/C'], close_fds=True, creationflags=subprocess.CREATE_BREAKAWAY_FROM_JOB); print('successful')"
282 cmd = (
283 self.shell
284 + self.args
285 + [
286 self.python_path,
287 '-c',
288 f'"{py_code}"',
289 ]
290 )
291 try:
292 # non-zero return code (in pwsh) or empty output (in cmd), if break_away test fails
293 output = self._check_output(cmd).strip()
294 except Exception as e:
295 # just to see which exception was thrown
296 warnings.warn(
297 f"Break away test exception: {e!r}",
298 UserWarning,
299 stacklevel=4,
300 )
301 output = ""
302
303 return output == "successful"
304
305 def initialize(self):
306 """initialize necessary variables by sending an echo command that works on all OS and shells"""
307 if self.shell_info:
308 return
309 # example outputs on
310 # windows-powershell: OS-WIN-CMD=%OS%;OS-WIN-PW=Windows_NT;OS-LINUX=;SHELL=
311 # windows-cmd : "OS-WIN-CMD=Windows_NT;OS-WIN-PW=$env:OS;OS-LINUX=$OSTYPE;SHELL=$SHELL"
312 # ubuntu-bash : OS-WIN-CMD=Windows_NT;OS-WIN-PW=:OS;OS-LINUX=linux-gnu;SHELL=/bin/bash
313 # macos 11 : OS-WIN-CMD=%OS%;OS-WIN-PW=:OS;OS-LINUX=darwin20;SHELL=/bin/bash
314 cmd = (
315 self.shell
316 + self.args
317 + ['echo "OS-WIN-CMD=%OS%;OS-WIN-PW=$env:OS;OS-LINUX=$OSTYPE;SHELL=$SHELL"']
318 )
319 timeout = 10
320 try:
321 output = self._check_output(
322 cmd, timeout=timeout
323 ) # timeout for command is set to 10s
324 except CalledProcessError as e:
325 raise Exception(
326 "Unable to get remote shell information. Are the ssh connection data correct?"
327 )
328 except TimeoutExpired as e:
329 raise Exception(
330 f"Timeout of {timeout}s reached while retrieving remote shell information. Are the ssh connection data correct?"
331 )
332
333 entries = output.strip().strip('\\"').strip('"').split(";")
334 # filter output non valid entries: contains =$ or =% or has no value assign (.....=)
335 valid_entries = list(
336 filter(
337 lambda e: not ("=$" in e or "=%" in e or "=:" in e or e[-1] == '='),
338 entries,
339 )
340 )
341 system = shell = None
342
343 # currently we do not check if double entries are found
344 for e in valid_entries:
345 key, val = e.split("=")
346 if key == "OS-WIN-CMD":
347 system = val
348 shell = "cmd.exe"
349 self.is_powershell = False
350 self._win = True
351 self.join_params = (
352 False # disable joining, since it does not work for windows cmd.exe
353 )
354 self.pathsep = "\\"
355 elif key == "OS-WIN-PW":
356 system = val
357 shell = "powershell.exe"
358 self.is_powershell = True
359 self._win = True
360 self.pathsep = "\\"
361 elif key == "OS-LINUX":
362 system = val
363 self.is_powershell = False
364 self._win = False
365 elif key == "SHELL":
366 shell = val
367
368 if self._win and self.python_path is not None:
369 self.breakaway_support = self._check_for_break_away_flag() # check if break away flag is available (its not in windows github runners)
370
371 self.shell_info = (system, shell)
372
373 def get_shell_info(self):
374 """
375 get shell information
376 :return: (str, str): string of system and shell
377 """
378 assert self.shell_info # make sure that initialize was called already
379 return self.shell_info
380
381 def has_python(self, python_path=None):
382 """Check if remote python can be started ('python --version')
383 :return: bool: flag if python start was found
384 """
385 assert self.shell_info # make sure that initialize was called already
386 if not python_path:
387 python_path = self.python_path
388 cmd = self.shell + self.args + [python_path, '--version']
389 return self._runs_successful(cmd)
390
391 def check_output(self, cmd, **kwargs):
392 """subprocess.check_output call using the shell connection
393 :param cmd: command (str or list of strs) that should be executed
394 :param kwargs: additional parameters that are passed to subprocess.check_output
395 :return: output of executed command"""
396 assert self.shell_info # make sure that initialize was called already
397 full_cmd = self.shell + self.args + self._as_list(cmd)
398 return self._check_output(full_cmd, **kwargs)
399
400 def check_output_python_module(self, module_params, **kwargs):
401 """subprocess.check_output call based on python module call
402 :param module_params: python module and parameters (str or list of strs) that should be executed
403 :param kwargs: additional parameters that are passed to subprocess.check_output
404 :return: output of executed command
405 """
406 assert self.shell_info # make sure that initialize was called already
407 cmd = (
408 self.shell
409 + self.args
410 + [self.python_path, "-m"]
411 + self._as_list(module_params)
412 )
413 return self._check_output(cmd, **kwargs)
414
415 def check_output_python_code(self, python_code, **kwargs):
416 """subprocess.check_output call running the provided python code
417 :param python_code: code that should be executed
418 :param kwargs: additional parameters that are passed to subprocess.check_output
419 :return: output of executed command
420 """
421 assert self.shell_info # make sure that initialize was called already
422 assert "input" not in kwargs # must not be specified
423 assert "universal_newlines" not in kwargs # must not be specified
424 cmd = self.shell + self.args + [self.python_path]
425 return self._check_output(
426 cmd, universal_newlines=True, input=python_code, **kwargs
427 )
428
429 def cmd_start(self, cmd, env=None, output_file=None):
430 """starts command into background and return remote pid
431 :param cmd: command (str or list of strs) that should be started
432 :param env: dictionary of environment variable that should be set/unset before starting the process
433 :param output_file: stdout and stderr will be redirected to the (remote) file
434 :return: pid of started process
435 """
436 # join commands into a single parameter. otherwise
437 assert self.shell_info # make sure that initialize was called already
438 return self._get_pid(
439 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
440 )
441
442 def cmd_start_python_module(self, module_params, env=None, output_file=None):
443 """start python module into background and return remote pid
444 :param module_params: python module and parameters (str or list of strs) that should be executed
445 :param env: dictionary of environment variable that should be set before starting the process
446 :param output_file: stdout and stderr will be redirected to the (remote) file
447 :return: pid of started process
448 """
449 assert self.shell_info # make sure that initialize was called already
450 cmd = [self.python_path, "-m"] + self._as_list(module_params)
451 return self._get_pid(
452 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
453 )
454
455 def cmd_start_python_code(self, python_code, env=None, output_file=None):
456 """start python with provided code into background and return remote pid
457 :param python_code: code that should be executed
458 :param env: dictionary of environment variable that should be set before starting the process
459 :param output_file: stdout and stderr will be redirected to the (remote) file
460 :return: pid of started process
461 """
462 assert self.shell_info # make sure that initialize was called already
463 # encoding the python code as base64 stream and decoding on the other side, remove any complicated
464 # quoting strategy. We do not check the length of the code, which could exceed the shell parameter
465 # string limit. Since the limit is typically > 2k, little code snippets should not cause any problems.
466 encoded = base64.b64encode(python_code.encode())
467 py_cmd = f'import base64;exec(base64.b64decode({encoded}).decode())'
468 if self._win:
469 py_cmd = f'"{py_cmd}"'
470 cmd = [self.python_path, "-c", py_cmd]
471 return self._get_pid(
472 self._cmd_send("cmd_start", cmd=cmd, env=env, output_file=output_file)
473 )
474
475 def cmd_running(self, pid):
476 """check if given (remote) pid is running"""
477 assert self.shell_info # make sure that initialize was called already
478 output = self._cmd_send("cmd_running", pid=pid)
479 # check output
480 if "__running=1__" in output:
481 return True
482 elif "__running=0__" in output:
483 return False
484 else:
485 raise Exception(
486 f"Unexpected output ({output}) returned from by the running shell command"
487 )
488
489 def cmd_kill(self, pid, sig=None):
490 """kill (remote) process with the given pid"""
491 assert self.shell_info # make sure that initialize was called already
492 if sig:
493 self._cmd_send("cmd_kill", pid=pid, sig=int(sig))
494 else:
495 self._cmd_send("cmd_kill", pid=pid)
496
497 def cmd_mkdir(self, path):
498 """make directory recursively"""
499 assert self.shell_info # make sure that initialize was called already
500 self._cmd_send("cmd_mkdir", path=path)
501
502 def cmd_rmdir(self, path):
503 """remove directory recursively"""
504 assert self.shell_info # make sure that initialize was called already
505 self._cmd_send("cmd_rmdir", path=path)
506
507 def cmd_exists(self, path):
508 """check if file/path exists"""
509 assert self.shell_info # make sure that initialize was called already
510 output = self._cmd_send("cmd_exists", path=path)
511 # check output
512 if "__exists=1__" in output:
513 return True
514 elif "__exists=0__" in output:
515 return False
516 else:
517 raise Exception(
518 f"Unexpected output ({output}) returned from by the exists shell command"
519 )
520
    def cmd_remove(self, path):
        """Delete the file *path* on the receiving side.

        :param path: (remote) path of the file to delete
        """
        assert self.shell_info  # make sure that initialize was called already
        # NOTE(review): the receiver output is not evaluated in the lines visible here
        output = self._cmd_send("cmd_remove", path=path)