Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/process_utils.py: 21%
153 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Utilities for running or stopping processes."""
19from __future__ import annotations
21import errno
22import logging
23import os
24import select
25import shlex
26import signal
27import subprocess
28import sys
30from airflow.utils.platform import IS_WINDOWS
32if not IS_WINDOWS:
33 import tty
34 import termios
35 import pty
37from contextlib import contextmanager
38from typing import Generator
40import psutil
41from lockfile.pidlockfile import PIDLockFile
43from airflow.configuration import conf
44from airflow.exceptions import AirflowException
# Module-level logger used by the helpers in this module that do not take an explicit logger.
log = logging.getLogger(__name__)

# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL. Read once at import time from the ``[core] KILLED_TASK_CLEANUP_TIME``
# configuration option; it is the default ``timeout`` of ``reap_process_group``.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
def reap_process_group(
    process_group_id: int,
    logger,
    sig: signal.Signals = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
    """
    Send sig (SIGTERM) to the process group of pid.

    Tries really hard to terminate all processes in the group (including grandchildren). Will send
    sig (SIGTERM) to the process group of pid. If any process is alive after timeout
    a SIGKILL will be sent.

    :param process_group_id: process group id to kill.
        The process that wants to create the group should run
        `airflow.utils.process_utils.set_new_process_group()` as the first command
        it executes which will set group id = process_id. Effectively the process that is the
        "root" of the group has pid = gid and all other processes in the group have different
        pids but the same gid (equal the pid of the root process)
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate
    :return: mapping of pid -> exit code for every process observed to terminate
    :raises RuntimeError: if ``process_group_id`` is the caller's own process group
    """
    returncodes: dict[int, int] = {}

    def on_terminate(p):
        logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
        returncodes[p.pid] = p.returncode

    def signal_procs(sig):
        # Note: ``all_processes_in_the_group`` is bound in the enclosing scope below,
        # before this helper is first called.
        try:
            logger.info("Sending the signal %s to group %s", sig, process_group_id)
            os.killpg(process_group_id, sig)
        except OSError as err_killpg:
            # If operation not permitted error is thrown due to run_as_user,
            # use sudo -n(--non-interactive) to kill the process
            if err_killpg.errno == errno.EPERM:
                subprocess.check_call(
                    ["sudo", "-n", "kill", "-" + str(int(sig))]
                    + [str(p.pid) for p in all_processes_in_the_group]
                )
            elif err_killpg.errno == errno.ESRCH:
                # There is a rare condition that the process has not managed yet to change its process
                # group. In this case os.killpg fails with ESRCH error
                # So we additionally send a kill signal to the process itself.
                logger.info(
                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                )
                try:
                    os.kill(process_group_id, sig)
                except OSError as err_kill:
                    if err_kill.errno == errno.EPERM:
                        # `kill` takes the signal as an option and the pid as an operand.
                        # Passing only "-<pid>" would be parsed as a signal number with no
                        # target process at all.
                        subprocess.check_call(
                            ["sudo", "-n", "kill", "-" + str(int(sig)), str(process_group_id)]
                        )
                    else:
                        raise
            else:
                raise

    if process_group_id == os.getpgid(0):
        raise RuntimeError("I refuse to kill myself")

    try:
        parent = psutil.Process(process_group_id)

        all_processes_in_the_group = parent.children(recursive=True)
        all_processes_in_the_group.append(parent)
    except psutil.NoSuchProcess:
        # The process already exited, but maybe its children haven't.
        all_processes_in_the_group = []
        for proc in psutil.process_iter():
            try:
                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
                    all_processes_in_the_group.append(proc)
            except OSError:
                # The process vanished while scanning; it is no longer a concern.
                pass

    logger.info(
        "Sending %s to group %s. PIDs of all processes in the group: %s",
        sig,
        process_group_id,
        [p.pid for p in all_processes_in_the_group],
    )
    try:
        signal_procs(sig)
    except OSError as err:
        # No such process, which means there is no such process group - our job
        # is done
        if err.errno == errno.ESRCH:
            return returncodes
        # Best effort: keep going (wait, then escalate to SIGKILL), but do not hide the failure.
        logger.warning("Failed to send %s to group %s: %s", sig, process_group_id, err)

    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)

    if alive:
        for proc in alive:
            logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)

        try:
            signal_procs(signal.SIGKILL)
        except OSError as err:
            if err.errno != errno.ESRCH:
                raise

        _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
        if alive:
            for proc in alive:
                logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
    return returncodes
def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
    """
    Run ``cmd`` in a child process and relay its output to the module logger.

    Convenience wrapper around ``execute_in_subprocess_with_kwargs`` that only exposes
    the working-directory option. A non-zero exit code surfaces as
    ``subprocess.CalledProcessError`` from that function.

    :param cmd: command and arguments to run
    :param cwd: Current working directory passed to the Popen constructor
    """
    popen_options = {"cwd": cwd}
    execute_in_subprocess_with_kwargs(cmd, **popen_options)
def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
    """
    Execute a process and stream output to logger.

    The child's stderr is merged into its stdout, and each output line is logged at
    INFO level as it is produced.

    :param cmd: command and arguments to run
    :raises subprocess.CalledProcessError: if the command exits with a non-zero code

    All other keyword args will be passed directly to subprocess.Popen
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                for line in iter(proc.stdout.readline, b""):
                    # Child output is arbitrary bytes. A strict UTF-8 decode would raise on the
                    # first invalid byte and abort streaming while the child is still running;
                    # escape undecodable bytes instead so every line is still logged.
                    log.info("%s", line.decode(errors="backslashreplace").rstrip())

        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)
