Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/process_utils.py: 21%

155 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Utilities for running or stopping processes.""" 

19from __future__ import annotations 

20 

21import errno 

22import logging 

23import os 

24import select 

25import shlex 

26import signal 

27import subprocess 

28import sys 

29 

30from airflow.utils.platform import IS_WINDOWS 

31 

32if not IS_WINDOWS: 

33 import pty 

34 import termios 

35 import tty 

36 

37from contextlib import contextmanager 

38from typing import Generator 

39 

40import psutil 

41from lockfile.pidlockfile import PIDLockFile 

42 

43from airflow.configuration import conf 

44from airflow.exceptions import AirflowException 

45 

46log = logging.getLogger(__name__) 

47 

48# When killing processes, time to wait after issuing a SIGTERM before issuing a 

49# SIGKILL. 

50DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME") 

51 

52 

def reap_process_group(
    process_group_id: int,
    logger,
    sig: signal.Signals = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
    """
    Send sig (SIGTERM) to the process group of pid.

    Tries really hard to terminate all processes in the group (including grandchildren). Will send
    sig (SIGTERM) to the process group of pid. If any process is alive after timeout
    a SIGKILL will be sent.

    :param process_group_id: process group id to kill.
           The process that wants to create the group should run
           `airflow.utils.process_utils.set_new_process_group()` as the first command
           it executes which will set group id = process_id. Effectively the process that is the
           "root" of the group has pid = gid and all other processes in the group have different
           pids but the same gid (equal the pid of the root process)
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate
    :return: mapping of pid to return code for every process whose termination was
        reported through the ``psutil.wait_procs`` callback
    :raises RuntimeError: if ``process_group_id`` is the caller's own process group
    """
    returncodes = {}

    def on_terminate(p):
        # Callback for psutil.wait_procs: record the exit code of each process that ends.
        logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
        returncodes[p.pid] = p.returncode

    def signal_procs(sig):
        # Signalling via os.killpg/os.kill is skipped entirely on Windows.
        if IS_WINDOWS:
            return
        try:
            logger.info("Sending the signal %s to group %s", sig, process_group_id)
            os.killpg(process_group_id, sig)
        except OSError as err_killpg:
            # If operation not permitted error is thrown due to run_as_user,
            # use sudo -n(--non-interactive) to kill the process
            if err_killpg.errno == errno.EPERM:
                subprocess.check_call(
                    ["sudo", "-n", "kill", "-" + str(int(sig))]
                    + [str(p.pid) for p in all_processes_in_the_group]
                )
            elif err_killpg.errno == errno.ESRCH:
                # There is a rare condition that the process has not managed yet to change its process
                # group. In this case os.killpg fails with ESRCH error
                # So we additionally send a kill signal to the process itself.
                logger.info(
                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                )
                try:
                    os.kill(process_group_id, sig)
                except OSError as err_kill:
                    if err_kill.errno == errno.EPERM:
                        # kill expects "-<signal> <pid>"; pass the signal number and
                        # the target pid separately (the pid must not be used as the signal).
                        subprocess.check_call(
                            ["sudo", "-n", "kill", "-" + str(int(sig)), str(process_group_id)]
                        )
                    else:
                        raise
            else:
                raise

    if not IS_WINDOWS and process_group_id == os.getpgid(0):
        raise RuntimeError("I refuse to kill myself")

    try:
        parent = psutil.Process(process_group_id)

        all_processes_in_the_group = parent.children(recursive=True)
        all_processes_in_the_group.append(parent)
    except psutil.NoSuchProcess:
        # The process already exited, but maybe its children haven't.
        all_processes_in_the_group = []
        for proc in psutil.process_iter():
            try:
                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
                    all_processes_in_the_group.append(proc)
            except OSError:
                # The process vanished (or is not inspectable) while scanning; skip it.
                pass

    logger.info(
        "Sending %s to group %s. PIDs of all processes in the group: %s",
        sig,
        process_group_id,
        [p.pid for p in all_processes_in_the_group],
    )
    try:
        signal_procs(sig)
    except OSError as err:
        # No such process, which means there is no such process group - our job
        # is done
        if err.errno == errno.ESRCH:
            return returncodes
        # Any other error is not fatal here (we still wait and escalate to SIGKILL
        # below), but it should not disappear without a trace.
        logger.warning("Failed to send %s to group %s: %s", sig, process_group_id, err)

    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)

    if alive:
        for proc in alive:
            logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)

        try:
            signal_procs(signal.SIGKILL)
        except OSError as err:
            # ESRCH means the group disappeared in the meantime, which is what we wanted.
            if err.errno != errno.ESRCH:
                raise

        _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
        if alive:
            for proc in alive:
                logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
    return returncodes

162 

163 

def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
    """
    Run a command in a subprocess and stream its output to the logger.

    Convenience wrapper that forwards to ``execute_in_subprocess_with_kwargs``,
    exposing only the working directory option.

    :param cmd: command and arguments to run
    :param cwd: Current working directory passed to the Popen constructor
    """
    popen_options = {"cwd": cwd}
    execute_in_subprocess_with_kwargs(cmd, **popen_options)

172 

173 

def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
    """
    Execute a process and stream output to logger.

    The child's stderr is merged into its stdout, and every output line is logged
    at INFO level with trailing whitespace stripped.

    :param cmd: command and arguments to run

    All other keyword args will be passed directly to subprocess.Popen

    :raises subprocess.CalledProcessError: if the command exits with a non-zero code
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                # readline() returns b"" at EOF, which ends the iteration.
                for line in iter(proc.stdout.readline, b""):
                    # Undecodable bytes are replaced rather than raising, so one bad
                    # byte in the child's output cannot abort the streaming loop.
                    log.info("%s", line.decode(errors="replace").rstrip())

        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)

