Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/process_utils.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

158 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Utilities for running or stopping processes.""" 

19 

20from __future__ import annotations 

21 

22import errno 

23import logging 

24import os 

25import select 

26import shlex 

27import signal 

28import subprocess 

29import sys 

30 

31from airflow.utils.platform import IS_WINDOWS 

32 

33if not IS_WINDOWS: 

34 import pty 

35 import termios 

36 import tty 

37 

38from contextlib import contextmanager 

39from typing import Generator 

40 

41import psutil 

42from lockfile.pidlockfile import PIDLockFile 

43 

44from airflow.configuration import conf 

45from airflow.exceptions import AirflowException 

46 

# Module-level logger used by the helpers in this file that do not accept an explicit logger argument.
log = logging.getLogger(__name__)

# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL. Read once, at import time, from the "core" section / "KILLED_TASK_CLEANUP_TIME"
# option and used as the default ``timeout`` of ``reap_process_group``.
# NOTE(review): presumably in seconds, since it ends up as a psutil wait timeout — confirm in config docs.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")

52 

53 

def reap_process_group(
    process_group_id: int,
    logger,
    sig: signal.Signals = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
    """
    Send sig (SIGTERM) to the process group of pid.

    Tries really hard to terminate all processes in the group (including grandchildren). Will send
    sig (SIGTERM) to the process group of pid. If any process is alive after timeout
    a SIGKILL will be sent.

    :param process_group_id: process group id to kill.
        The process that wants to create the group should run
        `airflow.utils.process_utils.set_new_process_group()` as the first command
        it executes which will set group id = process_id. Effectively the process that is the
        "root" of the group has pid = gid and all other processes in the group have different
        pids but the same gid (equal the pid of the root process)
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate
    :return: mapping of PID -> exit code for every process that psutil reported as terminated
        while waiting (processes that could not be killed are absent).
    :raises RuntimeError: if ``process_group_id`` is the caller's own process group.
    """
    returncodes: dict[int, int] = {}

    def on_terminate(p):
        # Callback for psutil.wait_procs: record the exit code of each process that goes away.
        logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
        returncodes[p.pid] = p.returncode

    def signal_procs(sig):
        # Signal the whole group; falls back to sudo when the group belongs to another user
        # (run_as_user). Reads ``all_processes_in_the_group`` from the enclosing scope, which is
        # always assigned before this helper is first called.
        if IS_WINDOWS:
            return
        try:
            logger.info("Sending the signal %s to group %s", sig, process_group_id)
            os.killpg(process_group_id, sig)
        except OSError as err_killpg:
            # If operation not permitted error is thrown due to run_as_user,
            # use sudo -n(--non-interactive) to kill the process
            if err_killpg.errno == errno.EPERM:
                subprocess.check_call(
                    ["sudo", "-n", "kill", "-" + str(int(sig))]
                    + [str(p.pid) for p in all_processes_in_the_group]
                )
            elif err_killpg.errno == errno.ESRCH:
                # There is a rare condition that the process has not managed yet to change its process
                # group. In this case os.killpg fails with ESRCH error
                # So we additionally send a kill signal to the process itself.
                logger.info(
                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                )
                try:
                    os.kill(process_group_id, sig)
                except OSError as err_kill:
                    if err_kill.errno == errno.EPERM:
                        # Pass the signal AND the PID explicitly: a lone "-<pid>" argument is parsed
                        # by kill(1) as a signal specification, so the process would never be signalled.
                        subprocess.check_call(
                            ["sudo", "-n", "kill", "-" + str(int(sig)), str(process_group_id)]
                        )
                    else:
                        raise
            else:
                raise

    if not IS_WINDOWS and process_group_id == os.getpgid(0):
        raise RuntimeError("I refuse to kill myself")

    try:
        parent = psutil.Process(process_group_id)

        all_processes_in_the_group = parent.children(recursive=True)
        all_processes_in_the_group.append(parent)
    except psutil.NoSuchProcess:
        # The process already exited, but maybe its children haven't.
        all_processes_in_the_group = []
        for proc in psutil.process_iter():
            try:
                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
                    all_processes_in_the_group.append(proc)
            except OSError:
                # The process vanished (or is inaccessible) while scanning; skip it.
                pass

    logger.info(
        "Sending %s to group %s. PIDs of all processes in the group: %s",
        sig,
        process_group_id,
        [p.pid for p in all_processes_in_the_group],
    )
    try:
        signal_procs(sig)
    except OSError as err:
        # No such process, which means there is no such process group - our job
        # is done
        if err.errno == errno.ESRCH:
            return returncodes
        # Any other failure used to be swallowed silently. Keep the best-effort behaviour
        # (we still wait and escalate to SIGKILL below) but make the failure visible.
        logger.warning("Failed to send %s to group %s: %s", sig, process_group_id, err)

    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)

    if alive:
        for proc in alive:
            logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)

        try:
            signal_procs(signal.SIGKILL)
        except OSError as err:
            if err.errno != errno.ESRCH:
                raise

        _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
        if alive:
            for proc in alive:
                logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
    return returncodes

163 

164 

def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
    """
    Run ``cmd`` and stream its output to the logger.

    Convenience wrapper around ``execute_in_subprocess_with_kwargs`` that only exposes
    the working-directory option.

    :param cmd: program to run followed by its arguments
    :param cwd: working directory handed to the ``subprocess.Popen`` constructor
    """
    popen_options = {"cwd": cwd}
    execute_in_subprocess_with_kwargs(cmd, **popen_options)

173 

174 

def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
    """
    Execute a process and stream output to logger.

    The child's stderr is merged into its stdout and every line is logged at INFO level as it
    arrives. Output is decoded as UTF-8; undecodable bytes are replaced (U+FFFD) instead of
    aborting the stream, so binary or mis-encoded output cannot crash the caller mid-run.

    :param cmd: command and arguments to run
    :raises subprocess.CalledProcessError: if the command exits with a non-zero code.

    All other keyword args will be passed directly to subprocess.Popen
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                for line in iter(proc.stdout.readline, b""):
                    # errors="replace": a plain decode() raises UnicodeDecodeError on non-UTF-8 bytes.
                    log.info("%s", line.decode(errors="replace").rstrip())

        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)

