Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/process_utils.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Utilities for running or stopping processes."""
20from __future__ import annotations
22import errno
23import logging
24import os
25import select
26import shlex
27import signal
28import subprocess
29import sys
31from airflow.utils.platform import IS_WINDOWS
33if not IS_WINDOWS:
34 import pty
35 import termios
36 import tty
38from contextlib import contextmanager
39from typing import Generator
41import psutil
42from lockfile.pidlockfile import PIDLockFile
44from airflow.configuration import conf
45from airflow.exceptions import AirflowException
47log = logging.getLogger(__name__)
49# When killing processes, time to wait after issuing a SIGTERM before issuing a
50# SIGKILL.
51DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
def reap_process_group(
    process_group_id: int,
    logger,
    sig: signal.Signals = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
    """
    Send sig (SIGTERM) to the process group of pid.

    Tries really hard to terminate all processes in the group (including grandchildren). Will send
    sig (SIGTERM) to the process group of pid. If any process is alive after timeout
    a SIGKILL will be sent.

    :param process_group_id: process group id to kill.
           The process that wants to create the group should run
           `airflow.utils.process_utils.set_new_process_group()` as the first command
           it executes which will set group id = process_id. Effectively the process that is the
           "root" of the group has pid = gid and all other processes in the group have different
           pids but the same gid (equal the pid of the root process)
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate
    :return: mapping of pid to exit code for every process that was seen terminating
    :raises RuntimeError: if ``process_group_id`` is the process group of the calling process
    """
    returncodes = {}

    def on_terminate(p):
        logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
        returncodes[p.pid] = p.returncode

    def signal_procs(sig):
        if IS_WINDOWS:
            return
        try:
            logger.info("Sending the signal %s to group %s", sig, process_group_id)
            os.killpg(process_group_id, sig)
        except OSError as err_killpg:
            # If operation not permitted error is thrown due to run_as_user,
            # use sudo -n(--non-interactive) to kill the process
            if err_killpg.errno == errno.EPERM:
                subprocess.check_call(
                    ["sudo", "-n", "kill", "-" + str(int(sig))]
                    + [str(p.pid) for p in all_processes_in_the_group]
                )
            elif err_killpg.errno == errno.ESRCH:
                # There is a rare condition that the process has not managed yet to change its process
                # group. In this case os.killpg fails with ESRCH error
                # So we additionally send a kill signal to the process itself.
                logger.info(
                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                )
                try:
                    os.kill(process_group_id, sig)
                except OSError as err_kill:
                    if err_kill.errno == errno.EPERM:
                        # ``kill`` expects "-<signal> <pid>"; pass both the signal number
                        # and the target pid so sudo actually signals the process.
                        subprocess.check_call(
                            ["sudo", "-n", "kill", "-" + str(int(sig)), str(process_group_id)]
                        )
                    else:
                        raise
            else:
                raise

    if not IS_WINDOWS and process_group_id == os.getpgid(0):
        raise RuntimeError("I refuse to kill myself")

    try:
        parent = psutil.Process(process_group_id)

        all_processes_in_the_group = parent.children(recursive=True)
        all_processes_in_the_group.append(parent)
    except psutil.NoSuchProcess:
        # The process already exited, but maybe its children haven't.
        all_processes_in_the_group = []
        for proc in psutil.process_iter():
            try:
                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
                    all_processes_in_the_group.append(proc)
            except OSError:
                # The process vanished (or is not inspectable) while iterating; skip it.
                pass

    logger.info(
        "Sending %s to group %s. PIDs of all processes in the group: %s",
        sig,
        process_group_id,
        [p.pid for p in all_processes_in_the_group],
    )
    try:
        signal_procs(sig)
    except OSError as err:
        # No such process, which means there is no such process group - our job
        # is done
        if err.errno == errno.ESRCH:
            return returncodes
        # Any other OS error is treated as best-effort: keep going and wait for the
        # processes, but make the failure visible instead of swallowing it silently.
        logger.warning("Failed to send %s to group %s: %s", sig, process_group_id, err)

    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)

    if alive:
        for proc in alive:
            logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)

        try:
            signal_procs(signal.SIGKILL)
        except OSError as err:
            if err.errno != errno.ESRCH:
                raise

        _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
        if alive:
            for proc in alive:
                logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
    return returncodes
def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
    """
    Run a command and stream its output to the logger.

    Thin convenience wrapper that delegates to ``execute_in_subprocess_with_kwargs``.

    :param cmd: command and arguments to run
    :param cwd: Current working directory passed to the Popen constructor
    """
    popen_kwargs = {"cwd": cwd}
    execute_in_subprocess_with_kwargs(cmd, **popen_kwargs)
def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
    """
    Execute a process and stream output to logger.

    stderr is merged into stdout, and every output line is logged at INFO level
    as it is produced.

    :param cmd: command and arguments to run

    All other keyword args will be passed directly to subprocess.Popen

    :raises subprocess.CalledProcessError: if the command exits with a non-zero code
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                for line in iter(proc.stdout.readline, b""):
                    # The child may emit bytes that are not valid UTF-8; replace them
                    # rather than aborting the whole run with a UnicodeDecodeError.
                    log.info("%s", line.decode(errors="replace").rstrip())

        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)
