Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/providers/standard/operators/python.py: 23%

482 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import inspect 

21import json 

22import logging 

23import os 

24import re 

25import shutil 

26import subprocess 

27import sys 

28import textwrap 

29import types 

30import warnings 

31from abc import ABCMeta, abstractmethod 

32from collections.abc import Callable, Collection, Container, Iterable, Mapping, Sequence 

33from functools import cache 

34from itertools import chain 

35from pathlib import Path 

36from tempfile import TemporaryDirectory 

37from typing import TYPE_CHECKING, Any, NamedTuple, cast 

38 

39import lazy_object_proxy 

40from packaging.requirements import InvalidRequirement, Requirement 

41from packaging.specifiers import InvalidSpecifier 

42from packaging.version import InvalidVersion 

43 

44from airflow.exceptions import ( 

45 AirflowConfigException, 

46 AirflowProviderDeprecationWarning, 

47 DeserializingResultError, 

48) 

49from airflow.models.variable import Variable 

50from airflow.providers.common.compat.sdk import AirflowException, AirflowSkipException, context_merge 

51from airflow.providers.standard.hooks.package_index import PackageIndexHook 

52from airflow.providers.standard.utils.python_virtualenv import ( 

53 _execute_in_subprocess, 

54 prepare_virtualenv, 

55 write_python_script, 

56) 

57from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator 

58from airflow.utils import hashlib_wrapper 

59from airflow.utils.file import get_unique_dag_module_name 

60from airflow.utils.operator_helpers import KeywordParameters 

61 

62if AIRFLOW_V_3_0_PLUS: 

63 from airflow.providers.standard.operators.branch import BaseBranchOperator 

64 from airflow.providers.standard.utils.skipmixin import SkipMixin 

65else: 

66 from airflow.models.skipmixin import SkipMixin 

67 from airflow.operators.branch import BaseBranchOperator # type: ignore[no-redef] 

68 

69 

70log = logging.getLogger(__name__) 

71 

72if TYPE_CHECKING: 

73 from typing import Literal 

74 

75 from pendulum.datetime import DateTime 

76 

77 from airflow.providers.common.compat.sdk import Context 

78 from airflow.sdk.execution_time.callback_runner import ExecutionCallableRunner 

79 from airflow.sdk.execution_time.context import OutletEventAccessorsProtocol 

80 

81 _SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"] 

82 

83 

84@cache 

85def _parse_version_info(text: str) -> tuple[int, int, int, str, int]: 

86 """Parse python version info from a text.""" 

87 parts = text.strip().split(".") 

88 if len(parts) != 5: 

89 msg = f"Invalid Python version info, expected 5 components separated by '.', but got {text!r}." 

90 raise ValueError(msg) 

91 try: 

92 return int(parts[0]), int(parts[1]), int(parts[2]), parts[3], int(parts[4]) 

93 except ValueError: 

94 msg = f"Unable to convert parts {parts} parsed from {text!r} to (int, int, int, str, int)." 

95 raise ValueError(msg) from None 

96 

97 

98class _PythonVersionInfo(NamedTuple): 

99 """Provide the same interface as ``sys.version_info``.""" 

100 

101 major: int 

102 minor: int 

103 micro: int 

104 releaselevel: str 

105 serial: int 

106 

107 @classmethod 

108 def from_executable(cls, executable: str) -> _PythonVersionInfo: 

109 """Parse python version info from an executable.""" 

110 cmd = [executable, "-c", 'import sys; print(".".join(map(str, sys.version_info)))'] 

111 try: 

112 result = subprocess.check_output(cmd, text=True) 

113 except Exception as e: 

114 raise ValueError(f"Error while executing command {cmd}: {e}") 

115 return cls(*_parse_version_info(result.strip())) 

116 
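# --- Illustrative sketch (not part of the original module) ---
# The helpers above parse the 5-component string that the spawned interpreter prints,
# i.e. ".".join(map(str, sys.version_info)), e.g. "3.11.4.final.0".
import sys

assert _parse_version_info("3.11.4.final.0") == (3, 11, 4, "final", 0)

# from_executable() runs the given interpreter and parses its sys.version_info; using
# sys.executable here simply round-trips the current interpreter's version.
current = _PythonVersionInfo.from_executable(sys.executable)
assert (current.major, current.minor) == (sys.version_info.major, sys.version_info.minor)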

117 

118class PythonOperator(BaseOperator): 

119 """ 

120 Executes a Python callable. 

121 

122 .. seealso:: 

123 For more information on how to use this operator, take a look at the guide: 

124 :ref:`howto/operator:PythonOperator` 

125 

126 When running your callable, Airflow will pass a set of keyword arguments that can be used in your 

127 function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.

128 For this to work, you need to define ``**kwargs`` in your function header, or you can directly add the

129 keyword arguments you would like to receive - for example, with the code below your callable will get

130 the value of the ``ti`` context variable.

131 

132 With explicit arguments: 

133 

134 .. code-block:: python 

135 

136 def my_python_callable(ti): 

137 pass 

138 

139 With kwargs: 

140 

141 .. code-block:: python 

142 

143 def my_python_callable(**kwargs): 

144 ti = kwargs["ti"] 

145 

146 

147 :param python_callable: A reference to an object that is callable 

148 :param op_args: a list of positional arguments that will get unpacked when 

149 calling your callable 

150 :param op_kwargs: a dictionary of keyword arguments that will get unpacked 

151 in your function 

152 :param templates_dict: a dictionary where the values are templates that 

153 will get templated by the Airflow engine sometime between 

154 ``__init__`` and ``execute`` takes place and are made available 

155 in your callable's context after the template has been applied. (templated) 

156 :param templates_exts: a list of file extensions to resolve while 

157 processing templated fields, for examples ``['.sql', '.hql']`` 

158 :param show_return_value_in_logs: a bool controlling whether to show the return_value

159 in the logs. Defaults to True, which allows return value log output.

160 It can be set to False to prevent logging of the return value when you return huge data,

161 such as transmitting a large amount of XCom data to the Task API.

162 """ 

163 

164 template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs") 

165 template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} 

166 BLUE = "#ffefeb" 

167 ui_color = BLUE 

168 

169 # Since we won't mutate the arguments, a shallow copy is sufficient;

170 # there are some cases where we can't deepcopy the objects (e.g. protobuf).

171 shallow_copy_attrs: Sequence[str] = ("python_callable", "op_kwargs") 

172 

173 def __init__( 

174 self, 

175 *, 

176 python_callable: Callable, 

177 op_args: Collection[Any] | None = None, 

178 op_kwargs: Mapping[str, Any] | None = None, 

179 templates_dict: dict[str, Any] | None = None, 

180 templates_exts: Sequence[str] | None = None, 

181 show_return_value_in_logs: bool = True, 

182 **kwargs, 

183 ) -> None: 

184 super().__init__(**kwargs) 

185 if not callable(python_callable): 

186 raise AirflowException("`python_callable` param must be callable") 

187 self.python_callable = python_callable 

188 self.op_args = op_args or () 

189 self.op_kwargs = op_kwargs or {} 

190 self.templates_dict = templates_dict 

191 if templates_exts: 

192 self.template_ext = templates_exts 

193 self.show_return_value_in_logs = show_return_value_in_logs 

194 

195 def execute(self, context: Context) -> Any: 

196 context_merge(context, self.op_kwargs, templates_dict=self.templates_dict) 

197 self.op_kwargs = self.determine_kwargs(context) 

198 

199 # This needs to be lazy because subclasses may implement execute_callable 

200 # by running a separate process that can't use the eager result. 

201 def __prepare_execution() -> tuple[ExecutionCallableRunner, OutletEventAccessorsProtocol] | None: 

202 if AIRFLOW_V_3_0_PLUS: 

203 from airflow.sdk.execution_time.callback_runner import create_executable_runner 

204 from airflow.sdk.execution_time.context import context_get_outlet_events 

205 

206 return create_executable_runner, context_get_outlet_events(context) 

207 from airflow.utils.context import context_get_outlet_events # type: ignore 

208 from airflow.utils.operator_helpers import ExecutionCallableRunner # type: ignore 

209 

210 return ExecutionCallableRunner, context_get_outlet_events(context) 

211 

212 self.__prepare_execution = __prepare_execution 

213 

214 return_value = self.execute_callable() 

215 if self.show_return_value_in_logs: 

216 self.log.info("Done. Returned value was: %s", return_value) 

217 else: 

218 self.log.info("Done. Returned value not shown") 

219 

220 return return_value 

221 

222 def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: 

223 return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking() 

224 

225 __prepare_execution: Callable[[], tuple[ExecutionCallableRunner, OutletEventAccessorsProtocol] | None] 

226 

227 def execute_callable(self) -> Any: 

228 """ 

229 Call the python callable with the given arguments. 

230 

231 :return: the return value of the call. 

232 """ 

233 if (execution_preparation := self.__prepare_execution()) is None: 

234 return self.python_callable(*self.op_args, **self.op_kwargs) 

235 create_execution_runner, asset_events = execution_preparation 

236 runner = create_execution_runner(self.python_callable, asset_events, logger=self.log) 

237 return runner.run(*self.op_args, **self.op_kwargs) 

238 

239 
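# --- Illustrative usage sketch (not part of the original module) ---
# A minimal DAG wiring a PythonOperator, based on the docstring above. The DAG import
# path, schedule, and task names are assumptions; adjust for your Airflow version.
import datetime

from airflow import DAG  # assumed import path


def _greet(name, ti=None, **kwargs):
    # "ti" and the other context keys are injected because they appear in the signature.
    print(f"Hello {name} from task {ti.task_id}")
    return name  # pushed to XCom; logged because show_return_value_in_logs=True


with DAG(dag_id="python_operator_example", start_date=datetime.datetime(2024, 1, 1), schedule=None):
    greet = PythonOperator(
        task_id="greet",
        python_callable=_greet,
        op_args=["Airflow"],
        show_return_value_in_logs=True,
    )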

240class BranchPythonOperator(BaseBranchOperator, PythonOperator): 

241 """ 

242 A workflow can "branch" or follow a path after the execution of this task. 

243 

244 It derives the PythonOperator and expects a Python function that returns 

245 a single task_id, a single task_group_id, or a list of task_ids and/or 

246 task_group_ids to follow. The task_id(s) and/or task_group_id(s) returned 

247 should point to a task or task group directly downstream from {self}. All 

248 other "branches" or directly downstream tasks are marked with a state of 

249 ``skipped`` so that these paths can't move forward. The ``skipped`` states 

250 are propagated downstream to allow for the DAG state to fill up and 

251 the DAG run's state to be inferred. 

252 """ 

253 

254 def choose_branch(self, context: Context) -> str | Iterable[str]: 

255 return PythonOperator.execute(self, context) 

256 
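# --- Illustrative usage sketch (not part of the original module) ---
# choose_branch() above delegates to PythonOperator.execute(), so the callable's return
# value selects which downstream task_id(s) to follow. Task names are assumptions.
def _pick_branch(**kwargs):
    # Return a single task_id (or a list of task_ids / task_group_ids) directly downstream.
    return "heavy_path" if kwargs["dag_run"].run_type == "manual" else "light_path"


branch = BranchPythonOperator(task_id="branch", python_callable=_pick_branch)
# branch >> [heavy_path, light_path]  # the branch that is not chosen gets skipped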

257 

258class ShortCircuitOperator(PythonOperator, SkipMixin): 

259 """ 

260 Allows a pipeline to continue based on the result of a ``python_callable``. 

261 

262 The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a 

263 ``python_callable``. If the returned result is False or a falsy value, the pipeline will be 

264 short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting 

265 mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and 

266 an ``XCom`` of the returned result is pushed. 

267 

268 The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for 

269 downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all 

270 downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this 

271 parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for 

272 other subsequent downstream tasks are respected. In this mode, the operator assumes the direct downstream 

273 tasks were purposely meant to be skipped but perhaps not other subsequent tasks. 

274 

275 .. seealso:: 

276 For more information on how to use this operator, take a look at the guide: 

277 :ref:`howto/operator:ShortCircuitOperator` 

278 

279 :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will 

280 be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be 

281 skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected. 

282 """ 

283 

284 inherits_from_skipmixin = True 

285 

286 def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None: 

287 super().__init__(**kwargs) 

288 self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules 

289 

290 def execute(self, context: Context) -> Any: 

291 condition = super().execute(context) 

292 self.log.info("Condition result is %s", condition) 

293 

294 if condition: 

295 self.log.info("Proceeding with downstream tasks...") 

296 return condition 

297 

298 if not self.downstream_task_ids: 

299 self.log.info("No downstream tasks; nothing to do.") 

300 return condition 

301 

302 dag_run = context["dag_run"] 

303 

304 def get_tasks_to_skip(): 

305 if self.ignore_downstream_trigger_rules is True: 

306 tasks = context["task"].get_flat_relatives(upstream=False) 

307 else: 

308 tasks = context["task"].get_direct_relatives(upstream=False) 

309 for t in tasks: 

310 if not t.is_teardown: 

311 yield t 

312 

313 to_skip = get_tasks_to_skip() 

314 

315 # this lets us avoid building an intermediate list unless debug logging is enabled

316 if self.log.getEffectiveLevel() <= logging.DEBUG: 

317 self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) 

318 

319 self.log.info("Skipping downstream tasks") 

320 if AIRFLOW_V_3_0_PLUS: 

321 self.skip( 

322 ti=context["ti"], 

323 tasks=to_skip, 

324 ) 

325 else: 

326 if to_skip: 

327 self.skip( 

328 dag_run=context["dag_run"], 

329 tasks=to_skip, 

330 execution_date=cast("DateTime", dag_run.logical_date), # type: ignore[call-arg] 

331 map_index=context["ti"].map_index, 

332 ) 

333 

334 self.log.info("Done.") 

335 # returns the result of the super execute method as it is instead of returning None 

336 return condition 

337 
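# --- Illustrative usage sketch (not part of the original module) ---
# If the callable returns a falsy value, downstream tasks are skipped according to
# ignore_downstream_trigger_rules; a truthy value lets the pipeline continue and is
# pushed to XCom. The wiring below is an assumption.
import datetime

check = ShortCircuitOperator(
    task_id="only_on_weekdays",
    python_callable=lambda: datetime.date.today().weekday() < 5,
    ignore_downstream_trigger_rules=False,  # respect trigger_rule on indirect downstream tasks
)
# check >> report  # "report" is skipped on weekends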

338 

339def _load_pickle(): 

340 import pickle 

341 

342 return pickle 

343 

344 

345def _load_dill(): 

346 try: 

347 import dill 

348 except ModuleNotFoundError: 

349 log.error("Unable to import `dill` module. Please please make sure that it installed.") 

350 raise 

351 return dill 

352 

353 

354def _load_cloudpickle(): 

355 try: 

356 import cloudpickle 

357 except ModuleNotFoundError: 

358 log.error( 

359 "Unable to import `cloudpickle` module. " 

360 "Please install it with: pip install 'apache-airflow[cloudpickle]'" 

361 ) 

362 raise 

363 return cloudpickle 

364 

365 

366_SERIALIZERS: dict[_SerializerTypeDef, Any] = { 

367 "pickle": lazy_object_proxy.Proxy(_load_pickle), 

368 "dill": lazy_object_proxy.Proxy(_load_dill), 

369 "cloudpickle": lazy_object_proxy.Proxy(_load_cloudpickle), 

370} 

371 
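# --- Illustrative sketch (not part of the original module) ---
# The proxies above defer the import until a serializer is first used, so a missing
# optional dependency (dill/cloudpickle) only fails at call time. For example:
payload = _SERIALIZERS["pickle"].dumps({"answer": 42})  # triggers _load_pickle() lazily
assert _SERIALIZERS["pickle"].loads(payload) == {"answer": 42}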

372 

373class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): 

374 BASE_SERIALIZABLE_CONTEXT_KEYS = { 

375 "ds", 

376 "ds_nodash", 

377 "expanded_ti_count", 

378 "inlets", 

379 "outlets", 

380 "run_id", 

381 "task_instance_key_str", 

382 "test_mode", 

383 "ts", 

384 "ts_nodash", 

385 "ts_nodash_with_tz", 

386 # The following should be removed when Airflow 2 support is dropped. 

387 "next_ds", 

388 "next_ds_nodash", 

389 "prev_ds", 

390 "prev_ds_nodash", 

391 "tomorrow_ds", 

392 "tomorrow_ds_nodash", 

393 "yesterday_ds", 

394 "yesterday_ds_nodash", 

395 } 

396 if AIRFLOW_V_3_0_PLUS: 

397 BASE_SERIALIZABLE_CONTEXT_KEYS.add("task_reschedule_count") 

398 

399 PENDULUM_SERIALIZABLE_CONTEXT_KEYS = { 

400 "data_interval_end", 

401 "data_interval_start", 

402 "logical_date", 

403 "prev_data_interval_end_success", 

404 "prev_data_interval_start_success", 

405 "prev_start_date_success", 

406 "prev_end_date_success", 

407 # The following should be removed when Airflow 2 support is dropped. 

408 "execution_date", 

409 "next_execution_date", 

410 "prev_execution_date", 

411 "prev_execution_date_success", 

412 } 

413 

414 AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = { 

415 "macros", 

416 "conf", 

417 "dag", 

418 "dag_run", 

419 "task", 

420 "params", 

421 "triggering_asset_events", 

422 # The following should be removed when Airflow 2 support is dropped. 

423 "triggering_dataset_events", 

424 } 

425 

426 def __init__( 

427 self, 

428 *, 

429 python_callable: Callable, 

430 serializer: _SerializerTypeDef | None = None, 

431 op_args: Collection[Any] | None = None, 

432 op_kwargs: Mapping[str, Any] | None = None, 

433 string_args: Iterable[str] | None = None, 

434 templates_dict: dict | None = None, 

435 templates_exts: list[str] | None = None, 

436 expect_airflow: bool = True, 

437 skip_on_exit_code: int | Container[int] | None = None, 

438 env_vars: dict[str, str] | None = None, 

439 inherit_env: bool = True, 

440 **kwargs, 

441 ): 

442 if ( 

443 not isinstance(python_callable, types.FunctionType) 

444 or isinstance(python_callable, types.LambdaType) 

445 and python_callable.__name__ == "<lambda>" 

446 ): 

447 raise ValueError(f"{type(self).__name__} only supports functions for python_callable arg") 

448 if inspect.isgeneratorfunction(python_callable): 

449 raise ValueError(f"{type(self).__name__} does not support using 'yield' in python_callable") 

450 super().__init__( 

451 python_callable=python_callable, 

452 op_args=op_args, 

453 op_kwargs=op_kwargs, 

454 templates_dict=templates_dict, 

455 templates_exts=templates_exts, 

456 **kwargs, 

457 ) 

458 self.string_args = string_args or [] 

459 

460 serializer = serializer or "pickle" 

461 if serializer not in _SERIALIZERS: 

462 msg = ( 

463 f"Unsupported serializer {serializer!r}. Expected one of {', '.join(map(repr, _SERIALIZERS))}" 

464 ) 

465 raise AirflowException(msg) 

466 

467 self.pickling_library = _SERIALIZERS[serializer] 

468 self.serializer: _SerializerTypeDef = serializer 

469 

470 self.expect_airflow = expect_airflow 

471 self.skip_on_exit_code = ( 

472 skip_on_exit_code 

473 if isinstance(skip_on_exit_code, Container) 

474 else [skip_on_exit_code] 

475 if skip_on_exit_code is not None 

476 else [] 

477 ) 

478 self.env_vars = env_vars 

479 self.inherit_env = inherit_env 

480 

481 @abstractmethod 

482 def _iter_serializable_context_keys(self): 

483 pass 

484 

485 def execute(self, context: Context) -> Any: 

486 serializable_keys = set(self._iter_serializable_context_keys()) 

487 new = {k: v for k, v in context.items() if k in serializable_keys} 

488 serializable_context = cast("Context", new) 

489 # Store bundle_path for subprocess execution 

490 self._bundle_path = self._get_bundle_path_from_context(context) 

491 return super().execute(context=serializable_context) 

492 

493 def _get_bundle_path_from_context(self, context: Context) -> str | None: 

494 """ 

495 Extract bundle_path from the task instance's bundle_instance. 

496 

497 :param context: The task execution context 

498 :return: Path to the bundle root directory, or None if not in a bundle 

499 """ 

500 if not AIRFLOW_V_3_0_PLUS: 

501 return None 

502 

503 # In Airflow 3.x, the RuntimeTaskInstance has a bundle_instance attribute 

504 # that contains the bundle information including its path 

505 ti = context["ti"] 

506 if bundle_instance := getattr(ti, "bundle_instance", None): 

507 return bundle_instance.path 

508 

509 return None 

510 

511 def get_python_source(self): 

512 """Return the source of self.python_callable.""" 

513 return textwrap.dedent(inspect.getsource(self.python_callable)) 

514 

515 def _write_args(self, file: Path): 

516 def resolve_proxies(obj): 

517 """Recursively replaces lazy_object_proxy.Proxy instances with their resolved values.""" 

518 if isinstance(obj, lazy_object_proxy.Proxy): 

519 return obj.__wrapped__ # force evaluation 

520 if isinstance(obj, dict): 

521 return {k: resolve_proxies(v) for k, v in obj.items()} 

522 if isinstance(obj, list): 

523 return [resolve_proxies(v) for v in obj] 

524 return obj 

525 

526 if self.op_args or self.op_kwargs: 

527 self.log.info("Use %r as serializer.", self.serializer) 

528 file.write_bytes( 

529 self.pickling_library.dumps({"args": self.op_args, "kwargs": resolve_proxies(self.op_kwargs)}) 

530 ) 

531 

532 def _write_string_args(self, file: Path): 

533 file.write_text("\n".join(map(str, self.string_args))) 

534 

535 def _read_result(self, path: Path): 

536 if path.stat().st_size == 0: 

537 return None 

538 try: 

539 return self.pickling_library.loads(path.read_bytes()) 

540 except ValueError as value_error: 

541 raise DeserializingResultError() from value_error 

542 

543 def __deepcopy__(self, memo): 

544 # module objects can't be copied _at all_

545 memo[id(self.pickling_library)] = self.pickling_library 

546 return super().__deepcopy__(memo) 

547 

548 def _execute_python_callable_in_subprocess(self, python_path: Path): 

549 with TemporaryDirectory(prefix="venv-call") as tmp: 

550 tmp_dir = Path(tmp) 

551 op_kwargs: dict[str, Any] = dict(self.op_kwargs) 

552 if self.templates_dict: 

553 op_kwargs["templates_dict"] = self.templates_dict 

554 input_path = tmp_dir / "script.in" 

555 output_path = tmp_dir / "script.out" 

556 string_args_path = tmp_dir / "string_args.txt" 

557 script_path = tmp_dir / "script.py" 

558 termination_log_path = tmp_dir / "termination.log" 

559 airflow_context_path = tmp_dir / "airflow_context.json" 

560 

561 self._write_args(input_path) 

562 self._write_string_args(string_args_path) 

563 

564 jinja_context = { 

565 "op_args": self.op_args, 

566 "op_kwargs": op_kwargs, 

567 "expect_airflow": self.expect_airflow, 

568 "pickling_library": self.serializer, 

569 "python_callable": self.python_callable.__name__, 

570 "python_callable_source": self.get_python_source(), 

571 } 

572 

573 if inspect.getfile(self.python_callable) == self.dag.fileloc: 

574 jinja_context["modified_dag_module_name"] = get_unique_dag_module_name(self.dag.fileloc) 

575 

576 write_python_script( 

577 jinja_context=jinja_context, 

578 filename=os.fspath(script_path), 

579 render_template_as_native_obj=self.dag.render_template_as_native_obj, 

580 ) 

581 

582 env_vars = dict(os.environ) if self.inherit_env else {} 

583 if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"): 

584 env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd 

585 if self.env_vars: 

586 env_vars.update(self.env_vars) 

587 

588 # Add bundle_path to PYTHONPATH for subprocess to import Dag bundle modules 

589 if self._bundle_path: 

590 bundle_path = self._bundle_path 

591 existing_pythonpath = env_vars.get("PYTHONPATH", "") 

592 if existing_pythonpath: 

593 # Append bundle_path after existing PYTHONPATH 

594 env_vars["PYTHONPATH"] = f"{existing_pythonpath}{os.pathsep}{bundle_path}" 

595 else: 

596 env_vars["PYTHONPATH"] = bundle_path 

597 

598 try: 

599 cmd: list[str] = [ 

600 os.fspath(python_path), 

601 os.fspath(script_path), 

602 os.fspath(input_path), 

603 os.fspath(output_path), 

604 os.fspath(string_args_path), 

605 os.fspath(termination_log_path), 

606 os.fspath(airflow_context_path), 

607 ] 

608 _execute_in_subprocess( 

609 cmd=cmd, 

610 env=env_vars, 

611 ) 

612 except subprocess.CalledProcessError as e: 

613 if e.returncode in self.skip_on_exit_code: 

614 raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.") 

615 if termination_log_path.exists() and termination_log_path.stat().st_size > 0: 

616 error_msg = f"Process returned non-zero exit status {e.returncode}.\n" 

617 with open(termination_log_path) as file: 

618 error_msg += file.read() 

619 raise AirflowException(error_msg) from None 

620 raise 

621 

622 if 0 in self.skip_on_exit_code: 

623 raise AirflowSkipException("Process exited with code 0. Skipping.") 

624 

625 return self._read_result(output_path) 

626 

627 def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: 

628 keyword_params = KeywordParameters.determine(self.python_callable, self.op_args, context) 

629 if AIRFLOW_V_3_0_PLUS: 

630 return keyword_params.unpacking() 

631 return keyword_params.serializing() # type: ignore[attr-defined] 

632 

633 

634class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): 

635 """ 

636 Run a function in a virtualenv that is created and destroyed automatically. 

637 

638 The function (with certain caveats) must be defined using def, and not be

639 part of a class. All imports must happen inside the function 

640 and no variables outside the scope may be referenced. A global scope 

641 variable named virtualenv_string_args will be available (populated by 

642 string_args). In addition, one can pass stuff through op_args and op_kwargs, and one 

643 can use a return value. 

644 Note that if your virtualenv runs in a different Python major version than Airflow, 

645 you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to 

646 Airflow through plugins. You can use string_args though. 

647 

648 .. seealso:: 

649 For more information on how to use this operator, take a look at the guide: 

650 :ref:`howto/operator:PythonVirtualenvOperator` 

651 

652 :param python_callable: A python function with no references to outside variables, 

653 defined with def, which will be run in a virtual environment. 

654 :param requirements: Either a list of requirement strings, or a (templated) 

655 "requirements file" as specified by pip. 

656 :param python_version: The Python version to run the virtual environment with. Note that 

657 both 2 and 2.7 are acceptable forms. 

658 :param serializer: Which serializer use to serialize the args and result. It can be one of the following: 

659 

660 - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library. 

661 - ``"cloudpickle"``: Use cloudpickle for serialize more complex types, 

662 this requires to include cloudpickle in your requirements. 

663 - ``"dill"``: Use dill for serialize more complex types, 

664 this requires to include dill in your requirements. 

665 :param system_site_packages: Whether to include 

666 system_site_packages in your virtual environment. 

667 See virtualenv documentation for more information. 

668 :param pip_install_options: a list of pip install options when installing requirements 

669 See 'pip install -h' for available options 

670 :param op_args: A list of positional arguments to pass to python_callable. 

671 :param op_kwargs: A dict of keyword arguments to pass to python_callable. 

672 :param string_args: Strings that are present in the global var virtualenv_string_args, 

673 available to python_callable at runtime as a list[str]. Note that args are split 

674 by newline. 

675 :param templates_dict: a dictionary where the values are templates that 

676 will get templated by the Airflow engine sometime between 

677 ``__init__`` and ``execute`` takes place and are made available 

678 in your callable's context after the template has been applied 

679 :param templates_exts: a list of file extensions to resolve while 

680 processing templated fields, for examples ``['.sql', '.hql']`` 

681 :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator 

682 will raise a warning if Airflow is not installed, and it will attempt to load Airflow

683 macros when starting. 

684 :param skip_on_exit_code: If python_callable exits with this exit code, leave the task 

685 in ``skipped`` state (default: None). If set to ``None``, any non-zero 

686 exit code will be treated as a failure. 

687 :param index_urls: an optional list of index urls to load Python packages from. 

688 If not provided, the system pip configuration will be used to source packages.

689 :param index_urls_from_connection_ids: An optional list of ``PackageIndex`` connection IDs. 

690 Will be appended to ``index_urls``. 

691 :param venv_cache_path: Optional path to the virtual environment parent folder in which the 

692 virtual environment will be cached; a sub-folder venv-{hash} is created, where {hash} is replaced

693 with a checksum of requirements. If not provided, the virtual environment will be created and deleted

694 in a temp folder for every execution. 

695 :param env_vars: A dictionary containing additional environment variables to set for the virtual 

696 environment when it is executed. 

697 :param inherit_env: Whether to inherit the current environment variables when executing the virtual 

698 environment. If set to ``True``, the virtual environment will inherit the environment variables 

699 of the parent process (``os.environ``). If set to ``False``, the virtual environment will be 

700 executed with a clean environment. 

701 """ 

702 

703 template_fields: Sequence[str] = tuple( 

704 {"requirements", "index_urls", "index_urls_from_connection_ids", "venv_cache_path"}.union( 

705 PythonOperator.template_fields 

706 ) 

707 ) 

708 template_ext: Sequence[str] = (".txt",) 

709 

710 def __init__( 

711 self, 

712 *, 

713 python_callable: Callable, 

714 requirements: None | Iterable[str] | str = None, 

715 python_version: str | None = None, 

716 serializer: _SerializerTypeDef | None = None, 

717 system_site_packages: bool = True, 

718 pip_install_options: list[str] | None = None, 

719 op_args: Collection[Any] | None = None, 

720 op_kwargs: Mapping[str, Any] | None = None, 

721 string_args: Iterable[str] | None = None, 

722 templates_dict: dict | None = None, 

723 templates_exts: list[str] | None = None, 

724 expect_airflow: bool = True, 

725 skip_on_exit_code: int | Container[int] | None = None, 

726 index_urls: None | Collection[str] | str = None, 

727 index_urls_from_connection_ids: None | Collection[str] | str = None, 

728 venv_cache_path: None | os.PathLike[str] = None, 

729 env_vars: dict[str, str] | None = None, 

730 inherit_env: bool = True, 

731 **kwargs, 

732 ): 

733 if ( 

734 python_version 

735 and str(python_version)[0] != str(sys.version_info.major) 

736 and (op_args or op_kwargs) 

737 ): 

738 raise AirflowException( 

739 "Passing op_args or op_kwargs is not supported across different Python " 

740 "major versions for PythonVirtualenvOperator. Please use string_args." 

741 f"Sys version: {sys.version_info}. Virtual environment version: {python_version}" 

742 ) 

743 if python_version is not None and not isinstance(python_version, str): 

744 raise AirflowException( 

745 "Passing non-string types (e.g. int or float) as python_version not supported" 

746 ) 

747 if not requirements: 

748 self.requirements: list[str] = [] 

749 elif isinstance(requirements, str): 

750 self.requirements = [requirements] 

751 else: 

752 self.requirements = list(requirements) 

753 self.python_version = python_version 

754 self.system_site_packages = system_site_packages 

755 self.pip_install_options = pip_install_options 

756 if isinstance(index_urls, str): 

757 self.index_urls: list[str] | None = [index_urls] 

758 elif isinstance(index_urls, Collection): 

759 self.index_urls = list(index_urls) 

760 else: 

761 self.index_urls = None 

762 if isinstance(index_urls_from_connection_ids, str): 

763 self.index_urls_from_connection_ids: list[str] | None = [index_urls_from_connection_ids] 

764 elif isinstance(index_urls_from_connection_ids, Collection): 

765 self.index_urls_from_connection_ids = list(index_urls_from_connection_ids) 

766 else: 

767 self.index_urls_from_connection_ids = None 

768 self.venv_cache_path = venv_cache_path 

769 super().__init__( 

770 python_callable=python_callable, 

771 serializer=serializer, 

772 op_args=op_args, 

773 op_kwargs=op_kwargs, 

774 string_args=string_args, 

775 templates_dict=templates_dict, 

776 templates_exts=templates_exts, 

777 expect_airflow=expect_airflow, 

778 skip_on_exit_code=skip_on_exit_code, 

779 env_vars=env_vars, 

780 inherit_env=inherit_env, 

781 **kwargs, 

782 ) 

783 

784 def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]: 

785 """Prepare a list of requirements that need to be installed for the virtual environment.""" 

786 requirements = [str(dependency) for dependency in self.requirements] 

787 if not self.system_site_packages: 

788 if ( 

789 self.serializer == "cloudpickle" 

790 and not exclude_cloudpickle 

791 and "cloudpickle" not in requirements 

792 ): 

793 requirements.append("cloudpickle") 

794 elif self.serializer == "dill" and "dill" not in requirements: 

795 requirements.append("dill") 

796 requirements.sort() # Ensure a hash is stable 

797 return requirements 

798 

799 def _prepare_venv(self, venv_path: Path) -> None: 

800 """Prepare the requirements and installs the virtual environment.""" 

801 requirements_file = venv_path / "requirements.txt" 

802 requirements_file.write_text("\n".join(self._requirements_list())) 

803 prepare_virtualenv( 

804 venv_directory=str(venv_path), 

805 python_bin=f"python{self.python_version}" if self.python_version else "python", 

806 system_site_packages=self.system_site_packages, 

807 requirements_file_path=str(requirements_file), 

808 pip_install_options=self.pip_install_options, 

809 index_urls=self.index_urls, 

810 ) 

811 

812 def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]: 

813 """ 

814 Generate the hash of the cache folder to use. 

815 

816 The following factors are used as input for the hash: 

817 - (sorted) list of requirements 

818 - pip install options 

819 - flag of system site packages 

820 - python version 

821 - Variable to override the hash with a cache key 

822 - Index URLs 

823 

824 Returns a hash and the data dict which is the base for the hash as text. 

825 """ 

826 hash_dict = { 

827 "requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle), 

828 "pip_install_options": self.pip_install_options, 

829 "index_urls": self.index_urls, 

830 "cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")), 

831 "python_version": self.python_version, 

832 "system_site_packages": self.system_site_packages, 

833 } 

834 hash_text = json.dumps(hash_dict, sort_keys=True) 

835 hash_object = hashlib_wrapper.md5(hash_text.encode()) 

836 requirements_hash = hash_object.hexdigest() 

837 return requirements_hash[:8], hash_text 

838 
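# --- Illustrative sketch (not part of the original module) ---
# How the cache folder name is derived from the hash computed above, using the standard
# library directly (the real code goes through airflow.utils.hashlib_wrapper and
# Variable.get for the cache key); the requirement pins are assumptions.
import hashlib
import json

hash_dict = {
    "requirements_list": ["dill", "pandas==2.2.2"],
    "pip_install_options": None,
    "index_urls": None,
    "cache_key": "",
    "python_version": None,
    "system_site_packages": False,
}
hash_text = json.dumps(hash_dict, sort_keys=True)
cache_hash = hashlib.md5(hash_text.encode()).hexdigest()[:8]
venv_dir_name = f"venv-{cache_hash}"  # sub-folder created under venv_cache_path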

839 def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path: 

840 """Ensure a valid virtual environment is set up and will create inplace.""" 

841 cache_hash, hash_data = self._calculate_cache_hash() 

842 venv_path = venv_cache_path / f"venv-{cache_hash}" 

843 self.log.info("Python virtual environment will be cached in %s", venv_path) 

844 venv_path.parent.mkdir(parents=True, exist_ok=True) 

845 with open(f"{venv_path}.lock", "w") as f: 

846 # Ensure that the cache is not built by parallel workers

847 import fcntl 

848 

849 fcntl.flock(f, fcntl.LOCK_EX) 

850 

851 hash_marker = venv_path / "install_complete_marker.json" 

852 try: 

853 if venv_path.exists(): 

854 if hash_marker.exists(): 

855 previous_hash_data = hash_marker.read_text(encoding="utf8") 

856 if previous_hash_data == hash_data: 

857 self.log.info("Reusing cached Python virtual environment in %s", venv_path) 

858 return venv_path 

859 

860 _, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True) 

861 if previous_hash_data == hash_data_before_upgrade: 

862 self.log.warning( 

863 "Found a previous virtual environment in with outdated dependencies %s, " 

864 "deleting and re-creating.", 

865 venv_path, 

866 ) 

867 else: 

868 self.log.error( 

869 "Unicorn alert: Found a previous virtual environment in %s " 

870 "with the same hash but different parameters. Previous setup: '%s' / " 

871 "Requested venv setup: '%s'. Please report a bug to airflow!", 

872 venv_path, 

873 previous_hash_data, 

874 hash_data, 

875 ) 

876 else: 

877 self.log.warning( 

878 "Found a previous (probably partial installed) virtual environment in %s, " 

879 "deleting and re-creating.", 

880 venv_path, 

881 ) 

882 

883 shutil.rmtree(venv_path) 

884 

885 venv_path.mkdir(parents=True) 

886 self._prepare_venv(venv_path) 

887 hash_marker.write_text(hash_data, encoding="utf8") 

888 except Exception as e: 

889 shutil.rmtree(venv_path) 

890 raise AirflowException(f"Unable to create new virtual environment in {venv_path}") from e 

891 self.log.info("New Python virtual environment created in %s", venv_path) 

892 return venv_path 

893 

894 def _cleanup_python_pycache_dir(self, cache_dir_path: Path) -> None: 

895 try: 

896 shutil.rmtree(cache_dir_path) 

897 self.log.debug("The directory %s has been deleted.", cache_dir_path) 

898 except FileNotFoundError: 

899 self.log.warning("Fail to delete %s. The directory does not exist.", cache_dir_path) 

900 except PermissionError: 

901 self.log.warning("Permission denied to delete the directory %s.", cache_dir_path) 

902 

903 def _retrieve_index_urls_from_connection_ids(self): 

904 """Retrieve index URLs from Package Index connections.""" 

905 if self.index_urls is None: 

906 self.index_urls = [] 

907 for conn_id in self.index_urls_from_connection_ids: 

908 conn_url = PackageIndexHook(conn_id).get_connection_url() 

909 self.index_urls.append(conn_url) 

910 

911 def execute_callable(self): 

912 if self.index_urls_from_connection_ids: 

913 self._retrieve_index_urls_from_connection_ids() 

914 

915 if self.venv_cache_path: 

916 venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path)) 

917 python_path = venv_path / "bin" / "python" 

918 return self._execute_python_callable_in_subprocess(python_path) 

919 

920 with TemporaryDirectory(prefix="venv") as tmp_dir: 

921 tmp_path = Path(tmp_dir) 

922 custom_pycache_prefix = Path(sys.pycache_prefix or "") 

923 r_path = tmp_path.relative_to(tmp_path.anchor) 

924 venv_python_cache_dir = Path.cwd() / custom_pycache_prefix / r_path 

925 self._prepare_venv(tmp_path) 

926 python_path = tmp_path / "bin" / "python" 

927 result = self._execute_python_callable_in_subprocess(python_path) 

928 self._cleanup_python_pycache_dir(venv_python_cache_dir) 

929 return result 

930 

931 def _iter_serializable_context_keys(self): 

932 yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS 

933 

934 found_airflow = found_pendulum = False 

935 

936 if self.system_site_packages: 

937 # If we're using system packages, assume both are present 

938 found_airflow = found_pendulum = True 

939 else: 

940 for raw_str in chain.from_iterable(req.splitlines() for req in self.requirements): 

941 line = raw_str.strip() 

942 # Skip blank lines and full-line comments

943 if not line or line.startswith("#"): 

944 continue 

945 

946 # Strip off any inline comment 

947 # e.g. turn "foo==1.2.3 # comment" → "foo==1.2.3" 

948 req_str = re.sub(r"#.*$", "", line).strip() 

949 

950 try: 

951 req = Requirement(req_str) 

952 except (InvalidRequirement, InvalidSpecifier, InvalidVersion) as e: 

953 raise ValueError(f"Invalid requirement '{raw_str}': {e}") from e 

954 

955 if req.name == "apache-airflow": 

956 found_airflow = found_pendulum = True 

957 break 

958 elif req.name == "pendulum": 

959 found_pendulum = True 

960 

961 if found_airflow: 

962 yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS 

963 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

964 elif found_pendulum: 

965 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

966 
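# --- Illustrative usage sketch (not part of the original module) ---
# A callable run in an automatically created virtualenv. All imports happen inside the
# function; the requirement pins and cache path are assumptions.
def _transform():
    import pandas as pd  # available only inside the virtualenv

    return int(pd.DataFrame({"x": [1, 2, 3]})["x"].sum())


transform = PythonVirtualenvOperator(
    task_id="transform",
    python_callable=_transform,
    requirements=["pandas==2.2.2"],
    system_site_packages=False,
    serializer="cloudpickle",              # auto-added to requirements by _requirements_list()
    venv_cache_path="/tmp/airflow-venvs",  # reuse the venv across runs instead of a temp dir
)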

967 

968class BranchPythonVirtualenvOperator(BaseBranchOperator, PythonVirtualenvOperator): 

969 """ 

970 A workflow can "branch" or follow a path after the execution of this task in a virtual environment. 

971 

972 It derives the PythonVirtualenvOperator and expects a Python function that returns 

973 a single task_id, a single task_group_id, or a list of task_ids and/or 

974 task_group_ids to follow. The task_id(s) and/or task_group_id(s) returned 

975 should point to a task or task group directly downstream from {self}. All 

976 other "branches" or directly downstream tasks are marked with a state of 

977 ``skipped`` so that these paths can't move forward. The ``skipped`` states 

978 are propagated downstream to allow for the DAG state to fill up and 

979 the DAG run's state to be inferred. 

980 

981 .. seealso:: 

982 For more information on how to use this operator, take a look at the guide: 

983 :ref:`howto/operator:BranchPythonVirtualenvOperator` 

984 """ 

985 

986 def choose_branch(self, context: Context) -> str | Iterable[str]: 

987 return PythonVirtualenvOperator.execute(self, context) 

988 

989 

990class ExternalPythonOperator(_BasePythonVirtualenvOperator): 

991 """ 

992 Run a function in a virtualenv that is not re-created. 

993 

994 Reused as is without the overhead of creating the virtual environment (with certain caveats). 

995 

996 The function must be defined using def, and not be 

997 part of a class. All imports must happen inside the function 

998 and no variables outside the scope may be referenced. A global scope 

999 variable named virtualenv_string_args will be available (populated by 

1000 string_args). In addition, one can pass stuff through op_args and op_kwargs, and one 

1001 can use a return value. 

1002 Note that if your virtual environment runs in a different Python major version than Airflow, 

1003 you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to 

1004 Airflow through plugins. You can use string_args though. 

1005 

1006 If Airflow is installed in the external environment in a different version than the version

1007 used by the operator, the operator will fail.

1008 

1009 .. seealso:: 

1010 For more information on how to use this operator, take a look at the guide: 

1011 :ref:`howto/operator:ExternalPythonOperator` 

1012 

1013 :param python: Full path string (file-system specific) that points to a Python binary inside 

1014 a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path 

1015 (so usually start with "/" or "X:/" depending on the filesystem/os used). 

1016 :param python_callable: A python function with no references to outside variables, 

1017 defined with def, which will be run in a virtual environment. 

1018 :param serializer: Which serializer use to serialize the args and result. It can be one of the following: 

1019 

1020 - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library. 

1021 - ``"cloudpickle"``: Use cloudpickle for serialize more complex types, 

1022 this requires to include cloudpickle in your requirements. 

1023 - ``"dill"``: Use dill for serialize more complex types, 

1024 this requires to include dill in your requirements. 

1025 :param op_args: A list of positional arguments to pass to python_callable. 

1026 :param op_kwargs: A dict of keyword arguments to pass to python_callable. 

1027 :param string_args: Strings that are present in the global var virtualenv_string_args, 

1028 available to python_callable at runtime as a list[str]. Note that args are split 

1029 by newline. 

1030 :param templates_dict: a dictionary where the values are templates that 

1031 will get templated by the Airflow engine sometime between 

1032 ``__init__`` and ``execute`` takes place and are made available 

1033 in your callable's context after the template has been applied 

1034 :param templates_exts: a list of file extensions to resolve while 

1035 processing templated fields, for examples ``['.sql', '.hql']`` 

1036 :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator 

1037 will raise a warning if Airflow is not installed, and it will attempt to load Airflow

1038 macros when starting. 

1039 :param skip_on_exit_code: If python_callable exits with this exit code, leave the task 

1040 in ``skipped`` state (default: None). If set to ``None``, any non-zero 

1041 exit code will be treated as a failure. 

1042 :param env_vars: A dictionary containing additional environment variables to set for the virtual 

1043 environment when it is executed. 

1044 :param inherit_env: Whether to inherit the current environment variables when executing the virtual 

1045 environment. If set to ``True``, the virtual environment will inherit the environment variables 

1046 of the parent process (``os.environ``). If set to ``False``, the virtual environment will be 

1047 executed with a clean environment. 

1048 """ 

1049 

1050 template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields)) 

1051 

1052 def __init__( 

1053 self, 

1054 *, 

1055 python: str, 

1056 python_callable: Callable, 

1057 serializer: _SerializerTypeDef | None = None, 

1058 op_args: Collection[Any] | None = None, 

1059 op_kwargs: Mapping[str, Any] | None = None, 

1060 string_args: Iterable[str] | None = None, 

1061 templates_dict: dict | None = None, 

1062 templates_exts: list[str] | None = None, 

1063 expect_airflow: bool = True, 

1064 expect_pendulum: bool = False, 

1065 skip_on_exit_code: int | Container[int] | None = None, 

1066 env_vars: dict[str, str] | None = None, 

1067 inherit_env: bool = True, 

1068 **kwargs, 

1069 ): 

1070 if not python: 

1071 raise ValueError("Python Path must be defined in ExternalPythonOperator") 

1072 self.python = python 

1073 self.expect_pendulum = expect_pendulum 

1074 super().__init__( 

1075 python_callable=python_callable, 

1076 serializer=serializer, 

1077 op_args=op_args, 

1078 op_kwargs=op_kwargs, 

1079 string_args=string_args, 

1080 templates_dict=templates_dict, 

1081 templates_exts=templates_exts, 

1082 expect_airflow=expect_airflow, 

1083 skip_on_exit_code=skip_on_exit_code, 

1084 env_vars=env_vars, 

1085 inherit_env=inherit_env, 

1086 **kwargs, 

1087 ) 

1088 

1089 def execute_callable(self): 

1090 python_path = Path(self.python) 

1091 if not python_path.exists(): 

1092 raise ValueError(f"Python Path '{python_path}' must exists") 

1093 if not python_path.is_file(): 

1094 raise ValueError(f"Python Path '{python_path}' must be a file") 

1095 if not python_path.is_absolute(): 

1096 raise ValueError(f"Python Path '{python_path}' must be an absolute path.") 

1097 python_version = _PythonVersionInfo.from_executable(self.python) 

1098 if python_version.major != sys.version_info.major and (self.op_args or self.op_kwargs): 

1099 raise AirflowException( 

1100 "Passing op_args or op_kwargs is not supported across different Python " 

1101 "major versions for ExternalPythonOperator. Please use string_args." 

1102 f"Sys version: {sys.version_info}. " 

1103 f"Virtual environment version: {python_version}" 

1104 ) 

1105 return self._execute_python_callable_in_subprocess(python_path) 

1106 

1107 def _iter_serializable_context_keys(self): 

1108 yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS 

1109 if self.expect_airflow and self._get_airflow_version_from_target_env(): 

1110 yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS 

1111 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

1112 elif self._is_pendulum_installed_in_target_env(): 

1113 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

1114 

1115 def _is_pendulum_installed_in_target_env(self) -> bool: 

1116 try: 

1117 subprocess.check_call([self.python, "-c", "import pendulum"]) 

1118 return True 

1119 except Exception as e: 

1120 if self.expect_pendulum: 

1121 self.log.warning("When checking for Pendulum installed in virtual environment got %s", e) 

1122 self.log.warning( 

1123 "Pendulum is not properly installed in the virtual environment " 

1124 "Pendulum context keys will not be available. " 

1125 "Please Install Pendulum or Airflow in your virtual environment to access them." 

1126 ) 

1127 return False 

1128 

1129 @property 

1130 def _external_airflow_version_script(self): 

1131 """ 

1132 Return a Python script which determines the version of Apache Airflow.

1133

1134 Importing airflow as a module might take a while; as a result,

1135 obtaining the version would take up to 1 second.

1136 On the other hand, `importlib.metadata.version` will retrieve the package version pretty fast,

1137 typically below 100ms; this includes the overhead of the new subprocess.

1138

1139 Possible side effect: neither `importlib.metadata` (Python < 3.8) nor the backport

1140 `importlib_metadata` might be available, which might indicate that the venv doesn't contain

1141 `apache-airflow` or that something is wrong with the environment.

1142 """ 

1143 return textwrap.dedent( 

1144 """ 

1145 try: 

1146 from importlib.metadata import version 

1147 except ImportError: 

1148 from importlib_metadata import version 

1149 print(version("apache-airflow")) 

1150 """ 

1151 ) 

1152 

1153 def _get_airflow_version_from_target_env(self) -> str | None: 

1154 from airflow import __version__ as airflow_version 

1155 

1156 try: 

1157 result = subprocess.check_output( 

1158 [self.python, "-c", self._external_airflow_version_script], 

1159 text=True, 

1160 ) 

1161 target_airflow_version = result.strip() 

1162 if target_airflow_version != airflow_version: 

1163 raise AirflowConfigException( 

1164 f"The version of Airflow installed for the {self.python} " 

1165 f"({target_airflow_version}) is different than the runtime Airflow version: " 

1166 f"{airflow_version}. Make sure your environment has the same Airflow version " 

1167 f"installed as the Airflow runtime." 

1168 ) 

1169 return target_airflow_version 

1170 except Exception as e: 

1171 if self.expect_airflow: 

1172 self.log.warning("When checking for Airflow installed in virtual environment got %s", e) 

1173 self.log.warning( 

1174 "This means that Airflow is not properly installed by %s. " 

1175 "Airflow context keys will not be available. " 

1176 "Please Install Airflow %s in your environment to access them.", 

1177 self.python, 

1178 airflow_version, 

1179 ) 

1180 return None 

1181 
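# --- Illustrative usage sketch (not part of the original module) ---
# Reuse a pre-built virtualenv instead of creating one per run. The interpreter path is a
# hypothetical example; it must be an absolute path to a python binary inside VENV/bin.
def _score():
    import sklearn  # already installed in the external environment

    return sklearn.__version__


score = ExternalPythonOperator(
    task_id="score",
    python="/opt/venvs/ml/bin/python",  # hypothetical pre-provisioned environment
    python_callable=_score,
    expect_airflow=False,  # don't warn if Airflow is absent from that environment
)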

1182 

1183class BranchExternalPythonOperator(BaseBranchOperator, ExternalPythonOperator): 

1184 """ 

1185 A workflow can "branch" or follow a path after the execution of this task. 

1186 

1187 Extends ExternalPythonOperator, so it expects a ``python`` argument: the path to a Python binary

1188 inside the virtual environment that should be used (in the ``VENV/bin`` folder). It should be an absolute path,

1189 so the task can run in a separate virtual environment, similarly to ExternalPythonOperator.

1190 

1191 .. seealso:: 

1192 For more information on how to use this operator, take a look at the guide: 

1193 :ref:`howto/operator:BranchExternalPythonOperator` 

1194 """ 

1195 

1196 def choose_branch(self, context: Context) -> str | Iterable[str]: 

1197 return ExternalPythonOperator.execute(self, context) 

1198 

1199 

1200def get_current_context() -> Mapping[str, Any]: 

1201 """ 

1202 Retrieve the execution context dictionary without altering user method's signature. 

1203 

1204 This is the simplest method of retrieving the execution context dictionary. 

1205 

1206 **Old style:** 

1207 

1208 .. code:: python 

1209 

1210 def my_task(**context): 

1211 ti = context["ti"] 

1212 

1213 **New style:** 

1214 

1215 .. code:: python 

1216 

1217 from airflow.providers.standard.operators.python import get_current_context 

1218 

1219 

1220 def my_task(): 

1221 context = get_current_context() 

1222 ti = context["ti"] 

1223 

1224 The current context will only have a value if this method was called after an operator

1225 started to execute.

1226 """ 

1227 if AIRFLOW_V_3_0_PLUS: 

1228 warnings.warn( 

1229 "Using get_current_context from standard provider is deprecated and will be removed." 

1230 "Please import `from airflow.sdk import get_current_context` and use it instead.", 

1231 AirflowProviderDeprecationWarning, 

1232 stacklevel=2, 

1233 ) 

1234 

1235 from airflow.sdk import get_current_context 

1236 

1237 return get_current_context() 

1238 return _get_current_context() 

1239 

1240 

1241def _get_current_context() -> Mapping[str, Any]: 

1242 # Airflow 2.x 

1243 # TODO: To be removed when Airflow 2 support is dropped 

1244 from airflow.models.taskinstance import _CURRENT_CONTEXT # type: ignore[attr-defined] 

1245 

1246 if not _CURRENT_CONTEXT: 

1247 raise RuntimeError( 

1248 "Current context was requested but no context was found! Are you running within an Airflow task?" 

1249 ) 

1250 return _CURRENT_CONTEXT[-1]
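# --- Illustrative sketch (not part of the original module) ---
# On Airflow 3 the deprecation warning above points at the Task SDK import; the
# recommended pattern for new code would look like:
#
#     from airflow.sdk import get_current_context
#
#     def my_task():
#         context = get_current_context()
#         return context["ti"].task_id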