Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/process_utils.py: 21%

153 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Utilities for running or stopping processes.""" 

19from __future__ import annotations 

20 

21import errno 

22import logging 

23import os 

24import select 

25import shlex 

26import signal 

27import subprocess 

28import sys 

29 

30from airflow.utils.platform import IS_WINDOWS 

31 

32if not IS_WINDOWS: 

33 import tty 

34 import termios 

35 import pty 

36 

37from contextlib import contextmanager 

38from typing import Generator 

39 

40import psutil 

41from lockfile.pidlockfile import PIDLockFile 

42 

43from airflow.configuration import conf 

44from airflow.exceptions import AirflowException 

45 

46log = logging.getLogger(__name__) 

47 

48# When killing processes, time to wait after issuing a SIGTERM before issuing a 

49# SIGKILL. 

50DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME") 

51 

52 

53def reap_process_group( 

54 process_group_id: int, 

55 logger, 

56 sig: signal.Signals = signal.SIGTERM, 

57 timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM, 

58) -> dict[int, int]: 

59 """ 

60 Send sig (SIGTERM) to the process group of pid. 

61 

62 Tries really hard to terminate all processes in the group (including grandchildren). Will send 

63 sig (SIGTERM) to the process group of pid. If any process is alive after timeout 

64 a SIGKILL will be send. 

65 

66 :param process_group_id: process group id to kill. 

67 The process that wants to create the group should run 

68 `airflow.utils.process_utils.set_new_process_group()` as the first command 

69 it executes which will set group id = process_id. Effectively the process that is the 

70 "root" of the group has pid = gid and all other processes in the group have different 

71 pids but the same gid (equal the pid of the root process) 

72 :param logger: log handler 

73 :param sig: signal type 

74 :param timeout: how much time a process has to terminate 

75 """ 

76 returncodes = {} 

77 

78 def on_terminate(p): 

79 logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode) 

80 returncodes[p.pid] = p.returncode 

81 

82 def signal_procs(sig): 

83 try: 

84 logger.info("Sending the signal %s to group %s", sig, process_group_id) 

85 os.killpg(process_group_id, sig) 

86 except OSError as err_killpg: 

87 # If operation not permitted error is thrown due to run_as_user, 

88 # use sudo -n(--non-interactive) to kill the process 

89 if err_killpg.errno == errno.EPERM: 

90 subprocess.check_call( 

91 ["sudo", "-n", "kill", "-" + str(int(sig))] 

92 + [str(p.pid) for p in all_processes_in_the_group] 

93 ) 

94 elif err_killpg.errno == errno.ESRCH: 

95 # There is a rare condition that the process has not managed yet to change it's process 

96 # group. In this case os.killpg fails with ESRCH error 

97 # So we additionally send a kill signal to the process itself. 

98 logger.info( 

99 "Sending the signal %s to process %s as process group is missing.", sig, process_group_id 

100 ) 

101 try: 

102 os.kill(process_group_id, sig) 

103 except OSError as err_kill: 

104 if err_kill.errno == errno.EPERM: 

105 subprocess.check_call(["sudo", "-n", "kill", "-" + str(process_group_id)]) 

106 else: 

107 raise 

108 else: 

109 raise 

110 

111 if process_group_id == os.getpgid(0): 

112 raise RuntimeError("I refuse to kill myself") 

113 

114 try: 

115 parent = psutil.Process(process_group_id) 

116 

117 all_processes_in_the_group = parent.children(recursive=True) 

118 all_processes_in_the_group.append(parent) 

119 except psutil.NoSuchProcess: 

120 # The process already exited, but maybe it's children haven't. 

121 all_processes_in_the_group = [] 

122 for proc in psutil.process_iter(): 

123 try: 

124 if os.getpgid(proc.pid) == process_group_id and proc.pid != 0: 

125 all_processes_in_the_group.append(proc) 

126 except OSError: 

127 pass 

128 

129 logger.info( 

130 "Sending %s to group %s. PIDs of all processes in the group: %s", 

131 sig, 

132 process_group_id, 

133 [p.pid for p in all_processes_in_the_group], 

134 ) 

135 try: 

136 signal_procs(sig) 

137 except OSError as err: 

138 # No such process, which means there is no such process group - our job 

139 # is done 

140 if err.errno == errno.ESRCH: 

141 return returncodes 

142 

143 _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate) 

144 

145 if alive: 

146 for proc in alive: 

147 logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc) 

148 

149 try: 

150 signal_procs(signal.SIGKILL) 

151 except OSError as err: 

152 if err.errno != errno.ESRCH: 

153 raise 

154 

155 _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate) 

156 if alive: 

157 for proc in alive: 

158 logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid) 

159 return returncodes 

160 

161 

162def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None: 

163 """ 

164 Execute a process and stream output to logger. 

165 :param cmd: command and arguments to run 

166 :param cwd: Current working directory passed to the Popen constructor 

167 """ 

168 execute_in_subprocess_with_kwargs(cmd, cwd=cwd) 

169 

170 

171def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None: 

172 """ 

173 Execute a process and stream output to logger. 

174 

175 :param cmd: command and arguments to run 

176 

177 All other keyword args will be passed directly to subprocess.Popen 

