Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/cluster/shellcmd_receive.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

153 statements  

1#!/usr/bin/env python 

2""" 

3Receiver side of OS independent shell commands 

4 

5For a list of supported commands see top of file shellcmd.py. 

6 

7Important: The shellcmd concept also supports receiver code sending (useful for testing and developing) 

8which transfers this file to the 'other side'. However, this limits the imports to standard python 

9packages. Hence, DO NOT USE ANY ipyparallel IMPORTS IN THIS FILE! 

10""" 

11 

12import logging 

13import os 

14import shutil 

15import sys 

16from abc import ABCMeta, abstractmethod 

17from contextlib import contextmanager 

18from pathlib import Path 

19from random import randint 

20from subprocess import DEVNULL, Popen 

21 

22 

class ShellCommandReceiveBase(metaclass=ABCMeta):
    """
    Base class for receiving and performing shell commands in a platform independent form

    All supported shell commands have a cmd_ prefix. When adding new functions make sure that there is an
    equivalent in the ShellCommandSend class. When a command failed a non-zero exit code will be returned.
    Hence, the ShellCommandSend class always uses subprocess.check_output for assessing if the command was
    successful. Some commands require information to be returned (cmd_exists, cmd_start, cmd_running) which
    is written to stdout in the following form: __<key>=<value>__

    This base class contains all platform independent code and commands. Only cmd_start, cmd_running and
    cmd_kill need to be overwritten in children classes (see ShellCommandReceiveWindows and
    ShellCommandReceivePosix)
    """

    def _log(self, msg):
        """Write ``msg`` at INFO level, prefixed with this instance's id; no-op when logging is off."""
        if not self.log:
            return
        self.log.info(f"[id={self.ranid}] {msg}")

    def __init__(self, debugging=False, log=None):
        """
        Set up the receiver and, optionally, file logging.

        debugging: when true the logger level is DEBUG instead of INFO.
        log: path of a log file (``~`` is expanded). When None, the environment
             variable SHELLCMD_LOG is consulted; an empty value disables logging.
        """
        self.debugging = debugging
        self.log = None
        # The handler is owned by this instance so that close() can detach it
        # again; the "shellcmd" logger itself is shared process-wide.
        self._log_handler = None
        if log is None:
            log = os.getenv("SHELLCMD_LOG")
        if log:
            log_file = Path(log).expanduser()
            self.log = logging.getLogger("shellcmd")
            self.log.setLevel(logging.DEBUG if self.debugging else logging.INFO)
            self._log_handler = logging.FileHandler(log_file, mode="a")
            self.log.addHandler(self._log_handler)

        self.ranid = None
        if self.log:
            # random tag to tell apart log lines of different receiver instances
            self.ranid = randint(0, 999)
            self._log(f"{self.__class__.__name__} instance created")

    def close(self):
        """Log the shutdown and release the log file handler added by __init__ (safe to call twice)."""
        if self.log:
            self._log("ShellCommandReceiveBase closed")

        # Detach and close our handler; otherwise every new receiver would add
        # another handler to the shared logger (duplicated lines, leaked files).
        handler = getattr(self, "_log_handler", None)
        if handler is not None:
            self._log_handler = None
            if self.log:
                self.log.removeHandler(handler)
            handler.close()

    def _prepare_cmd_start(self, start_cmd, env):
        """
        Apply ``env`` to os.environ and normalise ``start_cmd`` to a list of str.

        env: dict (or None/empty). Entries whose value is None or '' are removed
             from os.environ, all others are set to ``str(value)``.
        start_cmd: a single str or an iterable of str.

        Returns a new list, so callers may modify it without touching the input.
        Raises TypeError if env is not a dict or start_cmd contains non-str items.
        """
        if env:
            self._log(f"env={env!r}")
            if not isinstance(env, dict):
                raise TypeError(f"env must be a dict, got {env!r}")

            # update environment
            for key, value in env.items():
                if value is not None and value != '':
                    # set entry
                    os.environ[key] = str(value)
                else:
                    # unset entry if needed
                    os.environ.pop(key, None)

        if isinstance(start_cmd, str):
            start_cmd = [start_cmd]
        else:
            # copy: subclasses edit the list in place, and tuples must work too
            start_cmd = list(start_cmd)

        if not all(isinstance(item, str) for item in start_cmd):
            raise TypeError(f"Only str in start_cmd allowed ({start_cmd!r})")

        return start_cmd

    @abstractmethod
    def cmd_start(self, cmd, env=None, output_file=None):
        """Start ``cmd`` as a new process (platform specific)."""
        pass

    @abstractmethod
    def cmd_running(self, pid):
        """Report whether process ``pid`` is running (platform specific)."""
        pass

    @abstractmethod
    def cmd_kill(self, pid, sig=None):
        """Terminate process ``pid`` (platform specific)."""
        pass

    def cmd_mkdir(self, path):
        """Create directory ``path`` including missing parents."""
        self._log(f"Make directory '{path}'")
        os.makedirs(path, exist_ok=True)  # we allow that the directory already exists

    def cmd_rmdir(self, path):
        """Remove directory ``path`` and everything below it."""
        self._log(f"Remove directory '{path}'")
        shutil.rmtree(path)

    def cmd_exists(self, path):
        """Print ``__exists=1__`` if ``path`` exists, ``__exists=0__`` otherwise."""
        self._log(f"Check if path exists '{path}'")
        if os.path.exists(path):
            print("__exists=1__")
        else:
            print("__exists=0__")

    def cmd_remove(self, path):
        """Remove the file ``path``."""
        self._log(f"Remove file '{path}'")
        os.remove(path)