def execute_interactive(cmd: list[str], **kwargs) -> None:
    """
    Run the new command as a subprocess.

    Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.

    :param cmd: command and arguments to run

    All other keyword args will be passed directly to subprocess.Popen
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    old_tty = termios.tcgetattr(sys.stdin)
    old_sigint_handler = signal.getsignal(signal.SIGINT)
    primary_fd = None
    secondary_fd = None
    try:
        # Everything that alters terminal state happens inside the ``try`` so that a
        # failure while setting up still restores the original tty settings.
        tty.setcbreak(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        primary_fd, secondary_fd = pty.openpty()
        with subprocess.Popen(
            cmd,
            stdin=secondary_fd,
            stdout=secondary_fd,
            stderr=secondary_fd,
            universal_newlines=True,
            **kwargs,
        ) as proc:
            # ignore SIGINT in the parent process
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            while proc.poll() is None:
                # A short timeout (instead of 0) keeps the loop from busy-spinning while
                # idle; select still returns immediately when data is available.
                readable_fbs, _, _ = select.select([sys.stdin, primary_fd], [], [], 0.1)
                if sys.stdin in readable_fbs:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(primary_fd, input_data)
                if primary_fd in readable_fbs:
                    output_data = os.read(primary_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)
    finally:
        try:
            # restore tty settings back
            if old_sigint_handler is not None:
                # getsignal() returns None for handlers not installed from Python;
                # signal.signal() would reject that value.
                signal.signal(signal.SIGINT, old_sigint_handler)
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        finally:
            # Release both ends of the pseudo-terminal so descriptors are not leaked.
            for fd in (primary_fd, secondary_fd):
                if fd is not None:
                    os.close(fd)
def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
    """
    Kills child processes for the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive.

    :param pids_to_kill: List of PID to be killed.
    :param timeout: The time to wait before sending the SIGKILL signal.
    """
    this_process = psutil.Process(os.getpid())
    target_pids = set(pids_to_kill)

    def running_target_children():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            x for x in this_process.children(recursive=True) if x.is_running() and x.pid in target_pids
        ]

    child_processes = running_target_children()

    # First try SIGTERM
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # The child exited between being listed and being signalled; nothing to do.
            log.debug("Child PID %s already exited", child.pid)

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    try:
        psutil.wait_procs(
            child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
        )
    except psutil.TimeoutExpired:
        log.debug("Ran out of time while waiting for processes to exit")

    # Then SIGKILL
    child_processes = running_target_children()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
                child.wait()
            except psutil.NoSuchProcess:
                # The child went away on its own before it could be killed or reaped.
                log.debug("Child PID %s already exited", child.pid)
@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
    """
    Temporarily apply environment variables for the duration of a ``with`` block.

    On exit (normal or via exception) every touched variable is put back: variables that
    existed before get their previous value, variables that did not exist are removed.

    :param new_env_variables: Environment variables to set
    """
    # Remember the prior value (None meaning "was not set") of each variable we override.
    previous_values = {}
    for name in new_env_variables:
        previous_values[name] = os.environ.get(name)

    os.environ.update(new_env_variables)
    try:
        yield
    finally:
        for name, previous in previous_values.items():
            if previous is not None:
                os.environ[name] = previous
            else:
                # Tolerate the variable having been removed inside the context.
                os.environ.pop(name, None)
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Verify that no live process is recorded in the given pidfile.

    When the pidfile is locked and the recorded process no longer exists, the stale lock is broken.

    :param pid_file: path to the pidfile
    :param process_name: name used in exception if process is up and
        running
    :raises AirflowException: if the process recorded in the pidfile is still running
    """
    lock = PIDLockFile(path=pid_file)

    # Nothing to check when the pidfile does not exist.
    if not lock.is_locked():
        return

    recorded_pid = lock.read_pid()
    if recorded_pid is None:
        return

    try:
        still_running = psutil.Process(recorded_pid).is_running()
    except psutil.NoSuchProcess:
        # The recorded process is gone, so the pidfile is stale: remove it.
        lock.break_lock()
        return

    if still_running:
        raise AirflowException(f"The {process_name} is already running under PID {recorded_pid}.")
def set_new_process_group() -> None:
    """Try to set current process to a new process group.

    That makes it easy to kill all sub-process of this at the OS-level,
    rather than having to iterate the child processes.

    If the current process is a session leader, the current process group is kept.
    """
    # When PID == SID the process is a session leader, and it is not possible
    # to change its process group.
    is_session_leader = os.getsid(0) == os.getpid()
    if not is_session_leader:
        os.setpgid(0, 0)