Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/process_utils.py: 21%
155 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Utilities for running or stopping processes."""
19from __future__ import annotations
21import errno
22import logging
23import os
24import select
25import shlex
26import signal
27import subprocess
28import sys
30from airflow.utils.platform import IS_WINDOWS
32if not IS_WINDOWS:
33 import pty
34 import termios
35 import tty
37from contextlib import contextmanager
38from typing import Generator
40import psutil
41from lockfile.pidlockfile import PIDLockFile
43from airflow.configuration import conf
44from airflow.exceptions import AirflowException
log: logging.Logger = logging.getLogger(__name__)

# Grace period used when killing processes: how long to wait after issuing a SIGTERM
# before escalating to a SIGKILL. It is read once, at import time, from the
# ``[core] KILLED_TASK_CLEANUP_TIME`` configuration option and is used as the default
# ``timeout`` of ``reap_process_group`` (handed on to ``psutil.wait_procs``).
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM: int = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
def reap_process_group(
    process_group_id: int,
    logger,
    sig: signal.Signals = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
    """
    Send sig (SIGTERM) to the process group of pid.

    Tries really hard to terminate all processes in the group (including grandchildren). Will send
    sig (SIGTERM) to the process group of pid. If any process is alive after timeout
    a SIGKILL will be sent.

    :param process_group_id: process group id to kill.
        The process that wants to create the group should run
        `airflow.utils.process_utils.set_new_process_group()` as the first command
        it executes which will set group id = process_id. Effectively the process that is the
        "root" of the group has pid = gid and all other processes in the group have different
        pids but the same gid (equal the pid of the root process)
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate; used for the wait after ``sig``
        and again for the wait after SIGKILL
    :return: mapping of PID to exit code for every process observed to terminate
    :raises RuntimeError: if ``process_group_id`` is the caller's own process group
    """
    returncodes: dict[int, int] = {}

    def on_terminate(p):
        # psutil.wait_procs callback: record the exit code of each process that finished.
        logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
        returncodes[p.pid] = p.returncode

    def signal_procs(sig):
        # Signalling process groups is not supported on Windows; nothing is sent there.
        if IS_WINDOWS:
            return
        try:
            logger.info("Sending the signal %s to group %s", sig, process_group_id)
            os.killpg(process_group_id, sig)
        except OSError as err_killpg:
            # If operation not permitted error is thrown due to run_as_user,
            # use sudo -n(--non-interactive) to kill the process
            if err_killpg.errno == errno.EPERM:
                subprocess.check_call(
                    ["sudo", "-n", "kill", "-" + str(int(sig))]
                    + [str(p.pid) for p in all_processes_in_the_group]
                )
            elif err_killpg.errno == errno.ESRCH:
                # There is a rare condition that the process has not managed yet to change its process
                # group. In this case os.killpg fails with ESRCH error
                # So we additionally send a kill signal to the process itself.
                logger.info(
                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                )
                try:
                    os.kill(process_group_id, sig)
                except OSError as err_kill:
                    if err_kill.errno == errno.EPERM:
                        # The signal and the target PID must be separate arguments:
                        # ``kill -<pid>`` alone is parsed by kill(1) as a signal number
                        # with no target, so nothing would be signalled.
                        subprocess.check_call(
                            ["sudo", "-n", "kill", "-" + str(int(sig)), str(process_group_id)]
                        )
                    else:
                        raise
            else:
                raise

    if not IS_WINDOWS and process_group_id == os.getpgid(0):
        raise RuntimeError("I refuse to kill myself")

    try:
        parent = psutil.Process(process_group_id)

        all_processes_in_the_group = parent.children(recursive=True)
        all_processes_in_the_group.append(parent)
    except psutil.NoSuchProcess:
        # The process already exited, but maybe its children haven't.
        all_processes_in_the_group = []
        for proc in psutil.process_iter():
            try:
                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
                    all_processes_in_the_group.append(proc)
            except OSError:
                # The process vanished (or is inaccessible) while we were scanning.
                pass

    logger.info(
        "Sending %s to group %s. PIDs of all processes in the group: %s",
        sig,
        process_group_id,
        [p.pid for p in all_processes_in_the_group],
    )
    try:
        signal_procs(sig)
    except OSError as err:
        # No such process, which means there is no such process group - our job
        # is done
        if err.errno == errno.ESRCH:
            return returncodes
        # Any other failure is not fatal here (the SIGKILL escalation below still
        # runs), but it must not be lost silently.
        logger.warning("Failed to send signal %s to group %s: %s", sig, process_group_id, err)

    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)

    if alive:
        for proc in alive:
            logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)

        try:
            signal_procs(signal.SIGKILL)
        except OSError as err:
            if err.errno != errno.ESRCH:
                raise

        _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
        if alive:
            for proc in alive:
                logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
    return returncodes
def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
    """
    Run ``cmd`` in a subprocess and stream its output to the logger.

    This is a convenience wrapper around ``execute_in_subprocess_with_kwargs`` that
    forwards only a working directory.

    :param cmd: command and arguments to run
    :param cwd: working directory handed to the ``subprocess.Popen`` constructor
    """
    popen_options = {"cwd": cwd}
    execute_in_subprocess_with_kwargs(cmd, **popen_options)
def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
    """
    Execute a process and stream its output (stdout and stderr merged) to the logger.

    :param cmd: command and arguments to run
    :raises subprocess.CalledProcessError: if the process exits with a non-zero exit code

    All other keyword args will be passed directly to subprocess.Popen
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                # Iterating the pipe ends at EOF both in binary mode and in text mode
                # (``text=True`` / ``universal_newlines=True`` passed via kwargs), where
                # readline() returns "" rather than b"".
                for line in proc.stdout:
                    if isinstance(line, bytes):
                        # Replace undecodable bytes instead of aborting the stream on
                        # output that is not valid UTF-8.
                        line = line.decode(errors="replace")
                    log.info("%s", line.rstrip())

        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)
