1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Utilities for creating a virtual environment."""
19
20from __future__ import annotations
21
22import logging
23import os
24import shlex
25import shutil
26import subprocess
27import warnings
28from pathlib import Path
29
30import jinja2
31from jinja2 import select_autoescape
32
33from airflow.configuration import conf
34
35
36def _is_uv_installed() -> bool:
37 """
38 Verify whether the uv tool is installed by checking if it's included in the system PATH or installed as a package.
39
40 :return: True if it is. Whichever way of checking it works, is fine.
41 """
42 return bool(shutil.which("uv"))
43
44
45def _use_uv() -> bool:
46 """
47 Check if the uv tool should be used.
48
49 :return: True if uv should be used.
50 """
51 venv_install_method = conf.get("standard", "venv_install_method", fallback="auto").lower()
52 if venv_install_method == "auto":
53 return _is_uv_installed()
54 if venv_install_method == "uv":
55 return True
56 return False
57
58
59def _generate_uv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]:
60 """Build the command to install the venv via UV."""
61 if python_bin == "python" or python_bin == "python3":
62 python_interpreter_exists = bool(shutil.which(python_bin))
63 if not python_interpreter_exists:
64 warnings.warn(
65 f"uv trying to use `{python_bin}` as the python interpreter. it could lead to errors if the python interpreter not found in PATH. "
66 f"please specify python_version in operator.",
67 UserWarning,
68 stacklevel=3,
69 )
70 cmd = ["uv", "venv", "--allow-existing", "--seed", "--python", python_bin]
71 if system_site_packages:
72 cmd.append("--system-site-packages")
73 cmd.append(tmp_dir)
74 return cmd
75
76
77def _generate_venv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]:
78 """We are using venv command instead of venv module to allow creation of venv for different python versions."""
79 cmd = [python_bin, "-m", "venv", tmp_dir]
80 if system_site_packages:
81 cmd.append("--system-site-packages")
82 return cmd
83
84
85def _generate_uv_install_cmd_from_file(
86 tmp_dir: str, requirements_file_path: str, pip_install_options: list[str]
87) -> list[str]:
88 return [
89 "uv",
90 "pip",
91 "install",
92 "--python",
93 f"{tmp_dir}/bin/python",
94 *pip_install_options,
95 "-r",
96 requirements_file_path,
97 ]
98
99
100def _generate_pip_install_cmd_from_file(
101 tmp_dir: str, requirements_file_path: str, pip_install_options: list[str]
102) -> list[str]:
103 return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r", requirements_file_path]
104
105
106def _generate_uv_install_cmd_from_list(
107 tmp_dir: str, requirements: list[str], pip_install_options: list[str]
108) -> list[str]:
109 return ["uv", "pip", "install", "--python", f"{tmp_dir}/bin/python", *pip_install_options, *requirements]
110
111
112def _generate_pip_install_cmd_from_list(
113 tmp_dir: str, requirements: list[str], pip_install_options: list[str]
114) -> list[str]:
115 return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, *requirements]
116
117
118def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None:
119 if index_urls:
120 pip_conf_options = f"index-url = {index_urls[0]}"
121 if len(index_urls) > 1:
122 pip_conf_options += f"\nextra-index-url = {' '.join(x for x in index_urls[1:])}"
123 else:
124 pip_conf_options = "no-index = true"
125 conf_file.write_text(f"[global]\n{pip_conf_options}")
126
127
128def _index_urls_to_uv_env_vars(index_urls: list[str] | None = None) -> dict[str, str]:
129 uv_index_env_vars = {}
130 if index_urls:
131 uv_index_env_vars = {"UV_DEFAULT_INDEX": index_urls[0]}
132 if len(index_urls) > 1:
133 uv_index_env_vars["UV_INDEX"] = " ".join(x for x in index_urls[1:])
134 return uv_index_env_vars
135
136
137def _execute_in_subprocess(cmd: list[str], cwd: str | None = None, env: dict[str, str] | None = None) -> None:
138 """
139 Execute a process and stream output to logger.
140
141 :param cmd: command and arguments to run
142 :param cwd: Current working directory passed to the Popen constructor
143 :param env: Additional environment variables to set for the subprocess.
144 """
145 log = logging.getLogger(__name__)
146
147 log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
148 with subprocess.Popen(
149 cmd,
150 stdout=subprocess.PIPE,
151 stderr=subprocess.STDOUT,
152 bufsize=0,
153 close_fds=False,
154 cwd=cwd,
155 env=env,
156 ) as proc:
157 log.info("Output:")
158 if proc.stdout:
159 with proc.stdout:
160 for line in iter(proc.stdout.readline, b""):
161 log.info("%s", line.decode().rstrip())
162
163 exit_code = proc.wait()
164 if exit_code != 0:
165 raise subprocess.CalledProcessError(exit_code, cmd)
166
167
168def prepare_virtualenv(
169 venv_directory: str,
170 python_bin: str,
171 system_site_packages: bool,
172 requirements: list[str] | None = None,
173 requirements_file_path: str | None = None,
174 pip_install_options: list[str] | None = None,
175 index_urls: list[str] | None = None,
176) -> str:
177 """
178 Create a virtual environment and install the additional python packages.
179
180 :param venv_directory: The path for directory where the environment will be created.
181 :param python_bin: Path to the Python executable.
182 :param system_site_packages: Whether to include system_site_packages in your virtualenv.
183 See virtualenv documentation for more information.
184 :param requirements: List of additional python packages.
185 :param requirements_file_path: Path to the ``requirements.txt`` file.
186 :param pip_install_options: a list of pip install options when installing requirements
187 See 'pip install -h' for available options
188 :param index_urls: an optional list of index urls to load Python packages from.
189 If not provided the system pip conf will be used to source packages from.
190 :return: Path to a binary file with Python in a virtual environment.
191 """
192 if pip_install_options is None:
193 pip_install_options = []
194
195 if requirements is not None and requirements_file_path is not None:
196 raise ValueError("Either requirements OR requirements_file_path has to be passed, but not both")
197
198 if index_urls is not None:
199 _generate_pip_conf(Path(venv_directory) / "pip.conf", index_urls)
200
201 if _use_uv():
202 venv_cmd = _generate_uv_cmd(venv_directory, python_bin, system_site_packages)
203 _execute_in_subprocess(venv_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)})
204 else:
205 venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages)
206 _execute_in_subprocess(venv_cmd)
207
208 pip_cmd = None
209 if requirements is not None and len(requirements) != 0:
210 if _use_uv():
211 pip_cmd = _generate_uv_install_cmd_from_list(venv_directory, requirements, pip_install_options)
212 else:
213 pip_cmd = _generate_pip_install_cmd_from_list(venv_directory, requirements, pip_install_options)
214 if requirements_file_path is not None and requirements_file_path:
215 if _use_uv():
216 pip_cmd = _generate_uv_install_cmd_from_file(
217 venv_directory, requirements_file_path, pip_install_options
218 )
219 else:
220 pip_cmd = _generate_pip_install_cmd_from_file(
221 venv_directory, requirements_file_path, pip_install_options
222 )
223
224 if pip_cmd:
225 _execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)})
226
227 return f"{venv_directory}/bin/python"
228
229
230def write_python_script(
231 jinja_context: dict,
232 filename: str,
233 render_template_as_native_obj: bool = False,
234):
235 """
236 Render the python script to a file to execute in the virtual environment.
237
238 :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the
239 template file.
240 :param filename: The name of the file to dump the rendered script to.
241 :param render_template_as_native_obj: If ``True``, rendered Jinja template would be converted
242 to a native Python object
243 """
244 template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__))
245 template_env: jinja2.Environment
246 if render_template_as_native_obj:
247 template_env = jinja2.nativetypes.NativeEnvironment(
248 loader=template_loader, undefined=jinja2.StrictUndefined
249 )
250 else:
251 template_env = jinja2.Environment(
252 loader=template_loader,
253 undefined=jinja2.StrictUndefined,
254 autoescape=select_autoescape(["html", "xml"]),
255 )
256 template = template_env.get_template("python_virtualenv_script.jinja2")
257 template.stream(**jinja_context).dump(filename)