def execute_interactive(cmd: list[str], **kwargs) -> None:
    """
    Run the new command as a subprocess.

    Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
    state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed.

    The child's stdin/stdout/stderr are attached to a freshly opened pseudo-terminal. Until the child
    exits, bytes typed on this process's stdin are forwarded to the child, and the child's output is
    copied to this process's stdout.

    :param cmd: command and arguments to run
    :param kwargs: extra keyword arguments passed directly to ``subprocess.Popen``
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    # Upper bound (seconds) on each select() wait, so the loop notices the child's exit
    # even when nothing becomes readable afterwards.
    poll_interval = 0.1

    old_tty = termios.tcgetattr(sys.stdin)
    primary_fd = secondary_fd = None
    try:
        # Raw mode is entered inside the ``try`` so the saved settings are restored even if
        # opening the pseudo-terminal or starting the process fails.
        tty.setraw(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        primary_fd, secondary_fd = pty.openpty()
        with subprocess.Popen(
            cmd,
            stdin=secondary_fd,
            stdout=secondary_fd,
            stderr=secondary_fd,
            universal_newlines=True,
            **kwargs,
        ) as proc:
            while proc.poll() is None:
                # This process keeps the secondary end open, so after the child exits the primary
                # end may never become readable again; an unbounded select() would then block
                # until the user pressed a key.
                readable_fds, _, _ = select.select([sys.stdin, primary_fd], [], [], poll_interval)
                if sys.stdin in readable_fds:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(primary_fd, input_data)
                if primary_fd in readable_fds:
                    output_data = os.read(primary_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)

            # Flush whatever the child wrote right before exiting, which the loop above may
            # not have picked up yet.
            while primary_fd in select.select([primary_fd], [], [], 0)[0]:
                output_data = os.read(primary_fd, 10240)
                if not output_data:
                    break
                os.write(sys.stdout.fileno(), output_data)
    finally:
        try:
            # restore tty settings back
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        finally:
            # The pseudo-terminal descriptors are owned by this function; close them so
            # repeated calls do not leak file descriptors.
            for fd in (primary_fd, secondary_fd):
                if fd is not None:
                    os.close(fd)
def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
    """
    Kill the given processes, provided they are descendants of the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive. PIDs that are not (recursive) children of the
    current process are ignored.

    :param pids_to_kill: List of PID to be killed.
    :param timeout: The time to wait before sending the SIGKILL signal.
    """
    this_process = psutil.Process(os.getpid())
    # Set lookup: membership is tested once per descendant process.
    wanted_pids = set(pids_to_kill)

    def _matching_children():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            x for x in this_process.children(recursive=True) if x.is_running() and x.pid in wanted_pids
        ]

    # First try SIGTERM
    child_processes = _matching_children()
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # Exited between the scan and the signal; nothing left to terminate.
            pass

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    try:
        psutil.wait_procs(
            child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
        )
    except psutil.TimeoutExpired:
        log.debug("Ran out of time while waiting for processes to exit")

    # Then SIGKILL
    child_processes = _matching_children()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
                child.wait()
            except psutil.NoSuchProcess:
                # Exited (and was reaped) between the scan and the signal.
                pass
@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
    """
    Set environment variables in context.

    After leaving the context, it restores its original state. Variables that existed before
    get their previous value back, and variables that did not exist are removed again.

    :param new_env_variables: Environment variables to set
    """
    current_env_state = {key: os.environ.get(key) for key in new_env_variables}
    try:
        # Applied inside the ``try`` so that a partially applied update (e.g. a non-str
        # value rejected by ``os.environ`` midway through) is still rolled back.
        os.environ.update(new_env_variables)
        yield
    finally:
        for key, old_value in current_env_state.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Raise if the process recorded in ``pid_file`` is alive, and remove a stale pidfile.

    :param pid_file: path to the pidfile
    :param process_name: name used in exception if process is up and
        running
    :raises AirflowException: if the recorded PID belongs to a running process
    """
    lock = PIDLockFile(path=pid_file)
    # Nothing to check when the pidfile does not exist.
    if not lock.is_locked():
        return

    recorded_pid = lock.read_pid()
    # The pidfile exists but holds no readable PID: leave it untouched.
    if recorded_pid is None:
        return

    try:
        still_running = psutil.Process(recorded_pid).is_running()
    except psutil.NoSuchProcess:
        # The recorded process is gone, so the pidfile is stale.
        lock.break_lock()
        return

    if still_running:
        raise AirflowException(f"The {process_name} is already running under PID {recorded_pid}.")
def set_new_process_group() -> None:
    """
    Move the current process into a new process group of its own, when permitted.

    That makes it easy to kill all sub-processes of this process at the OS level,
    rather than having to iterate the child processes.
    A session leader (PID equal to its session ID) cannot change its process group,
    so in that case the current group is kept.
    """
    is_session_leader = os.getsid(0) == os.getpid()
    if not is_session_leader:
        os.setpgid(0, 0)