Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/hooks/subprocess.py: 0%
41 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import contextlib
20import os
21import signal
22from collections import namedtuple
23from subprocess import PIPE, STDOUT, Popen
24from tempfile import TemporaryDirectory, gettempdir
26from airflow.hooks.base import BaseHook
28SubprocessResult = namedtuple("SubprocessResult", ["exit_code", "output"])
31class SubprocessHook(BaseHook):
32 """Hook for running processes with the ``subprocess`` module."""
34 def __init__(self) -> None:
35 self.sub_process: Popen[bytes] | None = None
36 super().__init__()
38 def run_command(
39 self,
40 command: list[str],
41 env: dict[str, str] | None = None,
42 output_encoding: str = "utf-8",
43 cwd: str | None = None,
44 ) -> SubprocessResult:
45 """
46 Execute the command.
48 If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
49 If ``env`` is not supplied, ``os.environ`` is passed
51 :param command: the command to run
52 :param env: Optional dict containing environment variables to be made available to the shell
53 environment in which ``command`` will be executed. If omitted, ``os.environ`` will be used.
54 Note, that in case you have Sentry configured, original variables from the environment
55 will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
56 :doc:`/administration-and-deployment/logging-monitoring/errors` for details.
57 :param output_encoding: encoding to use for decoding stdout
58 :param cwd: Working directory to run the command in.
59 If None (default), the command is run in a temporary directory.
60 :return: :class:`namedtuple` containing ``exit_code`` and ``output``, the last line from stderr
61 or stdout
62 """
63 self.log.info("Tmp dir root location: %s", gettempdir())
64 with contextlib.ExitStack() as stack:
65 if cwd is None:
66 cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))
68 def pre_exec():
69 # Restore default signal disposition and invoke setsid
70 for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
71 if hasattr(signal, sig):
72 signal.signal(getattr(signal, sig), signal.SIG_DFL)
73 os.setsid()
75 self.log.info("Running command: %s", command)
77 self.sub_process = Popen(
78 command,
79 stdout=PIPE,
80 stderr=STDOUT,
81 cwd=cwd,
82 env=env if env or env == {} else os.environ,
83 preexec_fn=pre_exec,
84 )
86 self.log.info("Output:")
87 line = ""
88 if self.sub_process is None:
89 raise RuntimeError("The subprocess should be created here and is None!")
90 if self.sub_process.stdout is not None:
91 for raw_line in iter(self.sub_process.stdout.readline, b""):
92 line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
93 self.log.info("%s", line)
95 self.sub_process.wait()
97 self.log.info("Command exited with return code %s", self.sub_process.returncode)
98 return_code: int = self.sub_process.returncode
100 return SubprocessResult(exit_code=return_code, output=line)
102 def send_sigterm(self):
103 """Sends SIGTERM signal to ``self.sub_process`` if one exists."""
104 self.log.info("Sending SIGTERM signal to process group")
105 if self.sub_process and hasattr(self.sub_process, "pid"):
106 os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)