Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers/standard/utils/python_virtualenv.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Utilities for creating a virtual environment.""" 

19 

20from __future__ import annotations 

21 

22import logging 

23import os 

24import shlex 

25import shutil 

26import subprocess 

27import warnings 

28from pathlib import Path 

29 

30import jinja2 

31from jinja2 import select_autoescape 

32 

33from airflow.configuration import conf 

34 

35 

36def _is_uv_installed() -> bool: 

37 """ 

38 Verify whether the uv tool is installed by checking if it's included in the system PATH or installed as a package. 

39 

40 :return: True if it is. Whichever way of checking it works, is fine. 

41 """ 

42 return bool(shutil.which("uv")) 

43 

44 

45def _use_uv() -> bool: 

46 """ 

47 Check if the uv tool should be used. 

48 

49 :return: True if uv should be used. 

50 """ 

51 venv_install_method = conf.get("standard", "venv_install_method", fallback="auto").lower() 

52 if venv_install_method == "auto": 

53 return _is_uv_installed() 

54 if venv_install_method == "uv": 

55 return True 

56 return False 

57 

58 

59def _generate_uv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: 

60 """Build the command to install the venv via UV.""" 

61 if python_bin == "python" or python_bin == "python3": 

62 python_interpreter_exists = bool(shutil.which(python_bin)) 

63 if not python_interpreter_exists: 

64 warnings.warn( 

65 f"uv trying to use `{python_bin}` as the python interpreter. it could lead to errors if the python interpreter not found in PATH. " 

66 f"please specify python_version in operator.", 

67 UserWarning, 

68 stacklevel=3, 

69 ) 

70 cmd = ["uv", "venv", "--allow-existing", "--seed", "--python", python_bin] 

71 if system_site_packages: 

72 cmd.append("--system-site-packages") 

73 cmd.append(tmp_dir) 

74 return cmd 

75 

76 

77def _generate_venv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> list[str]: 

78 """We are using venv command instead of venv module to allow creation of venv for different python versions.""" 

79 cmd = [python_bin, "-m", "venv", tmp_dir] 

80 if system_site_packages: 

81 cmd.append("--system-site-packages") 

82 return cmd 

83 

84 

85def _generate_uv_install_cmd_from_file( 

86 tmp_dir: str, requirements_file_path: str, pip_install_options: list[str] 

87) -> list[str]: 

88 return [ 

89 "uv", 

90 "pip", 

91 "install", 

92 "--python", 

93 f"{tmp_dir}/bin/python", 

94 *pip_install_options, 

95 "-r", 

96 requirements_file_path, 

97 ] 

98 

99 

100def _generate_pip_install_cmd_from_file( 

101 tmp_dir: str, requirements_file_path: str, pip_install_options: list[str] 

102) -> list[str]: 

103 return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r", requirements_file_path] 

104 

105 

106def _generate_uv_install_cmd_from_list( 

107 tmp_dir: str, requirements: list[str], pip_install_options: list[str] 

108) -> list[str]: 

109 return ["uv", "pip", "install", "--python", f"{tmp_dir}/bin/python", *pip_install_options, *requirements] 

110 

111 

112def _generate_pip_install_cmd_from_list( 

113 tmp_dir: str, requirements: list[str], pip_install_options: list[str] 

114) -> list[str]: 

115 return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, *requirements] 

116 

117 

118def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None: 

119 if index_urls: 

120 pip_conf_options = f"index-url = {index_urls[0]}" 

121 if len(index_urls) > 1: 

122 pip_conf_options += f"\nextra-index-url = {' '.join(x for x in index_urls[1:])}" 

123 else: 

124 pip_conf_options = "no-index = true" 

125 conf_file.write_text(f"[global]\n{pip_conf_options}") 

126 

127 

128def _index_urls_to_uv_env_vars(index_urls: list[str] | None = None) -> dict[str, str]: 

129 uv_index_env_vars = {} 

130 if index_urls: 

131 uv_index_env_vars = {"UV_DEFAULT_INDEX": index_urls[0]} 

132 if len(index_urls) > 1: 

133 uv_index_env_vars["UV_INDEX"] = " ".join(x for x in index_urls[1:]) 

134 return uv_index_env_vars 

135 

136 

137def _execute_in_subprocess(cmd: list[str], cwd: str | None = None, env: dict[str, str] | None = None) -> None: 

138 """ 

139 Execute a process and stream output to logger. 

140 

141 :param cmd: command and arguments to run 

142 :param cwd: Current working directory passed to the Popen constructor 