195 

196 

def execute_interactive(cmd: list[str], **kwargs) -> None:
    """
    Run the new command as a subprocess.

    Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.

    The subprocess is attached to a pseudo-terminal; data read from this process's stdin is
    forwarded to it and its output is copied to this process's stdout.

    :param cmd: command and arguments to run
    :param kwargs: extra keyword arguments passed directly to subprocess.Popen
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    old_tty = termios.tcgetattr(sys.stdin)
    primary_fd = secondary_fd = None
    try:
        # Switch to raw mode inside the try block so the saved settings are restored
        # even if opening the pseudo-terminal or spawning the process fails.
        tty.setraw(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        primary_fd, secondary_fd = pty.openpty()

        # NOTE: bash job control needs the child to run in a new session/process group
        # (e.g. via os.setsid()); nothing here sets that up, so a caller that needs it
        # has to supply it through kwargs.
        with subprocess.Popen(
            cmd,
            stdin=secondary_fd,
            stdout=secondary_fd,
            stderr=secondary_fd,
            universal_newlines=True,
            **kwargs,
        ) as proc:
            while proc.poll() is None:
                # Use a short timeout so the loop re-checks proc.poll() periodically;
                # without it select() would block until more input/output arrives
                # even though the child has already exited.
                readable_fbs, _, _ = select.select([sys.stdin, primary_fd], [], [], 0.1)
                if sys.stdin in readable_fbs:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(primary_fd, input_data)
                if primary_fd in readable_fbs:
                    output_data = os.read(primary_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)
    finally:
        try:
            # restore tty settings back
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        finally:
            # Release both ends of the pseudo-terminal so the descriptors are not leaked.
            for fd in (primary_fd, secondary_fd):
                if fd is not None:
                    os.close(fd)

234 

235 

def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
    """
    Kills child processes for the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive.

    :param pids_to_kill: List of PID to be killed.
    :param timeout: The time to wait before sending the SIGKILL signal.
    """
    this_process = psutil.Process(os.getpid())
    wanted_pids = set(pids_to_kill)

    def _running_target_children():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            x for x in this_process.children(recursive=True) if x.is_running() and x.pid in wanted_pids
        ]

    child_processes = _running_target_children()

    # First try SIGTERM
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # The child exited between listing and signalling; nothing left to do for it,
            # and the remaining children must still be handled.
            log.debug("Child PID %s exited before it could be terminated", child.pid)

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    try:
        psutil.wait_procs(
            child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
        )
    except psutil.TimeoutExpired:
        # Kept defensively; survivors are picked up by the SIGKILL pass below either way.
        log.debug("Ran out of time while waiting for processes to exit")

    # Then SIGKILL
    child_processes = _running_target_children()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
            except psutil.NoSuchProcess:
                # Already gone; skip waiting on it.
                log.debug("Child PID %s exited before it could be killed", child.pid)
                continue
            child.wait()

277 

278 

@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
    """
    Set environment variables in context.

    After leaving the context, it restores its original state: variables that existed
    before get their previous value back, and variables that did not exist are removed.
    The restore also runs when applying the new values fails part-way through.

    :param new_env_variables: Environment variables to set
    """
    # Snapshot the previous value of every key we are about to touch (None = was unset).
    current_env_state = {key: os.environ.get(key) for key in new_env_variables}
    try:
        # Apply inside the try block so that a failure part-way through the update
        # does not leave already-applied variables behind.
        os.environ.update(new_env_variables)
        yield
    finally:
        for key, old_value in current_env_state.items():
            if old_value is None:
                # The variable did not exist before; tolerate it having been
                # removed inside the context already.
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value

298 

299 

def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Refuse to continue when the process recorded in a pidfile is still alive.

    If the recorded process no longer exists, the stale pidfile lock is broken instead.
    Nothing happens when the pidfile is not locked or holds no pid.

    :param pid_file: path to the pidfile
    :param process_name: name used in exception if process is up and
        running
    :raises AirflowException: if the process recorded in the pidfile is running
    """
    lock = PIDLockFile(path=pid_file)

    # No lock held: nothing to check.
    if not lock.is_locked():
        return

    pid = lock.read_pid()
    # The lock exists but no pid could be read from it.
    if pid is None:
        return

    try:
        still_alive = psutil.Process(pid).is_running()
        if still_alive:
            raise AirflowException(f"The {process_name} is already running under PID {pid}.")
    except psutil.NoSuchProcess:
        # The recorded process is gone, so the lock is stale.
        lock.break_lock()

324 

325 

def set_new_process_group() -> None:
    """Try to move the current process into a new process group of its own.

    Having a dedicated group makes it easy to kill all sub-processes of this
    process at the OS level, rather than having to iterate the child processes.

    If current process was spawned by system call ``exec()``, the current
    process group is kept.
    """
    # When PID == SID the process is a session leader, and a session leader
    # cannot change its process group, so leave it untouched.
    is_session_leader = os.getsid(0) == os.getpid()
    if not is_session_leader:
        os.setpgid(0, 0)