def execute_interactive(cmd: list[str], **kwargs) -> None:
    """
    Run the new command as a subprocess attached to a pseudo-terminal.

    Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.

    :param cmd: command and arguments to run
    :param kwargs: passed directly to ``subprocess.Popen``
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    # How long (seconds) select() may block before the loop re-checks whether the
    # subprocess has exited. Without a timeout the loop could wait for user input
    # indefinitely after the subprocess is gone, because this process keeps the
    # secondary end of the pty open.
    poll_interval = 0.1

    old_tty = termios.tcgetattr(sys.stdin)
    primary_fd = secondary_fd = None
    try:
        tty.setraw(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        primary_fd, secondary_fd = pty.openpty()
        # NOTE: this function does not start a new process group itself; callers that need
        # e.g. bash job control may pass start_new_session=True / preexec_fn via kwargs.
        with subprocess.Popen(
            cmd,
            stdin=secondary_fd,
            stdout=secondary_fd,
            stderr=secondary_fd,
            universal_newlines=True,
            **kwargs,
        ) as proc:
            while proc.poll() is None:
                readable_fds, _, _ = select.select([sys.stdin, primary_fd], [], [], poll_interval)
                if sys.stdin in readable_fds:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(primary_fd, input_data)
                if primary_fd in readable_fds:
                    output_data = os.read(primary_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)
    finally:
        # restore tty settings back
        try:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        finally:
            # Release both ends of the pseudo-terminal so the descriptors do not leak.
            for fd in (primary_fd, secondary_fd):
                if fd is not None:
                    os.close(fd)
def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
    """
    Kills child processes for the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive.

    :param pids_to_kill: List of PID to be killed.
    :param timeout: The time to wait before sending the SIGKILL signal.
    """
    this_process = psutil.Process(os.getpid())
    wanted_pids = set(pids_to_kill)

    def _matching_children():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            x for x in this_process.children(recursive=True) if x.is_running() and x.pid in wanted_pids
        ]

    child_processes = _matching_children()

    # First try SIGTERM
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # The child exited between being listed and being signalled.
            pass

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    # wait_procs returns (gone, alive) after at most `timeout` seconds; it does not raise
    # on timeout, so survivors are detected by re-scanning below.
    psutil.wait_procs(
        child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
    )

    # Then SIGKILL
    child_processes = _matching_children()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
                child.wait()
            except psutil.NoSuchProcess:
                # The child exited on its own after the re-scan.
                pass
@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
    """
    Set environment variables in context.

    After leaving the context, it restores its original state: variables that did not
    exist before are removed and pre-existing ones get their previous value back. The
    restore also happens if applying the new variables fails partway through.

    :param new_env_variables: Environment variables to set
    """
    current_env_state = {key: os.environ.get(key) for key in new_env_variables}
    try:
        # Inside the try so that a failure midway (e.g. a non-str value) still rolls
        # back the keys that were already applied.
        os.environ.update(new_env_variables)
        yield
    finally:
        for key, old_value in current_env_state.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Check whether a pidfile exists and its process is still running.

    If the pidfile exists but the process it records is no longer running, the stale
    pidfile lock is removed.

    :param pid_file: path to the pidfile
    :param process_name: name used in exception if process is up and
        running
    :raises AirflowException: if the process recorded in the pidfile is running
    """
    pid_lock_file = PIDLockFile(path=pid_file)
    if not pid_lock_file.is_locked():
        return

    pid = pid_lock_file.read_pid()
    if pid is None:
        return

    try:
        is_running = psutil.Process(pid).is_running()
    except psutil.NoSuchProcess:
        is_running = False

    if is_running:
        raise AirflowException(f"The {process_name} is already running under PID {pid}.")

    # The recorded process is gone (or no longer running): remove the stale pidfile.
    pid_lock_file.break_lock()
def set_new_process_group() -> None:
    """
    Try to move the current process into a new process group of its own.

    A dedicated group makes it easy to kill this process and all of its sub-processes
    at the OS level, instead of having to iterate over the child processes.

    A session leader (a process whose PID equals its session ID) is not allowed to
    change its process group, so in that case the current group is kept unchanged.
    """
    is_session_leader = os.getsid(0) == os.getpid()
    if not is_session_leader:
        # setpgid(0, 0): make the calling process the leader of a group whose ID is its PID.
        os.setpgid(0, 0)