143 :param env: Additional environment variables to set for the subprocess. 

144 """ 

145 log = logging.getLogger(__name__) 

146 

147 log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd)) 

148 with subprocess.Popen( 

149 cmd, 

150 stdout=subprocess.PIPE, 

151 stderr=subprocess.STDOUT, 

152 bufsize=0, 

153 close_fds=False, 

154 cwd=cwd, 

155 env=env, 

156 ) as proc: 

157 log.info("Output:") 

158 if proc.stdout: 

159 with proc.stdout: 

160 for line in iter(proc.stdout.readline, b""): 

161 log.info("%s", line.decode().rstrip()) 

162 

163 exit_code = proc.wait() 

164 if exit_code != 0: 

165 raise subprocess.CalledProcessError(exit_code, cmd) 

166 

167 

168def prepare_virtualenv( 

169 venv_directory: str, 

170 python_bin: str, 

171 system_site_packages: bool, 

172 requirements: list[str] | None = None, 

173 requirements_file_path: str | None = None, 

174 pip_install_options: list[str] | None = None, 

175 index_urls: list[str] | None = None, 

176) -> str: 

177 """ 

178 Create a virtual environment and install the additional python packages. 

179 

180 :param venv_directory: The path for directory where the environment will be created. 

181 :param python_bin: Path to the Python executable. 

182 :param system_site_packages: Whether to include system_site_packages in your virtualenv. 

183 See virtualenv documentation for more information. 

184 :param requirements: List of additional python packages. 

185 :param requirements_file_path: Path to the ``requirements.txt`` file. 

186 :param pip_install_options: a list of pip install options when installing requirements 

187 See 'pip install -h' for available options 

188 :param index_urls: an optional list of index urls to load Python packages from. 

189 If not provided the system pip conf will be used to source packages from. 

190 :return: Path to a binary file with Python in a virtual environment. 

191 """ 

192 if pip_install_options is None: 

193 pip_install_options = [] 

194 

195 if requirements is not None and requirements_file_path is not None: 

196 raise ValueError("Either requirements OR requirements_file_path has to be passed, but not both") 

197 

198 if index_urls is not None: 

199 _generate_pip_conf(Path(venv_directory) / "pip.conf", index_urls) 

200 

201 if _use_uv(): 

202 venv_cmd = _generate_uv_cmd(venv_directory, python_bin, system_site_packages) 

203 _execute_in_subprocess(venv_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)}) 

204 else: 

205 venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages) 

206 _execute_in_subprocess(venv_cmd) 

207 

208 pip_cmd = None 

209 if requirements is not None and len(requirements) != 0: 

210 if _use_uv(): 

211 pip_cmd = _generate_uv_install_cmd_from_list(venv_directory, requirements, pip_install_options) 

212 else: 

213 pip_cmd = _generate_pip_install_cmd_from_list(venv_directory, requirements, pip_install_options) 

214 if requirements_file_path is not None and requirements_file_path: 

215 if _use_uv(): 

216 pip_cmd = _generate_uv_install_cmd_from_file( 

217 venv_directory, requirements_file_path, pip_install_options 

218 ) 

219 else: 

220 pip_cmd = _generate_pip_install_cmd_from_file( 

221 venv_directory, requirements_file_path, pip_install_options 

222 ) 

223 

224 if pip_cmd: 

225 _execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)}) 

226 

227 return f"{venv_directory}/bin/python" 

228 

229 

230def write_python_script( 

231 jinja_context: dict, 

232 filename: str, 

233 render_template_as_native_obj: bool = False, 

234): 

235 """ 

236 Render the python script to a file to execute in the virtual environment. 

237 

238 :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the 

239 template file. 

240 :param filename: The name of the file to dump the rendered script to. 

241 :param render_template_as_native_obj: If ``True``, rendered Jinja template would be converted 

242 to a native Python object 

243 """ 

244 template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__)) 

245 template_env: jinja2.Environment 

246 if render_template_as_native_obj: 

247 template_env = jinja2.nativetypes.NativeEnvironment( 

248 loader=template_loader, undefined=jinja2.StrictUndefined 

249 ) 

250 else: 

251 template_env = jinja2.Environment( 

252 loader=template_loader, 

253 undefined=jinja2.StrictUndefined, 

254 autoescape=select_autoescape(["html", "xml"]), 

255 ) 

256 template = template_env.get_template("python_virtualenv_script.jinja2") 

257 template.stream(**jinja_context).dump(filename)