#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import inspect
import os
import pickle
import shutil
import subprocess
import sys
import types
import warnings
from abc import ABCMeta, abstractmethod
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Any, Callable, Collection, Iterable, Mapping, Sequence

import dill

from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.utils.context import Context, context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.version import version as airflow_version


def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs):
    """
    Deprecated function that calls @task.python and allows users to turn a Python function into
    an Airflow task. Please use the following instead:

    from airflow.decorators import task

    @task
    def my_task(): ...

    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function (templated)
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable (templated)
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. Dict will unroll to XCom values with keys as keys.
        Defaults to False.
    """
    # To maintain backwards compatibility, we import the task object into this file
    # This prevents breakages in dags that use `from airflow.operators.python import task`
    from airflow.decorators.python import python_task

    warnings.warn(
        """airflow.operators.python.task is deprecated. Please use the following instead

        from airflow.decorators import task

        @task
        def my_task(): ...""",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
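

# A minimal usage sketch of the recommended replacement for the deprecated ``task``
# function above (the DAG id and task body are illustrative, not part of this module):
#
#     from datetime import datetime
#     from airflow.decorators import dag, task
#
#     @dag(dag_id="example_taskflow", schedule=None, start_date=datetime(2022, 1, 1))
#     def example_taskflow():
#         @task
#         def add(x: int, y: int) -> int:
#             return x + y
#
#         add(1, 2)
#
#     example_taskflow()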


class PythonOperator(BaseOperator):
    """
    Executes a Python callable.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonOperator`

    When running your callable, Airflow will pass a set of keyword arguments that can be used in your
    function. This set of kwargs corresponds exactly to what you can use in your jinja templates.
    For this to work, you need to define ``**kwargs`` in your function header, or you can directly add
    the keyword arguments you would like to receive - for example, with the code below your callable
    will receive the values of the ``ti`` and ``next_ds`` context variables.

    With explicit arguments:

    .. code-block:: python

        def my_python_callable(ti, next_ds):
            pass

    With kwargs:

    .. code-block:: python

        def my_python_callable(**kwargs):
            ti = kwargs["ti"]
            next_ds = kwargs["next_ds"]

    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied. (templated)
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param show_return_value_in_logs: a bool value whether to show return_value
        logs. Defaults to True, which allows return value log output.
        It can be set to False to prevent log output of the return value when you return huge data
        such as transmitting a large amount of data via XCom.
    """

    template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
    template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
    BLUE = "#ffefeb"
    ui_color = BLUE

    # Since we won't mutate the arguments, we should just do the shallow copy;
    # there are some cases we can't deepcopy the objects (e.g. protobuf).
    shallow_copy_attrs: Sequence[str] = (
        "python_callable",
        "op_kwargs",
    )

    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        **kwargs,
    ) -> None:
        if kwargs.get("provide_context"):
            warnings.warn(
                "provide_context is deprecated as of 2.0 and is no longer required",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            kwargs.pop("provide_context", None)
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException("`python_callable` param must be callable")
        self.python_callable = python_callable
        self.op_args = op_args or ()
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts
        self.show_return_value_in_logs = show_return_value_in_logs

    def execute(self, context: Context) -> Any:
        context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
        self.op_kwargs = self.determine_kwargs(context)

        return_value = self.execute_callable()
        if self.show_return_value_in_logs:
            self.log.info("Done. Returned value was: %s", return_value)
        else:
            self.log.info("Done. Returned value not shown")

        return return_value

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()

    def execute_callable(self) -> Any:
        """
        Calls the python callable with the given arguments.

        :return: the return value of the call.
        """
        return self.python_callable(*self.op_args, **self.op_kwargs)
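

# A usage sketch for PythonOperator (names are illustrative; assumes
# ``from datetime import datetime``):
#
#     from airflow import DAG
#     from airflow.operators.python import PythonOperator
#
#     def print_context(ds, **kwargs):
#         print(f"Logical date is {ds}")
#
#     with DAG(dag_id="example_python", schedule=None, start_date=datetime(2022, 1, 1)):
#         run_this = PythonOperator(
#             task_id="print_the_context",
#             python_callable=print_context,
#         )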


class BranchPythonOperator(PythonOperator, SkipMixin):
    """
    A workflow can "branch" or follow a path after the execution of this task.

    It derives from PythonOperator and expects a Python function that returns
    a single task_id or a list of task_ids to follow. The task_id(s) returned
    should point to a task directly downstream from {self}. All other "branches"
    or directly downstream tasks are marked with a state of ``skipped`` so that
    these paths can't move forward. The ``skipped`` states are propagated
    downstream to allow for the DAG state to fill up and the DAG run's state
    to be inferred.
    """

    def execute(self, context: Context) -> Any:
        branch = super().execute(context)
        self.log.info("Branch callable returned %s", branch)
        self.skip_all_except(context["ti"], branch)
        return branch
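

# A branching sketch (task ids are illustrative): the callable returns the task_id(s)
# to follow, and all other direct downstream tasks are skipped:
#
#     def choose_branch(**kwargs):
#         return "branch_a" if kwargs["ds"] < "2023-01-01" else "branch_b"
#
#     branching = BranchPythonOperator(task_id="branching", python_callable=choose_branch)
#     branching >> [branch_a, branch_b]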


class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a pipeline to continue based on the result of a ``python_callable``.

    The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a
    ``python_callable``. If the returned result is False or a falsy value, the pipeline will be
    short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting
    mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and
    an ``XCom`` of the returned result is pushed.

    The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for
    downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all
    downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this
    parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for
    other subsequent downstream tasks is respected. In this mode, the operator assumes the direct downstream
    tasks were purposely meant to be skipped but perhaps not other subsequent tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ShortCircuitOperator`

    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
        be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be
        skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected.
    """

    def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None:
        super().__init__(**kwargs)
        self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules

    def execute(self, context: Context) -> Any:
        condition = super().execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info("Proceeding with downstream tasks...")
            return condition

        downstream_tasks = context["task"].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task IDs %s", downstream_tasks)

        if downstream_tasks:
            dag_run = context["dag_run"]
            execution_date = dag_run.execution_date

            if self.ignore_downstream_trigger_rules is True:
                self.log.info("Skipping all downstream tasks...")
                self.skip(dag_run, execution_date, downstream_tasks)
            else:
                self.log.info("Skipping downstream tasks while respecting trigger rules...")
                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
                # Scheduler handle the remaining downstream task(s) appropriately.
                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))

        self.log.info("Done.")
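

# A short-circuit sketch (condition and task ids are illustrative; assumes
# ``from datetime import datetime``): when the callable returns a falsy value,
# downstream tasks are skipped instead of executed:
#
#     check = ShortCircuitOperator(
#         task_id="is_weekday",
#         python_callable=lambda: datetime.now().weekday() < 5,
#     )
#     check >> do_expensive_work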


class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
    BASE_SERIALIZABLE_CONTEXT_KEYS = {
        "ds",
        "ds_nodash",
        "expanded_ti_count",
        "inlets",
        "next_ds",
        "next_ds_nodash",
        "outlets",
        "prev_ds",
        "prev_ds_nodash",
        "run_id",
        "task_instance_key_str",
        "test_mode",
        "tomorrow_ds",
        "tomorrow_ds_nodash",
        "ts",
        "ts_nodash",
        "ts_nodash_with_tz",
        "yesterday_ds",
        "yesterday_ds_nodash",
    }
    PENDULUM_SERIALIZABLE_CONTEXT_KEYS = {
        "data_interval_end",
        "data_interval_start",
        "execution_date",
        "logical_date",
        "next_execution_date",
        "prev_data_interval_end_success",
        "prev_data_interval_start_success",
        "prev_execution_date",
        "prev_execution_date_success",
        "prev_start_date_success",
    }
    AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
        "macros",
        "conf",
        "dag",
        "dag_run",
        "task",
        "params",
        "triggering_dataset_events",
    }

    def __init__(
        self,
        *,
        python_callable: Callable,
        use_dill: bool = False,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        **kwargs,
    ):
        if not isinstance(python_callable, types.FunctionType) or (
            isinstance(python_callable, types.LambdaType) and python_callable.__name__ == "<lambda>"
        ):
            raise AirflowException("PythonVirtualenvOperator only supports functions for python_callable arg")
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )
        self.string_args = string_args or []
        self.use_dill = use_dill
        self.pickling_library = dill if self.use_dill else pickle
        self.expect_airflow = expect_airflow

    @abstractmethod
    def _iter_serializable_context_keys(self):
        pass

    def execute(self, context: Context) -> Any:
        serializable_keys = set(self._iter_serializable_context_keys())
        serializable_context = context_copy_partial(context, serializable_keys)
        return super().execute(context=serializable_context)

    def get_python_source(self):
        """Return the source of self.python_callable."""
        return dedent(inspect.getsource(self.python_callable))

    def _write_args(self, file: Path):
        if self.op_args or self.op_kwargs:
            file.write_bytes(self.pickling_library.dumps({"args": self.op_args, "kwargs": self.op_kwargs}))

    def _write_string_args(self, file: Path):
        file.write_text("\n".join(map(str, self.string_args)))

    def _read_result(self, path: Path):
        if path.stat().st_size == 0:
            return None
        try:
            return self.pickling_library.loads(path.read_bytes())
        except ValueError:
            self.log.error(
                "Error deserializing result. Note that result deserialization "
                "is not supported across major Python versions."
            )
            raise

    def __deepcopy__(self, memo):
        # module objects can't be copied _at all_
        memo[id(self.pickling_library)] = self.pickling_library
        return super().__deepcopy__(memo)

    def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Path):
        op_kwargs: dict[str, Any] = {k: v for k, v in self.op_kwargs.items()}
        if self.templates_dict:
            op_kwargs["templates_dict"] = self.templates_dict
        input_path = tmp_dir / "script.in"
        output_path = tmp_dir / "script.out"
        string_args_path = tmp_dir / "string_args.txt"
        script_path = tmp_dir / "script.py"
        self._write_args(input_path)
        self._write_string_args(string_args_path)
        write_python_script(
            jinja_context=dict(
                op_args=self.op_args,
                op_kwargs=op_kwargs,
                expect_airflow=self.expect_airflow,
                pickling_library=self.pickling_library.__name__,
                python_callable=self.python_callable.__name__,
                python_callable_source=self.get_python_source(),
            ),
            filename=os.fspath(script_path),
            render_template_as_native_obj=self.dag.render_template_as_native_obj,
        )

        execute_in_subprocess(
            cmd=[
                os.fspath(python_path),
                os.fspath(script_path),
                os.fspath(input_path),
                os.fspath(output_path),
                os.fspath(string_args_path),
            ]
        )
        return self._read_result(output_path)

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing()
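

# The subprocess protocol above, sketched end to end (file names mirror the code; the
# exact contents of the rendered wrapper are an assumption based on the jinja_context keys):
#
#     script.in        <- pickled {"args": op_args, "kwargs": op_kwargs}
#     string_args.txt  <- one string_arg per line (populates virtualenv_string_args)
#     script.py        <- rendered wrapper that unpickles script.in, runs the callable's
#                         source, and pickles the return value
#     script.out       -> unpickled by _read_result() and returned as the XCom value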


class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a virtualenv that is created and destroyed automatically.

    The function (with certain caveats) must be defined using def, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are being provided to
    Airflow through plugins. You can use string_args though.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonVirtualenvOperator`

    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param requirements: Either a list of requirement strings, or a (templated)
        "requirements file" as specified by pip.
    :param python_version: The Python version to run the virtualenv with. Note that
        both 2 and 2.7 are acceptable forms.
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but requires you to include dill in your requirements.
    :param system_site_packages: Whether to include
        system_site_packages in your virtualenv.
        See virtualenv documentation for more information.
    :param pip_install_options: a list of pip install options when installing requirements.
        See 'pip install -h' for available options
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    """

    template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields))
    template_ext: Sequence[str] = (".txt",)

    def __init__(
        self,
        *,
        python_callable: Callable,
        requirements: None | Iterable[str] | str = None,
        python_version: str | int | float | None = None,
        use_dill: bool = False,
        system_site_packages: bool = True,
        pip_install_options: list[str] | None = None,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        **kwargs,
    ):
        if (
            python_version
            and str(python_version)[0] != str(sys.version_info.major)
            and (op_args or op_kwargs)
        ):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for PythonVirtualenvOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. Venv version: {python_version}"
            )
        if not shutil.which("virtualenv"):
            raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
        if not requirements:
            self.requirements: list[str] | str = []
        elif isinstance(requirements, str):
            self.requirements = requirements
        else:
            self.requirements = list(requirements)
        self.python_version = python_version
        self.system_site_packages = system_site_packages
        self.pip_install_options = pip_install_options
        super().__init__(
            python_callable=python_callable,
            use_dill=use_dill,
            op_args=op_args,
            op_kwargs=op_kwargs,
            string_args=string_args,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            expect_airflow=expect_airflow,
            **kwargs,
        )

    def execute_callable(self):
        with TemporaryDirectory(prefix="venv") as tmp_dir:
            tmp_path = Path(tmp_dir)
            requirements_file_name = f"{tmp_dir}/requirements.txt"

            if not isinstance(self.requirements, str):
                requirements_file_contents = "\n".join(str(dependency) for dependency in self.requirements)
            else:
                requirements_file_contents = self.requirements

            if not self.system_site_packages and self.use_dill:
                requirements_file_contents += "\ndill"

            with open(requirements_file_name, "w") as file:
                file.write(requirements_file_contents)
            prepare_virtualenv(
                venv_directory=tmp_dir,
                python_bin=f"python{self.python_version}" if self.python_version else None,
                system_site_packages=self.system_site_packages,
                requirements_file_path=requirements_file_name,
                pip_install_options=self.pip_install_options,
            )
            python_path = tmp_path / "bin" / "python"

            return self._execute_python_callable_in_subprocess(python_path, tmp_path)

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self.system_site_packages or "apache-airflow" in self.requirements:
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif "pendulum" in self.requirements:
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
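

# A usage sketch for PythonVirtualenvOperator (the requirement pin and callable body are
# illustrative): the callable must be self-contained, importing everything it needs:
#
#     def callable_virtualenv():
#         from colorama import Fore  # imported inside the function, per the caveats above
#         print(Fore.GREEN + "hello from an isolated venv")
#
#     virtualenv_task = PythonVirtualenvOperator(
#         task_id="virtualenv_python",
#         python_callable=callable_virtualenv,
#         requirements=["colorama==0.4.0"],
#         system_site_packages=False,
#     )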


class ExternalPythonOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a pre-existing virtualenv that is not re-created.

    The virtualenv is reused as-is, without the overhead of creating it (with certain caveats).

    The function must be defined using def, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are being provided to
    Airflow through plugins. You can use string_args though.

    If Airflow is installed in the external environment in a different version than the version
    used by the operator, the operator will fail.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ExternalPythonOperator`

    :param python: Full path string (file-system specific) that points to a Python binary inside
        a virtualenv that should be used (in the ``VENV/bin`` folder). Should be an absolute path
        (so it usually starts with "/" or "X:/" depending on the filesystem/os used).
    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but if dill is not preinstalled in your venv, the task will fail with use_dill enabled.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    """

    template_fields: Sequence[str] = tuple({"python"} | set(PythonOperator.template_fields))

    def __init__(
        self,
        *,
        python: str,
        python_callable: Callable,
        use_dill: bool = False,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        expect_pendulum: bool = False,
        **kwargs,
    ):
        if not python:
            raise ValueError("Python Path must be defined in ExternalPythonOperator")
        self.python = python
        self.expect_pendulum = expect_pendulum
        super().__init__(
            python_callable=python_callable,
            use_dill=use_dill,
            op_args=op_args,
            op_kwargs=op_kwargs,
            string_args=string_args,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            expect_airflow=expect_airflow,
            **kwargs,
        )

    def execute_callable(self):
        python_path = Path(self.python)
        if not python_path.exists():
            raise ValueError(f"Python Path '{python_path}' must exist")
        if not python_path.is_file():
            raise ValueError(f"Python Path '{python_path}' must be a file")
        if not python_path.is_absolute():
            raise ValueError(f"Python Path '{python_path}' must be an absolute path.")
        python_version_as_list_of_strings = self._get_python_version_from_environment()
        if (
            python_version_as_list_of_strings
            and str(python_version_as_list_of_strings[0]) != str(sys.version_info.major)
            and (self.op_args or self.op_kwargs)
        ):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for ExternalPythonOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}"
            )
        with TemporaryDirectory(prefix="tmd") as tmp_dir:
            tmp_path = Path(tmp_dir)
            return self._execute_python_callable_in_subprocess(python_path, tmp_path)

    def _get_python_version_from_environment(self) -> list[str]:
        try:
            result = subprocess.check_output([self.python, "--version"], text=True)
            return result.strip().split(" ")[-1].split(".")
        except Exception as e:
            raise ValueError(f"Error while executing {self.python}: {e}")

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self._get_airflow_version_from_target_env():
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif self._is_pendulum_installed_in_target_env():
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS

    def _is_pendulum_installed_in_target_env(self) -> bool:
        try:
            subprocess.check_call([self.python, "-c", "import pendulum"])
            return True
        except Exception as e:
            if self.expect_pendulum:
                self.log.warning("When checking for Pendulum installed in venv got %s", e)
                self.log.warning(
                    "Pendulum is not properly installed in the virtualenv. "
                    "Pendulum context keys will not be available. "
                    "Please install Pendulum or Airflow in your venv to access them."
                )
            return False

    def _get_airflow_version_from_target_env(self) -> str | None:
        try:
            result = subprocess.check_output(
                [self.python, "-c", "from airflow import version; print(version.version)"], text=True
            )
            target_airflow_version = result.strip()
            if target_airflow_version != airflow_version:
                raise AirflowConfigException(
                    f"The version of Airflow installed for the {self.python} environment "
                    f"({target_airflow_version}) is different from the runtime Airflow version: "
                    f"{airflow_version}. Make sure your environment has the same Airflow version "
                    f"installed as the Airflow runtime."
                )
            return target_airflow_version
        except Exception as e:
            if self.expect_airflow:
                self.log.warning("When checking for Airflow installed in venv got %s", e)
                self.log.warning(
                    f"This means that Airflow is not properly installed by "
                    f"{self.python}. Airflow context keys will not be available. "
                    f"Please install Airflow {airflow_version} in your environment to access them."
                )
            return None
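

# A usage sketch for ExternalPythonOperator (the interpreter path is an assumption; point
# it at the ``bin/python`` of an existing venv, reusing the illustrative callable above):
#
#     external_task = ExternalPythonOperator(
#         task_id="external_python",
#         python="/opt/venvs/data_tools/bin/python",
#         python_callable=callable_virtualenv,
#     )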


def get_current_context() -> Context:
    """
    Retrieve the execution context dictionary without altering the user method's signature.
    This is the simplest method of retrieving the execution context dictionary.

    **Old style:**

    .. code:: python

        def my_task(**context):
            ti = context["ti"]

    **New style:**

    .. code:: python

        from airflow.operators.python import get_current_context


        def my_task():
            context = get_current_context()
            ti = context["ti"]

    The current context will only have a value if this method is called after an operator
    has started to execute.
    """
    if not _CURRENT_CONTEXT:
        raise AirflowException(
            "Current context was requested but no context was found! "
            "Are you running within an airflow task?"
        )
    return _CURRENT_CONTEXT[-1]