#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import inspect
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import textwrap
import types
import warnings
from abc import ABCMeta, abstractmethod
from collections.abc import Callable, Collection, Container, Iterable, Mapping, Sequence
from functools import cache
from itertools import chain
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, NamedTuple, cast

import lazy_object_proxy
from packaging.requirements import InvalidRequirement, Requirement
from packaging.specifiers import InvalidSpecifier
from packaging.version import InvalidVersion

from airflow.exceptions import (
    AirflowConfigException,
    AirflowProviderDeprecationWarning,
    DeserializingResultError,
)
from airflow.models.variable import Variable
from airflow.providers.common.compat.sdk import AirflowException, AirflowSkipException, context_merge
from airflow.providers.standard.hooks.package_index import PackageIndexHook
from airflow.providers.standard.utils.python_virtualenv import (
    _execute_in_subprocess,
    prepare_virtualenv,
    write_python_script,
)
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.utils import hashlib_wrapper
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import KeywordParameters

if AIRFLOW_V_3_0_PLUS:
    from airflow.providers.standard.operators.branch import BaseBranchOperator
    from airflow.providers.standard.utils.skipmixin import SkipMixin
else:
    from airflow.models.skipmixin import SkipMixin
    from airflow.operators.branch import BaseBranchOperator  # type: ignore[no-redef]


log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from typing import Literal

    from pendulum.datetime import DateTime

    from airflow.providers.common.compat.sdk import Context
    from airflow.sdk.execution_time.callback_runner import ExecutionCallableRunner
    from airflow.sdk.execution_time.context import OutletEventAccessorsProtocol

    _SerializerTypeDef = Literal["pickle", "cloudpickle", "dill"]


@cache
def _parse_version_info(text: str) -> tuple[int, int, int, str, int]:
    """Parse python version info from a text."""
    parts = text.strip().split(".")
    if len(parts) != 5:
        msg = f"Invalid Python version info, expected 5 components separated by '.', but got {text!r}."
        raise ValueError(msg)
    try:
        return int(parts[0]), int(parts[1]), int(parts[2]), parts[3], int(parts[4])
    except ValueError:
        msg = f"Unable to convert parts {parts} parsed from {text!r} to (int, int, int, str, int)."
        raise ValueError(msg) from None


class _PythonVersionInfo(NamedTuple):
    """Provide the same interface as ``sys.version_info``."""

    major: int
    minor: int
    micro: int
    releaselevel: str
    serial: int

    @classmethod
    def from_executable(cls, executable: str) -> _PythonVersionInfo:
        """Parse python version info from an executable."""
        cmd = [executable, "-c", 'import sys; print(".".join(map(str, sys.version_info)))']
        try:
            result = subprocess.check_output(cmd, text=True)
        except Exception as e:
            raise ValueError(f"Error while executing command {cmd}: {e}")
        return cls(*_parse_version_info(result.strip()))


class PythonOperator(BaseOperator):
    """
    Executes a Python callable.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonOperator`

    When running your callable, Airflow will pass a set of keyword arguments that can be used in your
    function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.
    For this to work, you need to define ``**kwargs`` in your function header, or you can add the
    keyword arguments you would like to receive directly - for example, with the code below your
    callable will receive the value of the ``ti`` context variable.

    With explicit arguments:

    .. code-block:: python

        def my_python_callable(ti):
            pass

    With kwargs:

    .. code-block:: python

        def my_python_callable(**kwargs):
            ti = kwargs["ti"]


    :param python_callable: A reference to an object that is callable
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied. (templated)
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param show_return_value_in_logs: whether to log the return value.
        Defaults to True, which allows the return value to be logged.
        Set it to False to prevent logging the return value, e.g. when returning
        large amounts of data such as a large XCom payload.
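
    A minimal usage sketch (the task id, callable name, and op_kwargs values below are illustrative):

    .. code-block:: python

        def my_python_callable(extra_arg, **kwargs):
            # "extra_arg" and the task id below are illustrative, not part of the operator API.
            print(extra_arg, kwargs["ti"].task_id)
            return "the return value is pushed to XCom by default"


        run_this = PythonOperator(
            task_id="run_my_callable",
            python_callable=my_python_callable,
            op_kwargs={"extra_arg": "hello"},
        )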
    """

    template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
    template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
    BLUE = "#ffefeb"
    ui_color = BLUE

    # Since we won't mutate the arguments, we should just do the shallow copy;
    # there are some cases where we can't deepcopy the objects (e.g. protobuf).
    shallow_copy_attrs: Sequence[str] = ("python_callable", "op_kwargs")

    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException("`python_callable` param must be callable")
        self.python_callable = python_callable
        self.op_args = op_args or ()
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts
        self.show_return_value_in_logs = show_return_value_in_logs

    def execute(self, context: Context) -> Any:
        context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
        self.op_kwargs = self.determine_kwargs(context)

        # This needs to be lazy because subclasses may implement execute_callable
        # by running a separate process that can't use the eager result.
        def __prepare_execution() -> tuple[ExecutionCallableRunner, OutletEventAccessorsProtocol] | None:
            if AIRFLOW_V_3_0_PLUS:
                from airflow.sdk.execution_time.callback_runner import create_executable_runner
                from airflow.sdk.execution_time.context import context_get_outlet_events

                return create_executable_runner, context_get_outlet_events(context)
            from airflow.utils.context import context_get_outlet_events  # type: ignore
            from airflow.utils.operator_helpers import ExecutionCallableRunner  # type: ignore

            return ExecutionCallableRunner, context_get_outlet_events(context)

        self.__prepare_execution = __prepare_execution

        return_value = self.execute_callable()
        if self.show_return_value_in_logs:
            self.log.info("Done. Returned value was: %s", return_value)
        else:
            self.log.info("Done. Returned value not shown")

        return return_value

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()

    __prepare_execution: Callable[[], tuple[ExecutionCallableRunner, OutletEventAccessorsProtocol] | None]

    def execute_callable(self) -> Any:
        """
        Call the python callable with the given arguments.

        :return: the return value of the call.
        """
        if (execution_preparation := self.__prepare_execution()) is None:
            return self.python_callable(*self.op_args, **self.op_kwargs)
        create_execution_runner, asset_events = execution_preparation
        runner = create_execution_runner(self.python_callable, asset_events, logger=self.log)
        return runner.run(*self.op_args, **self.op_kwargs)


class BranchPythonOperator(BaseBranchOperator, PythonOperator):
    """
    A workflow can "branch" or follow a path after the execution of this task.

    It derives from the PythonOperator and expects a Python function that returns
    a single task_id, a single task_group_id, or a list of task_ids and/or
    task_group_ids to follow. The task_id(s) and/or task_group_id(s) returned
    should point to a task or task group directly downstream from this operator. All
    other "branches" or directly downstream tasks are marked with a state of
    ``skipped`` so that these paths can't move forward. The ``skipped`` states
    are propagated downstream to allow for the DAG state to fill up and
    the DAG run's state to be inferred.
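
    For illustration, a sketch of a branching callable (the task ids below are hypothetical and
    must exist directly downstream of this task):

    .. code-block:: python

        def pick_branch(**kwargs):
            # Return the task_id (or list of task_ids) to follow;
            # all other direct downstream tasks will be skipped.
            if kwargs["logical_date"].weekday() < 5:
                return "process_weekday"
            return "process_weekend"


        branching = BranchPythonOperator(task_id="branching", python_callable=pick_branch)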
    """

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        return PythonOperator.execute(self, context)


class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a pipeline to continue based on the result of a ``python_callable``.

    The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a
    ``python_callable``. If the returned result is False or a falsy value, the pipeline will be
    short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting
    mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and
    an ``XCom`` of the returned result is pushed.

    The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for
    downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all
    downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this
    parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for
    other subsequent downstream tasks is respected. In this mode, the operator assumes the direct downstream
    tasks were purposely meant to be skipped but perhaps not other subsequent tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ShortCircuitOperator`

    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
        be skipped. This is the default behavior. If set to False, the direct downstream task(s) will be
        skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected.
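
    A minimal usage sketch (the task id and parameter name below are illustrative): if the callable
    returns a falsy value, downstream tasks are skipped.

    .. code-block:: python

        def should_continue(**kwargs):
            # Any falsy return value short-circuits the downstream tasks.
            return kwargs["params"].get("run_downstream", False)


        check = ShortCircuitOperator(
            task_id="check_condition",
            python_callable=should_continue,
            ignore_downstream_trigger_rules=False,
        )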
    """

    inherits_from_skipmixin = True

    def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None:
        super().__init__(**kwargs)
        self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules

    def execute(self, context: Context) -> Any:
        condition = super().execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info("Proceeding with downstream tasks...")
            return condition

        if not self.downstream_task_ids:
            self.log.info("No downstream tasks; nothing to do.")
            return condition

        dag_run = context["dag_run"]

        def get_tasks_to_skip():
            if self.ignore_downstream_trigger_rules is True:
                tasks = context["task"].get_flat_relatives(upstream=False)
            else:
                tasks = context["task"].get_direct_relatives(upstream=False)
            for t in tasks:
                if not t.is_teardown:
                    yield t

        to_skip = get_tasks_to_skip()

        # This lets us avoid materializing an intermediate list unless debug logging is enabled.
        if self.log.getEffectiveLevel() <= logging.DEBUG:
            self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip()))

        self.log.info("Skipping downstream tasks")
        if AIRFLOW_V_3_0_PLUS:
            self.skip(
                ti=context["ti"],
                tasks=to_skip,
            )
        else:
            if to_skip:
                self.skip(
                    dag_run=context["dag_run"],
                    tasks=to_skip,
                    execution_date=cast("DateTime", dag_run.logical_date),  # type: ignore[call-arg]
                    map_index=context["ti"].map_index,
                )

        self.log.info("Done.")
        # Return the result of the super execute method as-is instead of returning None.
        return condition


def _load_pickle():
    import pickle

    return pickle


def _load_dill():
    try:
        import dill
    except ModuleNotFoundError:
        log.error("Unable to import `dill` module. Please make sure that it is installed.")
        raise
    return dill


def _load_cloudpickle():
    try:
        import cloudpickle
    except ModuleNotFoundError:
        log.error(
            "Unable to import `cloudpickle` module. "
            "Please install it with: pip install 'apache-airflow[cloudpickle]'"
        )
        raise
    return cloudpickle


_SERIALIZERS: dict[_SerializerTypeDef, Any] = {
    "pickle": lazy_object_proxy.Proxy(_load_pickle),
    "dill": lazy_object_proxy.Proxy(_load_dill),
    "cloudpickle": lazy_object_proxy.Proxy(_load_cloudpickle),
}


class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
    BASE_SERIALIZABLE_CONTEXT_KEYS = {
        "ds",
        "ds_nodash",
        "expanded_ti_count",
        "inlets",
        "outlets",
        "run_id",
        "task_instance_key_str",
        "test_mode",
        "ts",
        "ts_nodash",
        "ts_nodash_with_tz",
        # The following should be removed when Airflow 2 support is dropped.
        "next_ds",
        "next_ds_nodash",
        "prev_ds",
        "prev_ds_nodash",
        "tomorrow_ds",
        "tomorrow_ds_nodash",
        "yesterday_ds",
        "yesterday_ds_nodash",
    }
    if AIRFLOW_V_3_0_PLUS:
        BASE_SERIALIZABLE_CONTEXT_KEYS.add("task_reschedule_count")

    PENDULUM_SERIALIZABLE_CONTEXT_KEYS = {
        "data_interval_end",
        "data_interval_start",
        "logical_date",
        "prev_data_interval_end_success",
        "prev_data_interval_start_success",
        "prev_start_date_success",
        "prev_end_date_success",
        # The following should be removed when Airflow 2 support is dropped.
        "execution_date",
        "next_execution_date",
        "prev_execution_date",
        "prev_execution_date_success",
    }

    AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
        "macros",
        "conf",
        "dag",
        "dag_run",
        "task",
        "params",
        "triggering_asset_events",
        # The following should be removed when Airflow 2 support is dropped.
        "triggering_dataset_events",
    }

    def __init__(
        self,
        *,
        python_callable: Callable,
        serializer: _SerializerTypeDef | None = None,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        skip_on_exit_code: int | Container[int] | None = None,
        env_vars: dict[str, str] | None = None,
        inherit_env: bool = True,
        **kwargs,
    ):
        if (
            not isinstance(python_callable, types.FunctionType)
            or isinstance(python_callable, types.LambdaType)
            and python_callable.__name__ == "<lambda>"
        ):
            raise ValueError(f"{type(self).__name__} only supports functions for python_callable arg")
        if inspect.isgeneratorfunction(python_callable):
            raise ValueError(f"{type(self).__name__} does not support using 'yield' in python_callable")
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )
        self.string_args = string_args or []

        serializer = serializer or "pickle"
        if serializer not in _SERIALIZERS:
            msg = (
                f"Unsupported serializer {serializer!r}. Expected one of {', '.join(map(repr, _SERIALIZERS))}"
            )
            raise AirflowException(msg)

        self.pickling_library = _SERIALIZERS[serializer]
        self.serializer: _SerializerTypeDef = serializer

        self.expect_airflow = expect_airflow
        self.skip_on_exit_code = (
            skip_on_exit_code
            if isinstance(skip_on_exit_code, Container)
            else [skip_on_exit_code]
            if skip_on_exit_code is not None
            else []
        )
        self.env_vars = env_vars
        self.inherit_env = inherit_env

    @abstractmethod
    def _iter_serializable_context_keys(self):
        pass

    def execute(self, context: Context) -> Any:
        serializable_keys = set(self._iter_serializable_context_keys())
        new = {k: v for k, v in context.items() if k in serializable_keys}
        serializable_context = cast("Context", new)
        # Store bundle_path for subprocess execution
        self._bundle_path = self._get_bundle_path_from_context(context)
        return super().execute(context=serializable_context)

    def _get_bundle_path_from_context(self, context: Context) -> str | None:
        """
        Extract bundle_path from the task instance's bundle_instance.

        :param context: The task execution context
        :return: Path to the bundle root directory, or None if not in a bundle
        """
        if not AIRFLOW_V_3_0_PLUS:
            return None

        # In Airflow 3.x, the RuntimeTaskInstance has a bundle_instance attribute
        # that contains the bundle information including its path
        ti = context["ti"]
        if bundle_instance := getattr(ti, "bundle_instance", None):
            return bundle_instance.path

        return None

    def get_python_source(self):
        """Return the source of self.python_callable."""
        return textwrap.dedent(inspect.getsource(self.python_callable))

    def _write_args(self, file: Path):
        def resolve_proxies(obj):
            """Recursively replaces lazy_object_proxy.Proxy instances with their resolved values."""
            if isinstance(obj, lazy_object_proxy.Proxy):
                return obj.__wrapped__  # force evaluation
            if isinstance(obj, dict):
                return {k: resolve_proxies(v) for k, v in obj.items()}
            if isinstance(obj, list):
                return [resolve_proxies(v) for v in obj]
            return obj

        if self.op_args or self.op_kwargs:
            self.log.info("Use %r as serializer.", self.serializer)
            file.write_bytes(
                self.pickling_library.dumps({"args": self.op_args, "kwargs": resolve_proxies(self.op_kwargs)})
            )

    def _write_string_args(self, file: Path):
        file.write_text("\n".join(map(str, self.string_args)))

    def _read_result(self, path: Path):
        if path.stat().st_size == 0:
            return None
        try:
            return self.pickling_library.loads(path.read_bytes())
        except ValueError as value_error:
            raise DeserializingResultError() from value_error

    def __deepcopy__(self, memo):
        # Module objects can't be copied *at all*.
        memo[id(self.pickling_library)] = self.pickling_library
        return super().__deepcopy__(memo)

    def _execute_python_callable_in_subprocess(self, python_path: Path):
        with TemporaryDirectory(prefix="venv-call") as tmp:
            tmp_dir = Path(tmp)
            op_kwargs: dict[str, Any] = dict(self.op_kwargs)
            if self.templates_dict:
                op_kwargs["templates_dict"] = self.templates_dict
            input_path = tmp_dir / "script.in"
            output_path = tmp_dir / "script.out"
            string_args_path = tmp_dir / "string_args.txt"
            script_path = tmp_dir / "script.py"
            termination_log_path = tmp_dir / "termination.log"
            airflow_context_path = tmp_dir / "airflow_context.json"

            self._write_args(input_path)
            self._write_string_args(string_args_path)

            jinja_context = {
                "op_args": self.op_args,
                "op_kwargs": op_kwargs,
                "expect_airflow": self.expect_airflow,
                "pickling_library": self.serializer,
                "python_callable": self.python_callable.__name__,
                "python_callable_source": self.get_python_source(),
            }

            if inspect.getfile(self.python_callable) == self.dag.fileloc:
                jinja_context["modified_dag_module_name"] = get_unique_dag_module_name(self.dag.fileloc)

            write_python_script(
                jinja_context=jinja_context,
                filename=os.fspath(script_path),
                render_template_as_native_obj=self.dag.render_template_as_native_obj,
            )

            env_vars = dict(os.environ) if self.inherit_env else {}
            if fd := os.getenv("__AIRFLOW_SUPERVISOR_FD"):
                env_vars["__AIRFLOW_SUPERVISOR_FD"] = fd
            if self.env_vars:
                env_vars.update(self.env_vars)

            # Add bundle_path to PYTHONPATH for subprocess to import Dag bundle modules
            if self._bundle_path:
                bundle_path = self._bundle_path
                existing_pythonpath = env_vars.get("PYTHONPATH", "")
                if existing_pythonpath:
                    # Append bundle_path after existing PYTHONPATH
                    env_vars["PYTHONPATH"] = f"{existing_pythonpath}{os.pathsep}{bundle_path}"
                else:
                    env_vars["PYTHONPATH"] = bundle_path

            try:
                cmd: list[str] = [
                    os.fspath(python_path),
                    os.fspath(script_path),
                    os.fspath(input_path),
                    os.fspath(output_path),
                    os.fspath(string_args_path),
                    os.fspath(termination_log_path),
                    os.fspath(airflow_context_path),
                ]
                _execute_in_subprocess(
                    cmd=cmd,
                    env=env_vars,
                )
            except subprocess.CalledProcessError as e:
                if e.returncode in self.skip_on_exit_code:
                    raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
                if termination_log_path.exists() and termination_log_path.stat().st_size > 0:
                    error_msg = f"Process returned non-zero exit status {e.returncode}.\n"
                    with open(termination_log_path) as file:
                        error_msg += file.read()
                    raise AirflowException(error_msg) from None
                raise

            if 0 in self.skip_on_exit_code:
                raise AirflowSkipException("Process exited with code 0. Skipping.")

            return self._read_result(output_path)

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        keyword_params = KeywordParameters.determine(self.python_callable, self.op_args, context)
        if AIRFLOW_V_3_0_PLUS:
            return keyword_params.unpacking()
        return keyword_params.serializing()  # type: ignore[attr-defined]


class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a virtualenv that is created and destroyed automatically.

    The function (with certain caveats) must be defined using def, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are provided to
    Airflow through plugins. You can use string_args though.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonVirtualenvOperator`

    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtual environment.
    :param requirements: Either a list of requirement strings, or a (templated)
        "requirements file" as specified by pip.
    :param python_version: The Python version to run the virtual environment with. Note that
        both major-only (e.g. "3") and major.minor (e.g. "3.10") forms are acceptable.
    :param serializer: Which serializer to use to serialize the args and result. It can be one of the following:

        - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
        - ``"cloudpickle"``: Use cloudpickle to serialize more complex types;
          this requires cloudpickle to be included in your requirements.
        - ``"dill"``: Use dill to serialize more complex types;
          this requires dill to be included in your requirements.
    :param system_site_packages: Whether to include
        system_site_packages in your virtual environment.
        See virtualenv documentation for more information.
    :param pip_install_options: a list of pip install options when installing requirements.
        See ``pip install -h`` for available options.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
    :param index_urls: an optional list of index urls to load Python packages from.
        If not provided, the system pip conf will be used to source packages from.
    :param index_urls_from_connection_ids: An optional list of ``PackageIndex`` connection IDs.
        Will be appended to ``index_urls``.
    :param venv_cache_path: Optional path to the virtual environment parent folder in which the
        virtual environment will be cached; a sub-folder venv-{hash} is created, where hash is
        a checksum of the requirements. If not provided, the virtual environment will be created and
        deleted in a temp folder for every execution.
    :param env_vars: A dictionary containing additional environment variables to set for the virtual
        environment when it is executed.
    :param inherit_env: Whether to inherit the current environment variables when executing the virtual
        environment. If set to ``True``, the virtual environment will inherit the environment variables
        of the parent process (``os.environ``). If set to ``False``, the virtual environment will be
        executed with a clean environment.
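
    A minimal usage sketch (the task id, callable, and requirement pin below are illustrative):

    .. code-block:: python

        def callable_virtualenv():
            # All imports must happen inside the function.
            from colorama import Fore

            print(Fore.GREEN + "Running inside a freshly created virtualenv")


        virtualenv_task = PythonVirtualenvOperator(
            task_id="virtualenv_python",
            python_callable=callable_virtualenv,
            requirements=["colorama==0.4.6"],  # illustrative pin
            system_site_packages=False,
        )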
    """

    template_fields: Sequence[str] = tuple(
        {"requirements", "index_urls", "index_urls_from_connection_ids", "venv_cache_path"}.union(
            PythonOperator.template_fields
        )
    )
    template_ext: Sequence[str] = (".txt",)

    def __init__(
        self,
        *,
        python_callable: Callable,
        requirements: None | Iterable[str] | str = None,
        python_version: str | None = None,
        serializer: _SerializerTypeDef | None = None,
        system_site_packages: bool = True,
        pip_install_options: list[str] | None = None,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        skip_on_exit_code: int | Container[int] | None = None,
        index_urls: None | Collection[str] | str = None,
        index_urls_from_connection_ids: None | Collection[str] | str = None,
        venv_cache_path: None | os.PathLike[str] = None,
        env_vars: dict[str, str] | None = None,
        inherit_env: bool = True,
        **kwargs,
    ):
        if (
            python_version
            and str(python_version)[0] != str(sys.version_info.major)
            and (op_args or op_kwargs)
        ):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for PythonVirtualenvOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. Virtual environment version: {python_version}"
            )
        if python_version is not None and not isinstance(python_version, str):
            raise AirflowException(
                "Passing non-string types (e.g. int or float) as python_version not supported"
            )
        if not requirements:
            self.requirements: list[str] = []
        elif isinstance(requirements, str):
            self.requirements = [requirements]
        else:
            self.requirements = list(requirements)
        self.python_version = python_version
        self.system_site_packages = system_site_packages
        self.pip_install_options = pip_install_options
        if isinstance(index_urls, str):
            self.index_urls: list[str] | None = [index_urls]
        elif isinstance(index_urls, Collection):
            self.index_urls = list(index_urls)
        else:
            self.index_urls = None
        if isinstance(index_urls_from_connection_ids, str):
            self.index_urls_from_connection_ids: list[str] | None = [index_urls_from_connection_ids]
        elif isinstance(index_urls_from_connection_ids, Collection):
            self.index_urls_from_connection_ids = list(index_urls_from_connection_ids)
        else:
            self.index_urls_from_connection_ids = None
        self.venv_cache_path = venv_cache_path
        super().__init__(
            python_callable=python_callable,
            serializer=serializer,
            op_args=op_args,
            op_kwargs=op_kwargs,
            string_args=string_args,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            expect_airflow=expect_airflow,
            skip_on_exit_code=skip_on_exit_code,
            env_vars=env_vars,
            inherit_env=inherit_env,
            **kwargs,
        )

    def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]:
        """Prepare a list of requirements that need to be installed for the virtual environment."""
        requirements = [str(dependency) for dependency in self.requirements]
        if not self.system_site_packages:
            if (
                self.serializer == "cloudpickle"
                and not exclude_cloudpickle
                and "cloudpickle" not in requirements
            ):
                requirements.append("cloudpickle")
            elif self.serializer == "dill" and "dill" not in requirements:
                requirements.append("dill")
        requirements.sort()  # Ensure a hash is stable
        return requirements

    def _prepare_venv(self, venv_path: Path) -> None:
        """Prepare the requirements and install the virtual environment."""
        requirements_file = venv_path / "requirements.txt"
        requirements_file.write_text("\n".join(self._requirements_list()))
        prepare_virtualenv(
            venv_directory=str(venv_path),
            python_bin=f"python{self.python_version}" if self.python_version else "python",
            system_site_packages=self.system_site_packages,
            requirements_file_path=str(requirements_file),
            pip_install_options=self.pip_install_options,
            index_urls=self.index_urls,
        )

    def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]:
        """
        Generate the hash of the cache folder to use.

        The following factors are used as input for the hash:

        - (sorted) list of requirements
        - pip install options
        - flag of system site packages
        - python version
        - Variable to override the hash with a cache key
        - Index URLs

        Returns a hash and the data dict which is the base for the hash as text.
        """
        hash_dict = {
            "requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle),
            "pip_install_options": self.pip_install_options,
            "index_urls": self.index_urls,
            "cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")),
            "python_version": self.python_version,
            "system_site_packages": self.system_site_packages,
        }
        hash_text = json.dumps(hash_dict, sort_keys=True)
        hash_object = hashlib_wrapper.md5(hash_text.encode())
        requirements_hash = hash_object.hexdigest()
        return requirements_hash[:8], hash_text

    def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
        """Ensure a valid virtual environment is set up, creating it in place if needed."""
        cache_hash, hash_data = self._calculate_cache_hash()
        venv_path = venv_cache_path / f"venv-{cache_hash}"
        self.log.info("Python virtual environment will be cached in %s", venv_path)
        venv_path.parent.mkdir(parents=True, exist_ok=True)
        with open(f"{venv_path}.lock", "w") as f:
            # Ensure that the cache is not built by parallel workers
            import fcntl

            fcntl.flock(f, fcntl.LOCK_EX)

            hash_marker = venv_path / "install_complete_marker.json"
            try:
                if venv_path.exists():
                    if hash_marker.exists():
                        previous_hash_data = hash_marker.read_text(encoding="utf8")
                        if previous_hash_data == hash_data:
                            self.log.info("Reusing cached Python virtual environment in %s", venv_path)
                            return venv_path

                        _, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True)
                        if previous_hash_data == hash_data_before_upgrade:
                            self.log.warning(
                                "Found a previous virtual environment with outdated dependencies in %s, "
                                "deleting and re-creating.",
                                venv_path,
                            )
                        else:
                            self.log.error(
                                "Unicorn alert: Found a previous virtual environment in %s "
                                "with the same hash but different parameters. Previous setup: '%s' / "
                                "Requested venv setup: '%s'. Please report a bug to airflow!",
                                venv_path,
                                previous_hash_data,
                                hash_data,
                            )
                    else:
                        self.log.warning(
                            "Found a previous (probably partially installed) virtual environment in %s, "
                            "deleting and re-creating.",
                            venv_path,
                        )

                    shutil.rmtree(venv_path)

                venv_path.mkdir(parents=True)
                self._prepare_venv(venv_path)
                hash_marker.write_text(hash_data, encoding="utf8")
            except Exception as e:
                shutil.rmtree(venv_path)
                raise AirflowException(f"Unable to create new virtual environment in {venv_path}") from e
            self.log.info("New Python virtual environment created in %s", venv_path)
            return venv_path

    def _cleanup_python_pycache_dir(self, cache_dir_path: Path) -> None:
        try:
            shutil.rmtree(cache_dir_path)
            self.log.debug("The directory %s has been deleted.", cache_dir_path)
        except FileNotFoundError:
            self.log.warning("Failed to delete %s. The directory does not exist.", cache_dir_path)
        except PermissionError:
            self.log.warning("Permission denied to delete the directory %s.", cache_dir_path)

    def _retrieve_index_urls_from_connection_ids(self):
        """Retrieve index URLs from Package Index connections."""
        if self.index_urls is None:
            self.index_urls = []
        for conn_id in self.index_urls_from_connection_ids:
            conn_url = PackageIndexHook(conn_id).get_connection_url()
            self.index_urls.append(conn_url)

    def execute_callable(self):
        if self.index_urls_from_connection_ids:
            self._retrieve_index_urls_from_connection_ids()

        if self.venv_cache_path:
            venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path))
            python_path = venv_path / "bin" / "python"
            return self._execute_python_callable_in_subprocess(python_path)

        with TemporaryDirectory(prefix="venv") as tmp_dir:
            tmp_path = Path(tmp_dir)
            custom_pycache_prefix = Path(sys.pycache_prefix or "")
            r_path = tmp_path.relative_to(tmp_path.anchor)
            venv_python_cache_dir = Path.cwd() / custom_pycache_prefix / r_path
            self._prepare_venv(tmp_path)
            python_path = tmp_path / "bin" / "python"
            result = self._execute_python_callable_in_subprocess(python_path)
            self._cleanup_python_pycache_dir(venv_python_cache_dir)
            return result

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS

        found_airflow = found_pendulum = False

        if self.system_site_packages:
            # If we're using system packages, assume both are present
            found_airflow = found_pendulum = True
        else:
            for raw_str in chain.from_iterable(req.splitlines() for req in self.requirements):
                line = raw_str.strip()
                # Skip blank lines and full-line comments
                if not line or line.startswith("#"):
                    continue

                # Strip off any inline comment
                # e.g. turn "foo==1.2.3 # comment" → "foo==1.2.3"
                req_str = re.sub(r"#.*$", "", line).strip()

                try:
                    req = Requirement(req_str)
                except (InvalidRequirement, InvalidSpecifier, InvalidVersion) as e:
                    raise ValueError(f"Invalid requirement '{raw_str}': {e}") from e

                if req.name == "apache-airflow":
                    found_airflow = found_pendulum = True
                    break
                elif req.name == "pendulum":
                    found_pendulum = True

        if found_airflow:
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif found_pendulum:
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS


class BranchPythonVirtualenvOperator(BaseBranchOperator, PythonVirtualenvOperator):
    """
    A workflow can "branch" or follow a path after the execution of this task in a virtual environment.

    It derives from the PythonVirtualenvOperator and expects a Python function that returns
    a single task_id, a single task_group_id, or a list of task_ids and/or
    task_group_ids to follow. The task_id(s) and/or task_group_id(s) returned
    should point to a task or task group directly downstream from this operator. All
    other "branches" or directly downstream tasks are marked with a state of
    ``skipped`` so that these paths can't move forward. The ``skipped`` states
    are propagated downstream to allow for the DAG state to fill up and
    the DAG run's state to be inferred.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BranchPythonVirtualenvOperator`
    """

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        return PythonVirtualenvOperator.execute(self, context)


class ExternalPythonOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a virtualenv that is not re-created.

    The environment is reused as is, without the overhead of creating a new virtual environment
    (with certain caveats).

    The function must be defined using def, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtual environment runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are provided to
    Airflow through plugins. You can use string_args though.

    If Airflow is installed in the external environment with a version different from the version
    used by the operator, the operator will fail.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ExternalPythonOperator`

    :param python: Full path string (file-system specific) that points to a Python binary inside
        a virtual environment that should be used (in the ``VENV/bin`` folder). It should be an absolute
        path (so it usually starts with "/" or "X:/" depending on the filesystem/os used).
    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtual environment.
    :param serializer: Which serializer to use to serialize the args and result. It can be one of the following:

        - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
        - ``"cloudpickle"``: Use cloudpickle to serialize more complex types;
          this requires cloudpickle to be included in your requirements.
        - ``"dill"``: Use dill to serialize more complex types;
          this requires dill to be included in your requirements.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
    :param env_vars: A dictionary containing additional environment variables to set for the virtual
        environment when it is executed.
    :param inherit_env: Whether to inherit the current environment variables when executing the virtual
        environment. If set to ``True``, the virtual environment will inherit the environment variables
        of the parent process (``os.environ``). If set to ``False``, the virtual environment will be
        executed with a clean environment.
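
    A minimal usage sketch (the interpreter path below is an assumption; point it at a
    pre-existing virtual environment available on the worker):

    .. code-block:: python

        def callable_external():
            # All imports must happen inside the function.
            import sys

            print(f"Running with interpreter {sys.executable}")


        external_python_task = ExternalPythonOperator(
            task_id="external_python",
            python="/opt/venvs/reporting/bin/python",  # hypothetical venv path
            python_callable=callable_external,
        )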
    """

    template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields))

    def __init__(
        self,
        *,
        python: str,
        python_callable: Callable,
        serializer: _SerializerTypeDef | None = None,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        expect_pendulum: bool = False,
        skip_on_exit_code: int | Container[int] | None = None,
        env_vars: dict[str, str] | None = None,
        inherit_env: bool = True,
        **kwargs,
    ):
        if not python:
            raise ValueError("Python Path must be defined in ExternalPythonOperator")
        self.python = python
        self.expect_pendulum = expect_pendulum
        super().__init__(
            python_callable=python_callable,
            serializer=serializer,
            op_args=op_args,
            op_kwargs=op_kwargs,
            string_args=string_args,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            expect_airflow=expect_airflow,
            skip_on_exit_code=skip_on_exit_code,
            env_vars=env_vars,
            inherit_env=inherit_env,
            **kwargs,
        )

    def execute_callable(self):
        python_path = Path(self.python)
        if not python_path.exists():
            raise ValueError(f"Python Path '{python_path}' must exist")
        if not python_path.is_file():
            raise ValueError(f"Python Path '{python_path}' must be a file")
        if not python_path.is_absolute():
            raise ValueError(f"Python Path '{python_path}' must be an absolute path.")
        python_version = _PythonVersionInfo.from_executable(self.python)
        if python_version.major != sys.version_info.major and (self.op_args or self.op_kwargs):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for ExternalPythonOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. "
                f"Virtual environment version: {python_version}"
            )
        return self._execute_python_callable_in_subprocess(python_path)

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self.expect_airflow and self._get_airflow_version_from_target_env():
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif self._is_pendulum_installed_in_target_env():
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS

    def _is_pendulum_installed_in_target_env(self) -> bool:
        try:
            subprocess.check_call([self.python, "-c", "import pendulum"])
            return True
        except Exception as e:
            if self.expect_pendulum:
                self.log.warning("When checking for Pendulum installed in virtual environment got %s", e)
                self.log.warning(
                    "Pendulum is not properly installed in the virtual environment. "
                    "Pendulum context keys will not be available. "
                    "Please install Pendulum or Airflow in your virtual environment to access them."
                )
            return False

    @property
    def _external_airflow_version_script(self):
        """
        Return a Python script which determines the version of Apache Airflow.

        Importing airflow as a module might take a while; as a result, obtaining the version
        that way can take up to 1 second.
        On the other hand, ``importlib.metadata.version`` retrieves the package version quickly,
        in well under 100 ms, even including the overhead of the new subprocess.

        Possible side effect: if neither ``importlib.metadata`` (Python < 3.8) nor the
        ``importlib_metadata`` backport is available, this might indicate that the venv doesn't
        contain ``apache-airflow`` or that something is wrong with the environment.
        """
        return textwrap.dedent(
            """
            try:
                from importlib.metadata import version
            except ImportError:
                from importlib_metadata import version
            print(version("apache-airflow"))
            """
        )

    def _get_airflow_version_from_target_env(self) -> str | None:
        from airflow import __version__ as airflow_version

        try:
            result = subprocess.check_output(
                [self.python, "-c", self._external_airflow_version_script],
                text=True,
            )
            target_airflow_version = result.strip()
            if target_airflow_version != airflow_version:
                raise AirflowConfigException(
                    f"The version of Airflow installed for the {self.python} "
                    f"({target_airflow_version}) is different from the runtime Airflow version: "
                    f"{airflow_version}. Make sure your environment has the same Airflow version "
                    f"installed as the Airflow runtime."
                )
            return target_airflow_version
        except Exception as e:
            if self.expect_airflow:
                self.log.warning("When checking for Airflow installed in virtual environment got %s", e)
                self.log.warning(
                    "This means that Airflow is not properly installed by %s. "
                    "Airflow context keys will not be available. "
                    "Please install Airflow %s in your environment to access them.",
                    self.python,
                    airflow_version,
                )
            return None


class BranchExternalPythonOperator(BaseBranchOperator, ExternalPythonOperator):
    """
    A workflow can "branch" or follow a path after the execution of this task.

    Extends ExternalPythonOperator, so it expects a ``python`` argument: the full path to a Python
    binary inside a virtual environment (in the ``VENV/bin`` folder). It should be an absolute path,
    so the branching callable can run in a separate virtual environment, similarly to
    ExternalPythonOperator.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BranchExternalPythonOperator`
    """

    def choose_branch(self, context: Context) -> str | Iterable[str]:
        return ExternalPythonOperator.execute(self, context)


def get_current_context() -> Mapping[str, Any]:
    """
    Retrieve the execution context dictionary without altering the user method's signature.

    This is the simplest method of retrieving the execution context dictionary.

    **Old style:**

    .. code:: python

        def my_task(**context):
            ti = context["ti"]

    **New style:**

    .. code:: python

        from airflow.providers.standard.operators.python import get_current_context


        def my_task():
            context = get_current_context()
            ti = context["ti"]

    The current context will only have a value if this method is called after an operator
    has started to execute.
    """
    if AIRFLOW_V_3_0_PLUS:
        warnings.warn(
            "Using get_current_context from the standard provider is deprecated and will be removed. "
            "Please import `from airflow.sdk import get_current_context` and use it instead.",
            AirflowProviderDeprecationWarning,
            stacklevel=2,
        )

        from airflow.sdk import get_current_context

        return get_current_context()
    return _get_current_context()


def _get_current_context() -> Mapping[str, Any]:
    # Airflow 2.x
    # TODO: To be removed when Airflow 2 support is dropped
    from airflow.models.taskinstance import _CURRENT_CONTEXT  # type: ignore[attr-defined]

    if not _CURRENT_CONTEXT:
        raise RuntimeError(
            "Current context was requested but no context was found! Are you running within an Airflow task?"
        )
    return _CURRENT_CONTEXT[-1]