Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/operators/python.py: 30%


389 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import fcntl 

21import importlib 

22import inspect 

23import json 

24import logging 

25import os 

26import shutil 

27import subprocess 

28import sys 

29import textwrap 

30import types 

31import warnings 

32from abc import ABCMeta, abstractmethod 

33from collections.abc import Container 

34from pathlib import Path 

35from tempfile import TemporaryDirectory 

36from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast 

37 

38import lazy_object_proxy 

39 

40from airflow.compat.functools import cache 

41from airflow.exceptions import ( 

42 AirflowConfigException, 

43 AirflowException, 

44 AirflowSkipException, 

45 DeserializingResultError, 

46 RemovedInAirflow3Warning, 

47) 

48from airflow.models.baseoperator import BaseOperator 

49from airflow.models.skipmixin import SkipMixin 

50from airflow.models.taskinstance import _CURRENT_CONTEXT 

51from airflow.models.variable import Variable 

52from airflow.operators.branch import BranchMixIn 

53from airflow.typing_compat import Literal 

54from airflow.utils import hashlib_wrapper 

55from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge 

56from airflow.utils.file import get_unique_dag_module_name 

57from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters 

58from airflow.utils.process_utils import execute_in_subprocess 

59from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script 

60 

61log = logging.getLogger(__name__) 

62 

63if TYPE_CHECKING: 

64 from pendulum.datetime import DateTime 

65 

66 from airflow.utils.context import Context 

67 

68 

69def is_venv_installed() -> bool: 

70 """ 

71 Check if the virtualenv package is installed, either by finding it on the PATH or as an importable package. 

72 

73 :return: True if either check succeeds. 

74 """ 

75 if shutil.which("virtualenv") or importlib.util.find_spec("virtualenv"): 

76 return True 

77 return False 

78 

79 

80def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs): 

81 """Use :func:`airflow.decorators.task` instead, this is deprecated. 

82 

83 Calls ``@task.python`` and allows users to turn a Python function into 

84 an Airflow task. 

85 

86 :param python_callable: A reference to an object that is callable 

87 :param op_kwargs: a dictionary of keyword arguments that will get unpacked 

88 in your function (templated) 

89 :param op_args: a list of positional arguments that will get unpacked when 

90 calling your callable (templated) 

91 :param multiple_outputs: if set, function return value will be 

92 unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. 

93 Defaults to False. 

94 """ 

95 # To maintain backwards compatibility, we import the task object into this file 

96 # This prevents breakages in dags that use `from airflow.operators.python import task` 

97 from airflow.decorators.python import python_task 

98 

99 warnings.warn( 

100 """airflow.operators.python.task is deprecated. Please use the following instead 

101 

102 from airflow.decorators import task 

103 @task 

104 def my_task()""", 

105 RemovedInAirflow3Warning, 

106 stacklevel=2, 

107 ) 

108 return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs) 

109 

110 

111@cache 

112def _parse_version_info(text: str) -> tuple[int, int, int, str, int]: 

113 """Parse python version info from a text.""" 

114 parts = text.strip().split(".") 

115 if len(parts) != 5: 

116 msg = f"Invalid Python version info, expected 5 components separated by '.', but got {text!r}." 

117 raise ValueError(msg) 

118 try: 

119 return int(parts[0]), int(parts[1]), int(parts[2]), parts[3], int(parts[4]) 

120 except ValueError: 

121 msg = f"Unable to convert parts {parts} parsed from {text!r} to (int, int, int, str, int)." 

122 raise ValueError(msg) from None 

123 
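# Illustrative note (added annotation, not part of the original module): _parse_version_info
# expects the dotted form produced by '.'.join(map(str, sys.version_info)), for example:
#
#     _parse_version_info("3.8.10.final.0")  # -> (3, 8, 10, "final", 0)
#     _parse_version_info("3.8.10")          # raises ValueError (only 3 components)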

124 

125class _PythonVersionInfo(NamedTuple): 

126 """Provide the same interface as ``sys.version_info``.""" 

127 

128 major: int 

129 minor: int 

130 micro: int 

131 releaselevel: str 

132 serial: int 

133 

134 @classmethod 

135 def from_executable(cls, executable: str) -> _PythonVersionInfo: 

136 """Parse python version info from an executable.""" 

137 cmd = [executable, "-c", 'import sys; print(".".join(map(str, sys.version_info)))'] 

138 try: 

139 result = subprocess.check_output(cmd, text=True) 

140 except Exception as e: 

141 raise ValueError(f"Error while executing command {cmd}: {e}") 

142 return cls(*_parse_version_info(result.strip())) 

143 

144 

145class PythonOperator(BaseOperator): 

146 """ 

147 Executes a Python callable. 

148 

149 .. seealso:: 

150 For more information on how to use this operator, take a look at the guide: 

151 :ref:`howto/operator:PythonOperator` 

152 

153 When running your callable, Airflow will pass a set of keyword arguments that can be used in your 

154 function. This set of kwargs corresponds exactly to what you can use in your jinja templates. 

155 For this to work, you need to define ``**kwargs`` in your function header, or you can directly add the 

156 keyword arguments you would like to get - for example, with the below code your callable will get 

157 the values of ``ti`` and ``next_ds`` context variables. 

158 

159 With explicit arguments: 

160 

161 .. code-block:: python 

162 

163 def my_python_callable(ti, next_ds): 

164 pass 

165 

166 With kwargs: 

167 

168 .. code-block:: python 

169 

170 def my_python_callable(**kwargs): 

171 ti = kwargs["ti"] 

172 next_ds = kwargs["next_ds"] 

173 

174 

175 :param python_callable: A reference to an object that is callable 

176 :param op_kwargs: a dictionary of keyword arguments that will get unpacked 

177 in your function 

178 :param op_args: a list of positional arguments that will get unpacked when 

179 calling your callable 

180 :param templates_dict: a dictionary where the values are templates that 

181 will get templated by the Airflow engine sometime between 

182 ``__init__`` and ``execute`` takes place and are made available 

183 in your callable's context after the template has been applied. (templated) 

184 :param templates_exts: a list of file extensions to resolve while 

185 processing templated fields, for examples ``['.sql', '.hql']`` 

186 :param show_return_value_in_logs: a bool value indicating whether to show return_value 

187 logs. Defaults to True, which allows return value log output. 

188 It can be set to False to prevent log output of the return value when you return huge data 

189 such as transmitting a large amount of XCom data to the TaskAPI. 
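
    A minimal usage sketch (illustrative only; the dag id, task id and callable below are
    assumptions made for this example, not defined by this module):

    .. code-block:: python

        import pendulum

        from airflow import DAG
        from airflow.operators.python import PythonOperator


        def add(x, y, **context):
            return x + y


        with DAG(dag_id="python_operator_example", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
            add_task = PythonOperator(
                task_id="add_numbers",
                python_callable=add,
                op_args=[1],
                op_kwargs={"y": 2},
            )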

190 """ 

191 

192 template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs") 

193 template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} 

194 BLUE = "#ffefeb" 

195 ui_color = BLUE 

196 

197 # since we won't mutate the arguments, we should just do the shallow copy 

198 # there are some cases we can't deepcopy the objects(e.g protobuf). 

199 shallow_copy_attrs: Sequence[str] = ( 

200 "python_callable", 

201 "op_kwargs", 

202 ) 

203 

204 def __init__( 

205 self, 

206 *, 

207 python_callable: Callable, 

208 op_args: Collection[Any] | None = None, 

209 op_kwargs: Mapping[str, Any] | None = None, 

210 templates_dict: dict[str, Any] | None = None, 

211 templates_exts: Sequence[str] | None = None, 

212 show_return_value_in_logs: bool = True, 

213 **kwargs, 

214 ) -> None: 

215 if kwargs.get("provide_context"): 

216 warnings.warn( 

217 "provide_context is deprecated as of 2.0 and is no longer required", 

218 RemovedInAirflow3Warning, 

219 stacklevel=2, 

220 ) 

221 kwargs.pop("provide_context", None) 

222 super().__init__(**kwargs) 

223 if not callable(python_callable): 

224 raise AirflowException("`python_callable` param must be callable") 

225 self.python_callable = python_callable 

226 self.op_args = op_args or () 

227 self.op_kwargs = op_kwargs or {} 

228 self.templates_dict = templates_dict 

229 if templates_exts: 

230 self.template_ext = templates_exts 

231 self.show_return_value_in_logs = show_return_value_in_logs 

232 

233 def execute(self, context: Context) -> Any: 

234 context_merge(context, self.op_kwargs, templates_dict=self.templates_dict) 

235 self.op_kwargs = self.determine_kwargs(context) 

236 self._dataset_events = context_get_outlet_events(context) 

237 

238 return_value = self.execute_callable() 

239 if self.show_return_value_in_logs: 

240 self.log.info("Done. Returned value was: %s", return_value) 

241 else: 

242 self.log.info("Done. Returned value not shown") 

243 

244 return return_value 

245 

246 def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: 

247 return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking() 

248 

249 def execute_callable(self) -> Any: 

250 """ 

251 Call the python callable with the given arguments. 

252 

253 :return: the return value of the call. 

254 """ 

255 runner = ExecutionCallableRunner(self.python_callable, self._dataset_events, logger=self.log) 

256 return runner.run(*self.op_args, **self.op_kwargs) 

257 

258 

259class BranchPythonOperator(PythonOperator, BranchMixIn): 

260 """ 

261 A workflow can "branch" or follow a path after the execution of this task. 

262 

263 It derives the PythonOperator and expects a Python function that returns 

264 a single task_id or list of task_ids to follow. The task_id(s) returned 

265 should point to a task directly downstream from {self}. All other "branches" 

266 or directly downstream tasks are marked with a state of ``skipped`` so that 

267 these paths can't move forward. The ``skipped`` states are propagated 

268 downstream to allow for the DAG state to fill up and the DAG run's state 

269 to be inferred. 
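
    A minimal branching sketch (illustrative only; the callable and the ``full_load`` /
    ``incremental_load`` downstream tasks are assumptions made for this example, not defined
    by this module):

    .. code-block:: python

        def choose_branch(**context):
            # Return the task_id (or list of task_ids) to follow; the other direct
            # downstream tasks are marked as skipped.
            if context["dag_run"].conf.get("full_refresh"):
                return "full_load"
            return "incremental_load"


        branch = BranchPythonOperator(task_id="branching", python_callable=choose_branch)
        branch >> [full_load, incremental_load]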

270 """ 

271 

272 def execute(self, context: Context) -> Any: 

273 return self.do_branch(context, super().execute(context)) 

274 

275 

276class ShortCircuitOperator(PythonOperator, SkipMixin): 

277 """ 

278 Allows a pipeline to continue based on the result of a ``python_callable``. 

279 

280 The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a 

281 ``python_callable``. If the returned result is False or a falsy value, the pipeline will be 

282 short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting 

283 mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and 

284 an ``XCom`` of the returned result is pushed. 

285 

286 The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for 

287 downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all 

288 downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this 

289 parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for 

290 other subsequent downstream tasks are respected. In this mode, the operator assumes the direct downstream 

291 tasks were purposely meant to be skipped but perhaps not other subsequent tasks. 

292 

293 .. seealso:: 

294 For more information on how to use this operator, take a look at the guide: 

295 :ref:`howto/operator:ShortCircuitOperator` 

296 

297 :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will 

298 be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be 

299 skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected. 
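
    A minimal usage sketch (illustrative only; the condition callable and the downstream
    ``process_data`` task are assumptions made for this example, not defined by this module):

    .. code-block:: python

        def data_is_fresh(**context):
            # Any falsy return value short-circuits the pipeline and skips downstream tasks.
            return context["dag_run"].conf.get("run_downstream", False)


        gate = ShortCircuitOperator(task_id="check_freshness", python_callable=data_is_fresh)
        gate >> process_data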

300 """ 

301 

302 def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None: 

303 super().__init__(**kwargs) 

304 self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules 

305 

306 def execute(self, context: Context) -> Any: 

307 condition = super().execute(context) 

308 self.log.info("Condition result is %s", condition) 

309 

310 if condition: 

311 self.log.info("Proceeding with downstream tasks...") 

312 return condition 

313 

314 if not self.downstream_task_ids: 

315 self.log.info("No downstream tasks; nothing to do.") 

316 return condition 

317 

318 dag_run = context["dag_run"] 

319 

320 def get_tasks_to_skip(): 

321 if self.ignore_downstream_trigger_rules is True: 

322 tasks = context["task"].get_flat_relatives(upstream=False) 

323 else: 

324 tasks = context["task"].get_direct_relatives(upstream=False) 

325 for t in tasks: 

326 if not t.is_teardown: 

327 yield t 

328 

329 to_skip = get_tasks_to_skip() 

330 

331 # this lets us avoid building an intermediate list unless debug logging is enabled 

332 if self.log.getEffectiveLevel() <= logging.DEBUG: 

333 self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) 

334 

335 self.log.info("Skipping downstream tasks") 

336 

337 self.skip( 

338 dag_run=dag_run, 

339 execution_date=cast("DateTime", dag_run.execution_date), 

340 tasks=to_skip, 

341 map_index=context["ti"].map_index, 

342 ) 

343 self.log.info("Done.") 

344 # returns the result of the super execute method as it is instead of returning None 

345 return condition 

346 

347 

348def _load_pickle(): 

349 import pickle 

350 

351 return pickle 

352 

353 

354def _load_dill(): 

355 try: 

356 import dill 

357 except ModuleNotFoundError: 

358 log.error("Unable to import `dill` module. Please please make sure that it installed.") 

359 raise 

360 return dill 

361 

362 

363def _load_cloudpickle(): 

364 try: 

365 import cloudpickle 

366 except ModuleNotFoundError: 

367 log.error( 

368 "Unable to import `cloudpickle` module. " 

369 "Please install it with: pip install 'apache-airflow[cloudpickle]'" 

370 ) 

371 raise 

372 return cloudpickle 

373 

374 

375_SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"] 

376_SERIALIZERS: dict[_SerializerTypeDef, Any] = { 

377 "pickle": lazy_object_proxy.Proxy(_load_pickle), 

378 "dill": lazy_object_proxy.Proxy(_load_dill), 

379 "cloudpickle": lazy_object_proxy.Proxy(_load_cloudpickle), 

380} 

381 

382 

383class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): 

384 BASE_SERIALIZABLE_CONTEXT_KEYS = { 

385 "ds", 

386 "ds_nodash", 

387 "expanded_ti_count", 

388 "inlets", 

389 "map_index_template", 

390 "next_ds", 

391 "next_ds_nodash", 

392 "outlets", 

393 "prev_ds", 

394 "prev_ds_nodash", 

395 "run_id", 

396 "task_instance_key_str", 

397 "test_mode", 

398 "tomorrow_ds", 

399 "tomorrow_ds_nodash", 

400 "ts", 

401 "ts_nodash", 

402 "ts_nodash_with_tz", 

403 "yesterday_ds", 

404 "yesterday_ds_nodash", 

405 } 

406 PENDULUM_SERIALIZABLE_CONTEXT_KEYS = { 

407 "data_interval_end", 

408 "data_interval_start", 

409 "execution_date", 

410 "logical_date", 

411 "next_execution_date", 

412 "prev_data_interval_end_success", 

413 "prev_data_interval_start_success", 

414 "prev_execution_date", 

415 "prev_execution_date_success", 

416 "prev_start_date_success", 

417 "prev_end_date_success", 

418 } 

419 AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = { 

420 "macros", 

421 "conf", 

422 "dag", 

423 "dag_run", 

424 "task", 

425 "params", 

426 "triggering_dataset_events", 

427 } 

428 

429 def __init__( 

430 self, 

431 *, 

432 python_callable: Callable, 

433 serializer: _SerializerTypeDef | None = None, 

434 op_args: Collection[Any] | None = None, 

435 op_kwargs: Mapping[str, Any] | None = None, 

436 string_args: Iterable[str] | None = None, 

437 templates_dict: dict | None = None, 

438 templates_exts: list[str] | None = None, 

439 expect_airflow: bool = True, 

440 skip_on_exit_code: int | Container[int] | None = None, 

441 use_dill: bool = False, 

442 **kwargs, 

443 ): 

444 if ( 

445 not isinstance(python_callable, types.FunctionType) 

446 or isinstance(python_callable, types.LambdaType) 

447 and python_callable.__name__ == "<lambda>" 

448 ): 

449 raise ValueError(f"{type(self).__name__} only supports functions for python_callable arg") 

450 if inspect.isgeneratorfunction(python_callable): 

451 raise ValueError(f"{type(self).__name__} does not support using 'yield' in python_callable") 

452 super().__init__( 

453 python_callable=python_callable, 

454 op_args=op_args, 

455 op_kwargs=op_kwargs, 

456 templates_dict=templates_dict, 

457 templates_exts=templates_exts, 

458 **kwargs, 

459 ) 

460 self.string_args = string_args or [] 

461 

462 if use_dill: 

463 warnings.warn( 

464 "`use_dill` is deprecated and will be removed in a future version. " 

465 "Please provide serializer='dill' instead.", 

466 RemovedInAirflow3Warning, 

467 stacklevel=3, 

468 ) 

469 if serializer: 

470 raise AirflowException( 

471 "Both 'use_dill' and 'serializer' parameters are set. Please set only one of them" 

472 ) 

473 serializer = "dill" 

474 serializer = serializer or "pickle" 

475 if serializer not in _SERIALIZERS: 

476 msg = ( 

477 f"Unsupported serializer {serializer!r}. " 

478 f"Expected one of {', '.join(map(repr, _SERIALIZERS))}" 

479 ) 

480 raise AirflowException(msg) 

481 self.pickling_library = _SERIALIZERS[serializer] 

482 self.serializer: _SerializerTypeDef = serializer 

483 

484 self.expect_airflow = expect_airflow 

485 self.skip_on_exit_code = ( 

486 skip_on_exit_code 

487 if isinstance(skip_on_exit_code, Container) 

488 else [skip_on_exit_code] 

489 if skip_on_exit_code is not None 

490 else [] 

491 ) 

492 

493 @abstractmethod 

494 def _iter_serializable_context_keys(self): 

495 pass 

496 

497 def execute(self, context: Context) -> Any: 

498 serializable_keys = set(self._iter_serializable_context_keys()) 

499 serializable_context = context_copy_partial(context, serializable_keys) 

500 return super().execute(context=serializable_context) 

501 

502 def get_python_source(self): 

503 """Return the source of self.python_callable.""" 

504 return textwrap.dedent(inspect.getsource(self.python_callable)) 

505 

506 def _write_args(self, file: Path): 

507 if self.op_args or self.op_kwargs: 

508 self.log.info("Use %r as serializer.", self.serializer) 

509 file.write_bytes(self.pickling_library.dumps({"args": self.op_args, "kwargs": self.op_kwargs})) 

510 

511 def _write_string_args(self, file: Path): 

512 file.write_text("\n".join(map(str, self.string_args))) 

513 

514 def _read_result(self, path: Path): 

515 if path.stat().st_size == 0: 

516 return None 

517 try: 

518 return self.pickling_library.loads(path.read_bytes()) 

519 except ValueError as value_error: 

520 raise DeserializingResultError() from value_error 

521 

522 def __deepcopy__(self, memo): 

523 # module objects can't be copied _at all_ 

524 memo[id(self.pickling_library)] = self.pickling_library 

525 return super().__deepcopy__(memo) 

526 

527 def _execute_python_callable_in_subprocess(self, python_path: Path): 

528 with TemporaryDirectory(prefix="venv-call") as tmp: 

529 tmp_dir = Path(tmp) 

530 op_kwargs: dict[str, Any] = dict(self.op_kwargs) 

531 if self.templates_dict: 

532 op_kwargs["templates_dict"] = self.templates_dict 

533 input_path = tmp_dir / "script.in" 

534 output_path = tmp_dir / "script.out" 

535 string_args_path = tmp_dir / "string_args.txt" 

536 script_path = tmp_dir / "script.py" 

537 termination_log_path = tmp_dir / "termination.log" 

538 

539 self._write_args(input_path) 

540 self._write_string_args(string_args_path) 

541 

542 jinja_context = { 

543 "op_args": self.op_args, 

544 "op_kwargs": op_kwargs, 

545 "expect_airflow": self.expect_airflow, 

546 "pickling_library": self.serializer, 

547 "python_callable": self.python_callable.__name__, 

548 "python_callable_source": self.get_python_source(), 

549 } 

550 

551 if inspect.getfile(self.python_callable) == self.dag.fileloc: 

552 jinja_context["modified_dag_module_name"] = get_unique_dag_module_name(self.dag.fileloc) 

553 

554 write_python_script( 

555 jinja_context=jinja_context, 

556 filename=os.fspath(script_path), 

557 render_template_as_native_obj=self.dag.render_template_as_native_obj, 

558 ) 

559 

560 try: 

561 execute_in_subprocess( 

562 cmd=[ 

563 os.fspath(python_path), 

564 os.fspath(script_path), 

565 os.fspath(input_path), 

566 os.fspath(output_path), 

567 os.fspath(string_args_path), 

568 os.fspath(termination_log_path), 

569 ] 

570 ) 

571 except subprocess.CalledProcessError as e: 

572 if e.returncode in self.skip_on_exit_code: 

573 raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.") 

574 elif termination_log_path.exists() and termination_log_path.stat().st_size > 0: 

575 error_msg = f"Process returned non-zero exit status {e.returncode}.\n" 

576 with open(termination_log_path) as file: 

577 error_msg += file.read() 

578 raise AirflowException(error_msg) from None 

579 else: 

580 raise 

581 

582 if 0 in self.skip_on_exit_code: 

583 raise AirflowSkipException("Process exited with code 0. Skipping.") 

584 

585 return self._read_result(output_path) 

586 

587 def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: 

588 return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing() 

589 

590 

591class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): 

592 """ 

593 Run a function in a virtualenv that is created and destroyed automatically. 

594 

595 The function (with certain caveats) must be defined using def, and not be 

596 part of a class. All imports must happen inside the function 

597 and no variables outside the scope may be referenced. A global scope 

598 variable named virtualenv_string_args will be available (populated by 

599 string_args). In addition, one can pass stuff through op_args and op_kwargs, and one 

600 can use a return value. 

601 Note that if your virtualenv runs in a different Python major version than Airflow, 

602 you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to 

603 Airflow through plugins. You can use string_args though. 

604 

605 .. seealso:: 

606 For more information on how to use this operator, take a look at the guide: 

607 :ref:`howto/operator:PythonVirtualenvOperator` 

608 

609 :param python_callable: A python function with no references to outside variables, 

610 defined with def, which will be run in a virtual environment. 

611 :param requirements: Either a list of requirement strings, or a (templated) 

612 "requirements file" as specified by pip. 

613 :param python_version: The Python version to run the virtual environment with. Note that 

614 both 2 and 2.7 are acceptable forms. 

615 :param serializer: Which serializer use to serialize the args and result. It can be one of the following: 

616 

617 - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library. 

618 - ``"cloudpickle"``: Use cloudpickle for serialize more complex types, 

619 this requires to include cloudpickle in your requirements. 

620 - ``"dill"``: Use dill for serialize more complex types, 

621 this requires to include dill in your requirements. 

622 :param system_site_packages: Whether to include 

623 system_site_packages in your virtual environment. 

624 See virtualenv documentation for more information. 

625 :param pip_install_options: a list of pip install options when installing requirements 

626 See 'pip install -h' for available options 

627 :param op_args: A list of positional arguments to pass to python_callable. 

628 :param op_kwargs: A dict of keyword arguments to pass to python_callable. 

629 :param string_args: Strings that are present in the global var virtualenv_string_args, 

630 available to python_callable at runtime as a list[str]. Note that args are split 

631 by newline. 

632 :param templates_dict: a dictionary where the values are templates that 

633 will get templated by the Airflow engine sometime between 

634 ``__init__`` and ``execute`` takes place and are made available 

635 in your callable's context after the template has been applied 

636 :param templates_exts: a list of file extensions to resolve while 

637 processing templated fields, for examples ``['.sql', '.hql']`` 

638 :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator 

639 will raise a warning if Airflow is not installed, and it will attempt to load Airflow 

640 macros when starting. 

641 :param skip_on_exit_code: If python_callable exits with this exit code, leave the task 

642 in ``skipped`` state (default: None). If set to ``None``, any non-zero 

643 exit code will be treated as a failure. 

644 :param index_urls: an optional list of index urls to load Python packages from. 

645 If not provided the system pip conf will be used to source packages from. 

646 :param venv_cache_path: Optional path to the virtual environment parent folder in which the 

647 virtual environment will be cached; a sub-folder ``venv-{hash}`` is created, where ``{hash}`` is replaced 

648 with a checksum of the requirements. If not provided, the virtual environment will be created and deleted 

649 in a temp folder for every execution. 

650 :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize 

651 the args and result (pickle is default). This allows more complex types 

652 but requires you to include dill in your requirements. 
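
    A minimal usage sketch (illustrative only; the task id, callable and requirement pin below
    are assumptions made for this example, not defined by this module):

    .. code-block:: python

        def render_report():
            # All imports must happen inside the function, since it runs in a fresh venv.
            import jinja2

            return jinja2.__version__


        venv_task = PythonVirtualenvOperator(
            task_id="render_report",
            python_callable=render_report,
            requirements=["jinja2==3.1.3"],
            system_site_packages=False,
        )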

653 """ 

654 

655 template_fields: Sequence[str] = tuple( 

656 {"requirements", "index_urls", "venv_cache_path"}.union(PythonOperator.template_fields) 

657 ) 

658 template_ext: Sequence[str] = (".txt",) 

659 

660 def __init__( 

661 self, 

662 *, 

663 python_callable: Callable, 

664 requirements: None | Iterable[str] | str = None, 

665 python_version: str | None = None, 

666 serializer: _SerializerTypeDef | None = None, 

667 system_site_packages: bool = True, 

668 pip_install_options: list[str] | None = None, 

669 op_args: Collection[Any] | None = None, 

670 op_kwargs: Mapping[str, Any] | None = None, 

671 string_args: Iterable[str] | None = None, 

672 templates_dict: dict | None = None, 

673 templates_exts: list[str] | None = None, 

674 expect_airflow: bool = True, 

675 skip_on_exit_code: int | Container[int] | None = None, 

676 index_urls: None | Collection[str] | str = None, 

677 venv_cache_path: None | os.PathLike[str] = None, 

678 use_dill: bool = False, 

679 **kwargs, 

680 ): 

681 if ( 

682 python_version 

683 and str(python_version)[0] != str(sys.version_info.major) 

684 and (op_args or op_kwargs) 

685 ): 

686 raise AirflowException( 

687 "Passing op_args or op_kwargs is not supported across different Python " 

688 "major versions for PythonVirtualenvOperator. Please use string_args." 

689 f"Sys version: {sys.version_info}. Virtual environment version: {python_version}" 

690 ) 

691 if python_version is not None and not isinstance(python_version, str): 

692 warnings.warn( 

693 "Passing non-string types (e.g. int or float) as python_version " 

694 "is deprecated. Please use string value instead.", 

695 RemovedInAirflow3Warning, 

696 stacklevel=2, 

697 ) 

698 if not is_venv_installed(): 

699 raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") 

700 if not requirements: 

701 self.requirements: list[str] = [] 

702 elif isinstance(requirements, str): 

703 self.requirements = [requirements] 

704 else: 

705 self.requirements = list(requirements) 

706 self.python_version = python_version 

707 self.system_site_packages = system_site_packages 

708 self.pip_install_options = pip_install_options 

709 if isinstance(index_urls, str): 

710 self.index_urls: list[str] | None = [index_urls] 

711 elif isinstance(index_urls, Collection): 

712 self.index_urls = list(index_urls) 

713 else: 

714 self.index_urls = None 

715 self.venv_cache_path = venv_cache_path 

716 super().__init__( 

717 python_callable=python_callable, 

718 serializer=serializer, 

719 op_args=op_args, 

720 op_kwargs=op_kwargs, 

721 string_args=string_args, 

722 templates_dict=templates_dict, 

723 templates_exts=templates_exts, 

724 expect_airflow=expect_airflow, 

725 skip_on_exit_code=skip_on_exit_code, 

726 use_dill=use_dill, 

727 **kwargs, 

728 ) 

729 

730 def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]: 

731 """Prepare a list of requirements that need to be installed for the virtual environment.""" 

732 requirements = [str(dependency) for dependency in self.requirements] 

733 if not self.system_site_packages: 

734 if ( 

735 self.serializer == "cloudpickle" 

736 and not exclude_cloudpickle 

737 and "cloudpickle" not in requirements 

738 ): 

739 requirements.append("cloudpickle") 

740 elif self.serializer == "dill" and "dill" not in requirements: 

741 requirements.append("dill") 

742 requirements.sort() # Ensure a hash is stable 

743 return requirements 

744 

745 def _prepare_venv(self, venv_path: Path) -> None: 

746 """Prepare the requirements and installs the virtual environment.""" 

747 requirements_file = venv_path / "requirements.txt" 

748 requirements_file.write_text("\n".join(self._requirements_list())) 

749 prepare_virtualenv( 

750 venv_directory=str(venv_path), 

751 python_bin=f"python{self.python_version}" if self.python_version else "python", 

752 system_site_packages=self.system_site_packages, 

753 requirements_file_path=str(requirements_file), 

754 pip_install_options=self.pip_install_options, 

755 index_urls=self.index_urls, 

756 ) 

757 

758 def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]: 

759 """Generate the hash of the cache folder to use. 

760 

761 The following factors are used as input for the hash: 

762 - (sorted) list of requirements 

763 - pip install options 

764 - flag of system site packages 

765 - python version 

766 - Variable to override the hash with a cache key 

767 - Index URLs 

768 

769 Returns a hash and the data dict which is the base for the hash as text. 

770 """ 

771 hash_dict = { 

772 "requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle), 

773 "pip_install_options": self.pip_install_options, 

774 "index_urls": self.index_urls, 

775 "cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")), 

776 "python_version": self.python_version, 

777 "system_site_packages": self.system_site_packages, 

778 } 

779 hash_text = json.dumps(hash_dict, sort_keys=True) 

780 hash_object = hashlib_wrapper.md5(hash_text.encode()) 

781 requirements_hash = hash_object.hexdigest() 

782 return requirements_hash[:8], hash_text 

783 

784 def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path: 

785 """Ensure a valid virtual environment is set up and will create inplace.""" 

786 cache_hash, hash_data = self._calculate_cache_hash() 

787 venv_path = venv_cache_path / f"venv-{cache_hash}" 

788 self.log.info("Python virtual environment will be cached in %s", venv_path) 

789 venv_path.parent.mkdir(parents=True, exist_ok=True) 

790 with open(f"{venv_path}.lock", "w") as f: 

791 # Ensure that the cache is not built by parallel workers 

792 fcntl.flock(f, fcntl.LOCK_EX) 

793 

794 hash_marker = venv_path / "install_complete_marker.json" 

795 try: 

796 if venv_path.exists(): 

797 if hash_marker.exists(): 

798 previous_hash_data = hash_marker.read_text(encoding="utf8") 

799 if previous_hash_data == hash_data: 

800 self.log.info("Re-using cached Python virtual environment in %s", venv_path) 

801 return venv_path 

802 

803 _, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True) 

804 if previous_hash_data == hash_data_before_upgrade: 

805 self.log.warning( 

806 "Found a previous virtual environment in with outdated dependencies %s, " 

807 "deleting and re-creating.", 

808 venv_path, 

809 ) 

810 else: 

811 self.log.error( 

812 "Unicorn alert: Found a previous virtual environment in %s " 

813 "with the same hash but different parameters. Previous setup: '%s' / " 

814 "Requested venv setup: '%s'. Please report a bug to airflow!", 

815 venv_path, 

816 previous_hash_data, 

817 hash_data, 

818 ) 

819 else: 

820 self.log.warning( 

821 "Found a previous (probably partial installed) virtual environment in %s, " 

822 "deleting and re-creating.", 

823 venv_path, 

824 ) 

825 

826 shutil.rmtree(venv_path) 

827 

828 venv_path.mkdir(parents=True) 

829 self._prepare_venv(venv_path) 

830 hash_marker.write_text(hash_data, encoding="utf8") 

831 except Exception as e: 

832 shutil.rmtree(venv_path) 

833 raise AirflowException(f"Unable to create new virtual environment in {venv_path}") from e 

834 self.log.info("New Python virtual environment created in %s", venv_path) 

835 return venv_path 

836 

837 def execute_callable(self): 

838 if self.venv_cache_path: 

839 venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path)) 

840 python_path = venv_path / "bin" / "python" 

841 return self._execute_python_callable_in_subprocess(python_path) 

842 

843 with TemporaryDirectory(prefix="venv") as tmp_dir: 

844 tmp_path = Path(tmp_dir) 

845 self._prepare_venv(tmp_path) 

846 python_path = tmp_path / "bin" / "python" 

847 result = self._execute_python_callable_in_subprocess(python_path) 

848 return result 

849 

850 def _iter_serializable_context_keys(self): 

851 yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS 

852 if self.system_site_packages or "apache-airflow" in self.requirements: 

853 yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS 

854 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

855 elif "pendulum" in self.requirements: 

856 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

857 

858 

859class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn): 

860 """ 

861 A workflow can "branch" or follow a path after the execution of this task in a virtual environment. 

862 

863 It derives the PythonVirtualenvOperator and expects a Python function that returns 

864 a single task_id or list of task_ids to follow. The task_id(s) returned 

865 should point to a task directly downstream from {self}. All other "branches" 

866 or directly downstream tasks are marked with a state of ``skipped`` so that 

867 these paths can't move forward. The ``skipped`` states are propagated 

868 downstream to allow for the DAG state to fill up and the DAG run's state 

869 to be inferred. 

870 

871 .. seealso:: 

872 For more information on how to use this operator, take a look at the guide: 

873 :ref:`howto/operator:BranchPythonVirtualenvOperator` 

874 """ 

875 

876 def execute(self, context: Context) -> Any: 

877 return self.do_branch(context, super().execute(context)) 

878 

879 

880class ExternalPythonOperator(_BasePythonVirtualenvOperator): 

881 """ 

882 Run a function in an existing virtualenv that is not re-created. 

883 

884 The environment is reused as is, without the overhead of creating a new virtual environment (with certain caveats). 

885 

886 The function must be defined using def, and not be 

887 part of a class. All imports must happen inside the function 

888 and no variables outside the scope may be referenced. A global scope 

889 variable named virtualenv_string_args will be available (populated by 

890 string_args). In addition, one can pass stuff through op_args and op_kwargs, and one 

891 can use a return value. 

892 Note that if your virtual environment runs in a different Python major version than Airflow, 

893 you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to 

894 Airflow through plugins. You can use string_args though. 

895 

896 If Airflow is installed in the external environment at a different version than the version 

897 used by the operator, the operator will fail. 

898 

899 .. seealso:: 

900 For more information on how to use this operator, take a look at the guide: 

901 :ref:`howto/operator:ExternalPythonOperator` 

902 

903 :param python: Full path string (file-system specific) that points to a Python binary inside 

904 a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path 

905 (so usually start with "/" or "X:/" depending on the filesystem/os used). 

906 :param python_callable: A python function with no references to outside variables, 

907 defined with def, which will be run in a virtual environment. 

908 :param serializer: Which serializer use to serialize the args and result. It can be one of the following: 

909 

910 - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library. 

911 - ``"cloudpickle"``: Use cloudpickle for serialize more complex types, 

912 this requires to include cloudpickle in your requirements. 

913 - ``"dill"``: Use dill for serialize more complex types, 

914 this requires to include dill in your requirements. 

915 :param op_args: A list of positional arguments to pass to python_callable. 

916 :param op_kwargs: A dict of keyword arguments to pass to python_callable. 

917 :param string_args: Strings that are present in the global var virtualenv_string_args, 

918 available to python_callable at runtime as a list[str]. Note that args are split 

919 by newline. 

920 :param templates_dict: a dictionary where the values are templates that 

921 will get templated by the Airflow engine sometime between 

922 ``__init__`` and ``execute`` takes place and are made available 

923 in your callable's context after the template has been applied 

924 :param templates_exts: a list of file extensions to resolve while 

925 processing templated fields, for examples ``['.sql', '.hql']`` 

926 :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator 

927 will raise a warning if Airflow is not installed, and it will attempt to load Airflow 

928 macros when starting. 

929 :param skip_on_exit_code: If python_callable exits with this exit code, leave the task 

930 in ``skipped`` state (default: None). If set to ``None``, any non-zero 

931 exit code will be treated as a failure. 

932 :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize 

933 the args and result (pickle is default). This allows more complex types 

934 but requires you to include dill in your requirements. 
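
    A minimal usage sketch (illustrative only; the task id, interpreter path and callable below
    are assumptions made for this example, not defined by this module):

    .. code-block:: python

        def print_python():
            # Runs inside the pre-existing environment pointed to by ``python``.
            import sys

            return sys.executable


        external_task = ExternalPythonOperator(
            task_id="run_in_existing_venv",
            python="/opt/venvs/reporting/bin/python",
            python_callable=print_python,
        )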

935 """ 

936 

937 template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields)) 

938 

939 def __init__( 

940 self, 

941 *, 

942 python: str, 

943 python_callable: Callable, 

944 serializer: _SerializerTypeDef | None = None, 

945 op_args: Collection[Any] | None = None, 

946 op_kwargs: Mapping[str, Any] | None = None, 

947 string_args: Iterable[str] | None = None, 

948 templates_dict: dict | None = None, 

949 templates_exts: list[str] | None = None, 

950 expect_airflow: bool = True, 

951 expect_pendulum: bool = False, 

952 skip_on_exit_code: int | Container[int] | None = None, 

953 use_dill: bool = False, 

954 **kwargs, 

955 ): 

956 if not python: 

957 raise ValueError("Python Path must be defined in ExternalPythonOperator") 

958 self.python = python 

959 self.expect_pendulum = expect_pendulum 

960 super().__init__( 

961 python_callable=python_callable, 

962 serializer=serializer, 

963 op_args=op_args, 

964 op_kwargs=op_kwargs, 

965 string_args=string_args, 

966 templates_dict=templates_dict, 

967 templates_exts=templates_exts, 

968 expect_airflow=expect_airflow, 

969 skip_on_exit_code=skip_on_exit_code, 

970 use_dill=use_dill, 

971 **kwargs, 

972 ) 

973 

974 def execute_callable(self): 

975 python_path = Path(self.python) 

976 if not python_path.exists(): 

977 raise ValueError(f"Python Path '{python_path}' must exists") 

978 if not python_path.is_file(): 

979 raise ValueError(f"Python Path '{python_path}' must be a file") 

980 if not python_path.is_absolute(): 

981 raise ValueError(f"Python Path '{python_path}' must be an absolute path.") 

982 python_version = _PythonVersionInfo.from_executable(self.python) 

983 if python_version.major != sys.version_info.major and (self.op_args or self.op_kwargs): 

984 raise AirflowException( 

985 "Passing op_args or op_kwargs is not supported across different Python " 

986 "major versions for ExternalPythonOperator. Please use string_args." 

987 f"Sys version: {sys.version_info}. " 

988 f"Virtual environment version: {python_version}" 

989 ) 

990 return self._execute_python_callable_in_subprocess(python_path) 

991 

992 def _iter_serializable_context_keys(self): 

993 yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS 

994 if self._get_airflow_version_from_target_env(): 

995 yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS 

996 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

997 elif self._is_pendulum_installed_in_target_env(): 

998 yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS 

999 

1000 def _is_pendulum_installed_in_target_env(self) -> bool: 

1001 try: 

1002 subprocess.check_call([self.python, "-c", "import pendulum"]) 

1003 return True 

1004 except Exception as e: 

1005 if self.expect_pendulum: 

1006 self.log.warning("When checking for Pendulum installed in virtual environment got %s", e) 

1007 self.log.warning( 

1008 "Pendulum is not properly installed in the virtual environment " 

1009 "Pendulum context keys will not be available. " 

1010 "Please Install Pendulum or Airflow in your virtual environment to access them." 

1011 ) 

1012 return False 

1013 

1014 @property 

1015 def _external_airflow_version_script(self): 

1016 """ 

1017 Return a Python script which determines the version of Apache Airflow. 

1018 

1019 Importing airflow as a module might take a while; as a result, 

1020 obtaining the version could take up to 1 second. 

1021 On the other hand, `importlib.metadata.version` retrieves the package version pretty fast, 

1022 something below 100 ms; this includes the overhead of the new subprocess. 

1023 

1024 Possible side effect: `importlib.metadata` might not be available (Python < 3.8), and the 

1025 backport `importlib_metadata` might be missing as well, which may indicate that the venv doesn't contain `apache-airflow` 

1026 or that something is wrong with the environment. 

1027 """ 

1028 return textwrap.dedent( 

1029 """ 

1030 try: 

1031 from importlib.metadata import version 

1032 except ImportError: 

1033 from importlib_metadata import version 

1034 print(version("apache-airflow")) 

1035 """ 

1036 ) 

1037 

1038 def _get_airflow_version_from_target_env(self) -> str | None: 

1039 from airflow import __version__ as airflow_version 

1040 

1041 try: 

1042 result = subprocess.check_output( 

1043 [self.python, "-c", self._external_airflow_version_script], 

1044 text=True, 

1045 ) 

1046 target_airflow_version = result.strip() 

1047 if target_airflow_version != airflow_version: 

1048 raise AirflowConfigException( 

1049 f"The version of Airflow installed for the {self.python} " 

1050 f"({target_airflow_version}) is different than the runtime Airflow version: " 

1051 f"{airflow_version}. Make sure your environment has the same Airflow version " 

1052 f"installed as the Airflow runtime." 

1053 ) 

1054 return target_airflow_version 

1055 except Exception as e: 

1056 if self.expect_airflow: 

1057 self.log.warning("When checking for Airflow installed in virtual environment got %s", e) 

1058 self.log.warning( 

1059 "This means that Airflow is not properly installed by %s. " 

1060 "Airflow context keys will not be available. " 

1061 "Please Install Airflow %s in your environment to access them.", 

1062 self.python, 

1063 airflow_version, 

1064 ) 

1065 return None 

1066 

1067 

1068class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn): 

1069 """ 

1070 A workflow can "branch" or follow a path after the execution of this task. 

1071 

1072 Extends ExternalPythonOperator, so it expects a ``python`` argument: the path to a Python binary inside a 

1073 virtual environment that should be used (in the ``VENV/bin`` folder). It should be an absolute path, 

1074 so the branch callable can run in a separate virtual environment, similarly to ExternalPythonOperator. 

1075 

1076 .. seealso:: 

1077 For more information on how to use this operator, take a look at the guide: 

1078 :ref:`howto/operator:BranchExternalPythonOperator` 

1079 """ 

1080 

1081 def execute(self, context: Context) -> Any: 

1082 return self.do_branch(context, super().execute(context)) 

1083 

1084 

1085def get_current_context() -> Context: 

1086 """ 

1087 Retrieve the execution context dictionary without altering user method's signature. 

1088 

1089 This is the simplest method of retrieving the execution context dictionary. 

1090 

1091 **Old style:** 

1092 

1093 .. code:: python 

1094 

1095 def my_task(**context): 

1096 ti = context["ti"] 

1097 

1098 **New style:** 

1099 

1100 .. code:: python 

1101 

1102 from airflow.operators.python import get_current_context 

1103 

1104 

1105 def my_task(): 

1106 context = get_current_context() 

1107 ti = context["ti"] 

1108 

1109 The current context will only have a value if this method is called after an operator 

1110 has started to execute. 

1111 """ 

1112 if not _CURRENT_CONTEXT: 

1113 raise AirflowException( 

1114 "Current context was requested but no context was found! " 

1115 "Are you running within an airflow task?" 

1116 ) 

1117 return _CURRENT_CONTEXT[-1]