118 

119 

class ShellCommandReceiveWindows(ShellCommandReceiveBase):
    """Windows Implementation of ShellCommandReceive class"""

    def __init__(self, debugging=False, use_breakaway=True, log=None):
        """
        debugging, log: forwarded to ShellCommandReceiveBase.
        use_breakaway: start processes with CREATE_NEW_CONSOLE and
                       CREATE_BREAKAWAY_FROM_JOB; when false, cmd_start waits
                       for the started process to finish.
        """
        super().__init__(debugging, log)
        self.use_breakaway = use_breakaway

    def cmd_start(self, cmd, env=None, output_file=None):
        """
        Start ``cmd`` and print ``__remote_pid=<pid>__`` to stdout.

        cmd: str or list of str; env: dict applied to os.environ beforehand.
        output_file: optional path; when given, stdout and stderr of the new
                     process are written to it and stdin is set to DEVNULL.
        """
        start_cmd = self._prepare_cmd_start(cmd, env)

        # under windows we need to remove embracing double quotes
        # (the truthiness test guards against empty arguments, where p[0] would raise)
        for idx, p in enumerate(start_cmd):
            if p and p[0] == '"' and p[-1] == '"':
                start_cmd[idx] = p.strip('"')

        self._log(f"start_cmd={start_cmd} (use_breakaway={self.use_breakaway})")

        # these constants only exist on Windows, hence the local import
        from subprocess import CREATE_BREAKAWAY_FROM_JOB, CREATE_NEW_CONSOLE

        flags = 0
        if self.use_breakaway:
            flags |= CREATE_NEW_CONSOLE
            flags |= CREATE_BREAKAWAY_FROM_JOB

        pkwargs = {
            'close_fds': True,  # close stdin/stdout/stderr on child
            'creationflags': flags,
        }
        fo = None
        try:
            if output_file:
                fo = open(output_file, "w")
                pkwargs['stdout'] = fo
                pkwargs['stderr'] = fo
                pkwargs['stdin'] = DEVNULL

            self._log(f"Popen(**pkwargs={pkwargs})")
            p = Popen(start_cmd, **pkwargs)
        finally:
            # the child got its own handle; release ours so the file is not
            # kept open by this process
            if fo is not None:
                fo.close()
        self._log(f"pid={p.pid}")

        print(f'__remote_pid={p.pid}__')
        sys.stdout.flush()
        if not self.use_breakaway:
            self._log("before wait")
            p.wait()
            self._log("after wait")

    def cmd_running(self, pid):
        """Print ``__running=1__`` if process ``pid`` is still active, otherwise ``__running=0__`` (exactly once)."""
        self._log(f"Check if pid {pid} is running")

        # taken from https://stackoverflow.com/questions/568271/how-to-check-if-there-exists-a-process-with-a-given-pid-in-python
        import ctypes

        PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
        STILL_ACTIVE = 259
        kernel32 = ctypes.windll.kernel32
        processHandle = kernel32.OpenProcess(
            PROCESS_QUERY_LIMITED_INFORMATION, 0, pid
        )
        if processHandle == 0:
            print('__running=0__')
            return

        try:
            exit_code = ctypes.c_int(0)
            queried = kernel32.GetExitCodeProcess(
                processHandle, ctypes.pointer(exit_code)
            )
            # a failed query counts as "not running"; report a single result
            running = queried != 0 and exit_code.value == STILL_ACTIVE
        finally:
            kernel32.CloseHandle(processHandle)

        if running:
            print('__running=1__')
        else:
            print('__running=0__')

    def cmd_kill(self, pid, sig=None):
        """Terminate process ``pid`` through the Windows API; ``sig`` is only logged."""
        self._log(f"Kill pid {pid} (signal={sig})")

        # os.kill doesn't work reliable under windows. also see
        # https://stackoverflow.com/questions/28551180/how-to-kill-subprocess-python-in-windows

        # solution using taskill
        # import subprocess
        # subprocess.call(['taskkill', '/F', '/T', '/PID', str(pid)])  # /T kills all child processes as well

        # use windows api to kill process (doesn't kill children processes)
        # To kill all children process things are more complicated. see e.g.
        # http://mackeblog.blogspot.com/2012/05/killing-subprocesses-on-windows-in.html
        import ctypes

        PROCESS_TERMINATE = 0x0001
        kernel32 = ctypes.windll.kernel32
        processHandle = kernel32.OpenProcess(PROCESS_TERMINATE, 0, pid)
        if processHandle:
            kernel32.TerminateProcess(
                processHandle, 3
            )  # 3 is just an arbitrary exit code
            kernel32.CloseHandle(processHandle)