178 """ 

179 log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd)) 

180 with subprocess.Popen( 

181 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs 

182 ) as proc: 

183 log.info("Output:") 

184 if proc.stdout: 

185 with proc.stdout: 

186 for line in iter(proc.stdout.readline, b""): 

187 log.info("%s", line.decode().rstrip()) 

188 

189 exit_code = proc.wait() 

190 if exit_code != 0: 

191 raise subprocess.CalledProcessError(exit_code, cmd) 

192 

193 

194def execute_interactive(cmd: list[str], **kwargs) -> None: 

195 """ 

196 Run the new command as a subprocess. 

197 

198 Runs the new command as a subprocess and ensures that the terminal's state is restored to its original 

199 state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after 

200 the process is completed. 

201 """ 

202 log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd)) 

203 

204 old_tty = termios.tcgetattr(sys.stdin) 

205 tty.setraw(sys.stdin.fileno()) 

206 

207 # open pseudo-terminal to interact with subprocess 

208 primary_fd, secondary_fd = pty.openpty() 

209 try: 

210 # use os.setsid() make it run in a new process group, or bash job control will not be enabled 

211 with subprocess.Popen( 

212 cmd, 

213 stdin=secondary_fd, 

214 stdout=secondary_fd, 

215 stderr=secondary_fd, 

216 universal_newlines=True, 

217 **kwargs, 

218 ) as proc: 

219 while proc.poll() is None: 

220 readable_fbs, _, _ = select.select([sys.stdin, primary_fd], [], []) 

221 if sys.stdin in readable_fbs: 

222 input_data = os.read(sys.stdin.fileno(), 10240) 

223 os.write(primary_fd, input_data) 

224 if primary_fd in readable_fbs: 

225 output_data = os.read(primary_fd, 10240) 

226 if output_data: 

227 os.write(sys.stdout.fileno(), output_data) 

228 finally: 

229 # restore tty settings back 

230 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) 

231 

232 

233def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None: 

234 """ 

235 Kills child processes for the current process. 

236 

237 First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends 

238 the SIGKILL signal, if the process is still alive. 

239 

240 :param pids_to_kill: List of PID to be killed. 

241 :param timeout: The time to wait before sending the SIGKILL signal. 

242 """ 

243 this_process = psutil.Process(os.getpid()) 

244 # Only check child processes to ensure that we don't have a case 

245 # where we kill the wrong process because a child process died 

246 # but the PID got reused. 

247 child_processes = [ 

248 x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill 

249 ] 

250 

251 # First try SIGTERM 

252 for child in child_processes: 

253 log.info("Terminating child PID: %s", child.pid) 

254 child.terminate() 

255 

256 log.info("Waiting up to %s seconds for processes to exit...", timeout) 

257 try: 

258 psutil.wait_procs( 

259 child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid) 

260 ) 

261 except psutil.TimeoutExpired: 

262 log.debug("Ran out of time while waiting for processes to exit") 

263 

264 # Then SIGKILL 

265 child_processes = [ 

266 x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill 

267 ] 

268 if child_processes: 

269 log.info("SIGKILL processes that did not terminate gracefully") 

270 for child in child_processes: 

271 log.info("Killing child PID: %s", child.pid) 

272 child.kill() 

273 child.wait() 

274 

275 

276@contextmanager 

277def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]: 

278 """ 

279 Set environment variables in context. 

280 

281 After leaving the context, it restores its original state. 

282 :param new_env_variables: Environment variables to set 

283 """ 

284 current_env_state = {key: os.environ.get(key) for key in new_env_variables.keys()} 

285 os.environ.update(new_env_variables) 

286 try: 

287 yield 

288 finally: 

289 for key, old_value in current_env_state.items(): 

290 if old_value is None: 

291 if key in os.environ: 

292 del os.environ[key] 

293 else: 

294 os.environ[key] = old_value 

295 

296 

297def check_if_pidfile_process_is_running(pid_file: str, process_name: str): 

298 """ 

299 Checks if a pidfile already exists and process is still running. 

300 If process is dead then pidfile is removed. 

301 

302 :param pid_file: path to the pidfile 

303 :param process_name: name used in exception if process is up and 

304 running 

305 """ 

306 pid_lock_file = PIDLockFile(path=pid_file) 

307 # If file exists 

308 if pid_lock_file.is_locked(): 

309 # Read the pid 

310 pid = pid_lock_file.read_pid() 

311 if pid is None: 

312 return 

313 try: 

314 # Check if process is still running 

315 proc = psutil.Process(pid) 

316 if proc.is_running(): 

317 raise AirflowException(f"The {process_name} is already running under PID {pid}.") 

318 except psutil.NoSuchProcess: 

319 # If process is dead remove the pidfile 

320 pid_lock_file.break_lock() 

321 

322 

323def set_new_process_group() -> None: 

324 """ 

325 Try to set current process to a new process group. 

326 That makes it easy to kill all sub-process of this at the OS-level, 

327 rather than having to iterate the child processes. 

328 If current process spawn by system call ``exec()`` than keep current process group 

329 """ 

330 if os.getpid() == os.getsid(0): 

331 # If PID = SID than process a session leader, and it is not possible to change process group 

332 return 

333 

334 os.setpgid(0, 0)