196 

197 

def execute_interactive(cmd: list[str], **kwargs) -> None:
    """
    Run the new command as a subprocess.

    Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.

    The child's stdin/stdout/stderr are attached to a freshly opened pseudo-terminal. Input read from
    this process's stdin is forwarded to the child, and the child's output is copied to this process's
    stdout. The parent ignores SIGINT while the child runs; the previous handler, the terminal
    attributes and both pty file descriptors are restored/closed afterwards, even on error.

    :param cmd: command and arguments to run
    :param kwargs: passed through to ``subprocess.Popen``
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    # Maximum time (seconds) select() blocks before re-checking whether the child has exited.
    # A zero timeout turns the loop into a busy-wait that pins a CPU core for the child's lifetime;
    # I/O is still forwarded immediately because select() returns as soon as a fd is readable.
    poll_interval = 0.1

    old_tty = termios.tcgetattr(sys.stdin)
    old_sigint_handler = signal.getsignal(signal.SIGINT)
    primary_fd = None
    secondary_fd = None
    try:
        # Inside the try so the terminal is restored even if opening the pty fails.
        tty.setcbreak(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        primary_fd, secondary_fd = pty.openpty()
        with subprocess.Popen(
            cmd,
            stdin=secondary_fd,
            stdout=secondary_fd,
            stderr=secondary_fd,
            universal_newlines=True,
            **kwargs,
        ) as proc:
            # ignore SIGINT in the parent process
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            while proc.poll() is None:
                readable_fds, _, _ = select.select([sys.stdin, primary_fd], [], [], poll_interval)
                if sys.stdin in readable_fds:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(primary_fd, input_data)
                if primary_fd in readable_fds:
                    output_data = os.read(primary_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)
    finally:
        # restore tty settings back
        signal.signal(signal.SIGINT, old_sigint_handler)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        # Release both ends of the pty; previously they leaked on every call.
        for fd in (primary_fd, secondary_fd):
            if fd is not None:
                os.close(fd)

238 

239 

def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
    """
    Kills child processes for the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive.

    Only descendants of the current process whose PID is listed are touched. A process that exits
    on its own between being listed and being signalled is silently skipped.

    :param pids_to_kill: List of PID to be killed.
    :param timeout: The time to wait before sending the SIGKILL signal.
    """
    # Set lookup: the membership test runs once per descendant process.
    wanted_pids = set(pids_to_kill)
    this_process = psutil.Process(os.getpid())

    def _matching_children():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            x for x in this_process.children(recursive=True) if x.is_running() and x.pid in wanted_pids
        ]

    child_processes = _matching_children()

    # First try SIGTERM
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # Exited between listing and signalling; nothing left to terminate.
            pass

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    # wait_procs reports a timeout through its (gone, alive) result; it never raises TimeoutExpired.
    _, alive = psutil.wait_procs(
        child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
    )
    if alive:
        log.debug("Ran out of time while waiting for processes to exit")

    # Then SIGKILL (re-scan so we only ever signal processes that are still our descendants)
    child_processes = _matching_children()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
                child.wait()
            except psutil.NoSuchProcess:
                # Exited on its own after the re-scan.
                pass

281 

282 

@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
    """
    Set environment variables in context.

    After leaving the context, it restores its original state: variables that were unset before
    are removed again, and variables that existed get their previous value back. The rollback also
    happens when applying the new values fails part-way (for example a non-``str`` value), so a
    failed patch never leaks into ``os.environ``.

    :param new_env_variables: Environment variables to set
    """
    current_env_state = {key: os.environ.get(key) for key in new_env_variables}
    try:
        # Inside the try: os.environ.update assigns key by key, so an invalid value can leave
        # earlier keys already applied — the finally below must roll those back too.
        os.environ.update(new_env_variables)
        yield
    finally:
        for key, old_value in current_env_state.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value

302 

303 

def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Fail if the process recorded in a pidfile is alive; clear the pidfile if it is stale.

    Returns quietly when the pidfile is not locked or holds no readable PID. When a PID is read
    but no process with that PID exists, the stale lock is broken.

    NOTE(review): only PID existence is checked, so a recycled PID owned by an unrelated process
    is also reported as running.

    :param pid_file: path to the pidfile
    :param process_name: name used in the exception message when the process is running
    :raises AirflowException: if the recorded process is still running
    """
    lock = PIDLockFile(path=pid_file)
    if not lock.is_locked():
        return

    pid = lock.read_pid()
    if pid is None:
        return

    try:
        still_running = psutil.Process(pid).is_running()
    except psutil.NoSuchProcess:
        # The owner is gone: drop the stale pidfile.
        lock.break_lock()
        return

    if still_running:
        raise AirflowException(f"The {process_name} is already running under PID {pid}.")

329 

330 

def set_new_process_group() -> None:
    """
    Make the current process the leader of a new process group.

    With its own group, the process and everything it spawns can be signalled at the OS level
    in a single call (see ``os.killpg``), instead of walking the child processes one by one.

    A session leader cannot change its process group (``setpgid`` would fail), so when the
    current process leads its session this function does nothing.
    """
    is_session_leader = os.getsid(0) == os.getpid()
    if not is_session_leader:
        os.setpgid(0, 0)