214 

215 

class ShellCommandReceivePosix(ShellCommandReceiveBase):
    """Posix implementation of the ShellCommandReceive class"""

    def cmd_start(self, cmd, env=None, output_file=None):
        """
        Start ``cmd`` in a new session and print ``__remote_pid=<pid>__`` to stdout.

        cmd: str or list of str; env: dict applied to os.environ beforehand.
        output_file: optional path receiving stdout and stderr of the new
                     process; without it both go to DEVNULL.
        """
        start_cmd = self._prepare_cmd_start(cmd, env)
        self._log(f"start_cmd={start_cmd}")

        fo = open(output_file, "w") if output_file else None
        try:
            target = DEVNULL if fo is None else fo
            p = Popen(
                start_cmd,
                start_new_session=True,
                stdout=target,
                stderr=target,
                stdin=DEVNULL,
            )
        finally:
            # the child holds its own descriptor; do not leak ours
            if fo is not None:
                fo.close()
        self._log(f"pid={p.pid}")

        print(f'__remote_pid={p.pid}__')
        sys.stdout.flush()

    def cmd_running(self, pid):
        """Print ``__running=1__`` if process ``pid`` exists, otherwise ``__running=0__``."""
        self._log(f"Check if pid {pid} is running")
        try:
            # use os.kill with signal 0 to check if process is still running
            os.kill(pid, 0)
        except PermissionError:
            # the process exists but we are not allowed to signal it
            running = True
        except OSError:
            running = False
        else:
            running = True

        if running:
            print('__running=1__')
        else:
            print('__running=0__')

    def cmd_kill(self, pid, sig=None):
        """Send ``sig`` to process ``pid``; SIGTERM is used when no signal is given."""
        self._log(f"Kill pid {pid} (signal={sig})")
        if sig is None:
            # os.kill does not accept None as a signal
            import signal

            sig = signal.SIGTERM
        os.kill(pid, sig)

244 

245 

@contextmanager
def ShellCommandReceive(debugging=False, use_breakaway=True, log=None):
    """Context manager yielding the receiver object matching the current platform.

    A ShellCommandReceiveWindows instance is created when sys.platform starts
    with "win" (use_breakaway is only passed on in that case), otherwise a
    ShellCommandReceivePosix instance. The receiver's close() is called when
    the with-block is left, also on error.
    """
    on_windows = sys.platform.lower().startswith("win")
    receiver = (
        ShellCommandReceiveWindows(debugging, use_breakaway, log)
        if on_windows
        else ShellCommandReceivePosix(debugging, log)
    )
    try:
        yield receiver
    finally:
        receiver.close()

256 receiver.close()