1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""
19Base operator for all operators.
20
21:sphinx-autoapi-skip:
22"""
23
24from __future__ import annotations
25
26import abc
27import collections.abc
28import contextlib
29import copy
30import functools
31import inspect
32import logging
33import sys
34import warnings
35from datetime import datetime, timedelta
36from functools import total_ordering, wraps
37from types import FunctionType
38from typing import (
39 TYPE_CHECKING,
40 Any,
41 Callable,
42 Collection,
43 Iterable,
44 Sequence,
45 TypeVar,
46 Union,
47 cast,
48)
49
50import attr
51import pendulum
52from dateutil.relativedelta import relativedelta
53from sqlalchemy import select
54from sqlalchemy.orm.exc import NoResultFound
55
56from airflow.configuration import conf
57from airflow.exceptions import (
58 AirflowException,
59 FailStopDagInvalidTriggerRule,
60 RemovedInAirflow3Warning,
61 TaskDeferralError,
62 TaskDeferred,
63)
64from airflow.lineage import apply_lineage, prepare_lineage
65from airflow.models.abstractoperator import (
66 DEFAULT_EXECUTOR,
67 DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
68 DEFAULT_OWNER,
69 DEFAULT_POOL_SLOTS,
70 DEFAULT_PRIORITY_WEIGHT,
71 DEFAULT_QUEUE,
72 DEFAULT_RETRIES,
73 DEFAULT_RETRY_DELAY,
74 DEFAULT_TASK_EXECUTION_TIMEOUT,
75 DEFAULT_TRIGGER_RULE,
76 DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
77 DEFAULT_WEIGHT_RULE,
78 AbstractOperator,
79)
80from airflow.models.base import _sentinel
81from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs
82from airflow.models.param import ParamsDict
83from airflow.models.pool import Pool
84from airflow.models.taskinstance import TaskInstance, clear_task_instances
85from airflow.models.taskmixin import DependencyMixin
86from airflow.serialization.enums import DagAttributeTypes
87from airflow.task.priority_strategy import PriorityWeightStrategy, validate_and_load_priority_weight_strategy
88from airflow.ti_deps.deps.mapped_task_upstream_dep import MappedTaskUpstreamDep
89from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
90from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
91from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
92from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
93from airflow.utils import timezone
94from airflow.utils.context import Context, context_get_outlet_events
95from airflow.utils.decorators import fixup_decorator_warning_stack
96from airflow.utils.edgemodifier import EdgeModifier
97from airflow.utils.helpers import validate_key
98from airflow.utils.operator_helpers import ExecutionCallableRunner
99from airflow.utils.operator_resources import Resources
100from airflow.utils.session import NEW_SESSION, provide_session
101from airflow.utils.setup_teardown import SetupTeardownContext
102from airflow.utils.trigger_rule import TriggerRule
103from airflow.utils.types import NOTSET
104from airflow.utils.xcom import XCOM_RETURN_KEY
105
106if TYPE_CHECKING:
107 from types import ClassMethodDescriptorType
108
109 import jinja2 # Slow import.
110 from sqlalchemy.orm import Session
111
112 from airflow.models.abstractoperator import TaskStateChangeCallback
113 from airflow.models.baseoperatorlink import BaseOperatorLink
114 from airflow.models.dag import DAG
115 from airflow.models.operator import Operator
116 from airflow.models.xcom_arg import XComArg
117 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
118 from airflow.triggers.base import BaseTrigger
119 from airflow.utils.task_group import TaskGroup
120 from airflow.utils.types import ArgNotSet
121
122ScheduleInterval = Union[str, timedelta, relativedelta]
123
124TaskPreExecuteHook = Callable[[Context], None]
125TaskPostExecuteHook = Callable[[Context, Any], None]
126
127T = TypeVar("T", bound=FunctionType)
128
129logger = logging.getLogger("airflow.models.baseoperator.BaseOperator")
130
131
132def parse_retries(retries: Any) -> int | None:
133 if retries is None or type(retries) == int: # noqa: E721
134 return retries
135 try:
136 parsed_retries = int(retries)
137 except (TypeError, ValueError):
138 raise AirflowException(f"'retries' type must be int, not {type(retries).__name__}")
139 logger.warning("Implicitly converting 'retries' from %r to int", retries)
140 return parsed_retries
141
142
143def coerce_timedelta(value: float | timedelta, *, key: str) -> timedelta:
144 if isinstance(value, timedelta):
145 return value
146 logger.debug("%s isn't a timedelta object, assuming secs", key)
147 return timedelta(seconds=value)
148
149
150def coerce_resources(resources: dict[str, Any] | None) -> Resources | None:
151 if resources is None:
152 return None
153 return Resources(**resources)
154
155
156def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) -> tuple[dict, ParamsDict]:
157 if not dag:
158 return {}, ParamsDict()
159 dag_args = copy.copy(dag.default_args)
160 dag_params = copy.deepcopy(dag.params)
161 if task_group:
162 if task_group.default_args and not isinstance(task_group.default_args, collections.abc.Mapping):
163 raise TypeError("default_args must be a mapping")
164 dag_args.update(task_group.default_args)
165 return dag_args, dag_params
166
167
168def get_merged_defaults(
169 dag: DAG | None,
170 task_group: TaskGroup | None,
171 task_params: collections.abc.MutableMapping | None,
172 task_default_args: dict | None,
173) -> tuple[dict, ParamsDict]:
174 args, params = _get_parent_defaults(dag, task_group)
175 if task_params:
176 if not isinstance(task_params, collections.abc.Mapping):
177 raise TypeError("params must be a mapping")
178 params.update(task_params)
179 if task_default_args:
180 if not isinstance(task_default_args, collections.abc.Mapping):
181 raise TypeError("default_args must be a mapping")
182 args.update(task_default_args)
183 with contextlib.suppress(KeyError):
184 params.update(task_default_args["params"] or {})
185 return args, params
186
187
188class _PartialDescriptor:
189 """A descriptor that guards against ``.partial`` being called on Task objects."""
190
191 class_method: ClassMethodDescriptorType | None = None
192
193 def __get__(
194 self, obj: BaseOperator, cls: type[BaseOperator] | None = None
195 ) -> Callable[..., OperatorPartial]:
196 # Call this "partial" so it looks nicer in stack traces.
197 def partial(**kwargs):
198 raise TypeError("partial can only be called on Operator classes, not Tasks themselves")
199
200 if obj is not None:
201 return partial
202 return self.class_method.__get__(cls, cls)
203
204
205_PARTIAL_DEFAULTS: dict[str, Any] = {
206 "map_index_template": None,
207 "owner": DEFAULT_OWNER,
208 "trigger_rule": DEFAULT_TRIGGER_RULE,
209 "depends_on_past": False,
210 "ignore_first_depends_on_past": DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
211 "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
212 "wait_for_downstream": False,
213 "retries": DEFAULT_RETRIES,
214 "executor": DEFAULT_EXECUTOR,
215 "queue": DEFAULT_QUEUE,
216 "pool_slots": DEFAULT_POOL_SLOTS,
217 "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
218 "retry_delay": DEFAULT_RETRY_DELAY,
219 "retry_exponential_backoff": False,
220 "priority_weight": DEFAULT_PRIORITY_WEIGHT,
221 "weight_rule": DEFAULT_WEIGHT_RULE,
222 "inlets": [],
223 "outlets": [],
224 "allow_nested_operators": True,
225}
226
227
228# This is what handles the actual mapping.
229def partial(
230 operator_class: type[BaseOperator],
231 *,
232 task_id: str,
233 dag: DAG | None = None,
234 task_group: TaskGroup | None = None,
235 start_date: datetime | ArgNotSet = NOTSET,
236 end_date: datetime | ArgNotSet = NOTSET,
237 owner: str | ArgNotSet = NOTSET,
238 email: None | str | Iterable[str] | ArgNotSet = NOTSET,
239 params: collections.abc.MutableMapping | None = None,
240 resources: dict[str, Any] | None | ArgNotSet = NOTSET,
241 trigger_rule: str | ArgNotSet = NOTSET,
242 depends_on_past: bool | ArgNotSet = NOTSET,
243 ignore_first_depends_on_past: bool | ArgNotSet = NOTSET,
244 wait_for_past_depends_before_skipping: bool | ArgNotSet = NOTSET,
245 wait_for_downstream: bool | ArgNotSet = NOTSET,
246 retries: int | None | ArgNotSet = NOTSET,
247 queue: str | ArgNotSet = NOTSET,
248 pool: str | ArgNotSet = NOTSET,
249 pool_slots: int | ArgNotSet = NOTSET,
250 execution_timeout: timedelta | None | ArgNotSet = NOTSET,
251 max_retry_delay: None | timedelta | float | ArgNotSet = NOTSET,
252 retry_delay: timedelta | float | ArgNotSet = NOTSET,
253 retry_exponential_backoff: bool | ArgNotSet = NOTSET,
254 priority_weight: int | ArgNotSet = NOTSET,
255 weight_rule: str | PriorityWeightStrategy | ArgNotSet = NOTSET,
256 sla: timedelta | None | ArgNotSet = NOTSET,
257 map_index_template: str | None | ArgNotSet = NOTSET,
258 max_active_tis_per_dag: int | None | ArgNotSet = NOTSET,
259 max_active_tis_per_dagrun: int | None | ArgNotSet = NOTSET,
260 on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
261 on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
262 on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
263 on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
264 on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
265 run_as_user: str | None | ArgNotSet = NOTSET,
266 executor: str | None | ArgNotSet = NOTSET,
267 executor_config: dict | None | ArgNotSet = NOTSET,
268 inlets: Any | None | ArgNotSet = NOTSET,
269 outlets: Any | None | ArgNotSet = NOTSET,
270 doc: str | None | ArgNotSet = NOTSET,
271 doc_md: str | None | ArgNotSet = NOTSET,
272 doc_json: str | None | ArgNotSet = NOTSET,
273 doc_yaml: str | None | ArgNotSet = NOTSET,
274 doc_rst: str | None | ArgNotSet = NOTSET,
275 task_display_name: str | None | ArgNotSet = NOTSET,
276 logger_name: str | None | ArgNotSet = NOTSET,
277 allow_nested_operators: bool = True,
278 **kwargs,
279) -> OperatorPartial:
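    """Create an :class:`~airflow.models.mappedoperator.OperatorPartial` for dynamic task mapping.

    This is normally reached through ``Operator.partial`` rather than being called directly.
    A minimal usage sketch (``BashOperator`` is only an illustrative concrete operator)::

        BashOperator.partial(task_id="say", retries=2).expand(
            bash_command=["echo a", "echo b"],
        )

    Keyword arguments given here are shared by every mapped task instance, while the
    arguments given to ``expand`` vary per instance.
    """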
280 from airflow.models.dag import DagContext
281 from airflow.utils.task_group import TaskGroupContext
282
283 validate_mapping_kwargs(operator_class, "partial", kwargs)
284
285 dag = dag or DagContext.get_current_dag()
286 if dag:
287 task_group = task_group or TaskGroupContext.get_current_task_group(dag)
288 if task_group:
289 task_id = task_group.child_id(task_id)
290
291 # Merge DAG and task group level defaults into user-supplied values.
292 dag_default_args, partial_params = get_merged_defaults(
293 dag=dag,
294 task_group=task_group,
295 task_params=params,
296 task_default_args=kwargs.pop("default_args", None),
297 )
298
299 # Create partial_kwargs from args and kwargs
300 partial_kwargs: dict[str, Any] = {
301 **kwargs,
302 "dag": dag,
303 "task_group": task_group,
304 "task_id": task_id,
305 "map_index_template": map_index_template,
306 "start_date": start_date,
307 "end_date": end_date,
308 "owner": owner,
309 "email": email,
310 "trigger_rule": trigger_rule,
311 "depends_on_past": depends_on_past,
312 "ignore_first_depends_on_past": ignore_first_depends_on_past,
313 "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
314 "wait_for_downstream": wait_for_downstream,
315 "retries": retries,
316 "queue": queue,
317 "pool": pool,
318 "pool_slots": pool_slots,
319 "execution_timeout": execution_timeout,
320 "max_retry_delay": max_retry_delay,
321 "retry_delay": retry_delay,
322 "retry_exponential_backoff": retry_exponential_backoff,
323 "priority_weight": priority_weight,
324 "weight_rule": weight_rule,
325 "sla": sla,
326 "max_active_tis_per_dag": max_active_tis_per_dag,
327 "max_active_tis_per_dagrun": max_active_tis_per_dagrun,
328 "on_execute_callback": on_execute_callback,
329 "on_failure_callback": on_failure_callback,
330 "on_retry_callback": on_retry_callback,
331 "on_success_callback": on_success_callback,
332 "on_skipped_callback": on_skipped_callback,
333 "run_as_user": run_as_user,
334 "executor": executor,
335 "executor_config": executor_config,
336 "inlets": inlets,
337 "outlets": outlets,
338 "resources": resources,
339 "doc": doc,
340 "doc_json": doc_json,
341 "doc_md": doc_md,
342 "doc_rst": doc_rst,
343 "doc_yaml": doc_yaml,
344 "task_display_name": task_display_name,
345 "logger_name": logger_name,
346 "allow_nested_operators": allow_nested_operators,
347 }
348
349 # Inject DAG-level default args into args provided to this function.
350 partial_kwargs.update((k, v) for k, v in dag_default_args.items() if partial_kwargs.get(k) is NOTSET)
351
352 # Fill fields not provided by the user with default values.
353 partial_kwargs = {k: _PARTIAL_DEFAULTS.get(k) if v is NOTSET else v for k, v in partial_kwargs.items()}
354
355 # Post-process arguments. Should be kept in sync with _TaskDecorator.expand().
356 if "task_concurrency" in kwargs: # Reject deprecated option.
357 raise TypeError("unexpected argument: task_concurrency")
358 if partial_kwargs["wait_for_downstream"]:
359 partial_kwargs["depends_on_past"] = True
360 partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"])
361 partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"])
362 if partial_kwargs["pool"] is None:
363 partial_kwargs["pool"] = Pool.DEFAULT_POOL_NAME
364 partial_kwargs["retries"] = parse_retries(partial_kwargs["retries"])
365 partial_kwargs["retry_delay"] = coerce_timedelta(partial_kwargs["retry_delay"], key="retry_delay")
366 if partial_kwargs["max_retry_delay"] is not None:
367 partial_kwargs["max_retry_delay"] = coerce_timedelta(
368 partial_kwargs["max_retry_delay"],
369 key="max_retry_delay",
370 )
371 partial_kwargs["executor_config"] = partial_kwargs["executor_config"] or {}
372 partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"])
373
374 return OperatorPartial(
375 operator_class=operator_class,
376 kwargs=partial_kwargs,
377 params=partial_params,
378 )
379
380
381class ExecutorSafeguard:
382 """
383 The ExecutorSafeguard decorator.
384
    Ensures that the ``execute`` method of an operator is not called manually outside of
    a TaskInstance, as we want to avoid bad mixing between decorated and
    classic operators.
388 """
389
390 test_mode = conf.getboolean("core", "unit_test_mode")
391
392 @classmethod
393 def decorator(cls, func):
394 @wraps(func)
395 def wrapper(self, *args, **kwargs):
396 from airflow.decorators.base import DecoratedOperator
397
398 sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)
399
400 if not cls.test_mode and not sentinel == _sentinel and not isinstance(self, DecoratedOperator):
401 message = f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
402 if not self.allow_nested_operators:
403 raise AirflowException(message)
404 self.log.warning(message)
405 return func(self, *args, **kwargs)
406
407 return wrapper
408
409
410class BaseOperatorMeta(abc.ABCMeta):
411 """Metaclass of BaseOperator."""
412
413 @classmethod
414 def _apply_defaults(cls, func: T) -> T:
415 """
416 Look for an argument named "default_args", and fill the unspecified arguments from it.
417
        Since python2.* isn't clear about which arguments are missing when
        calling a function, and this can be quite confusing with multi-level
420 inheritance and argument defaults, this decorator also alerts with
421 specific information about the missing arguments.
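
        For example (an illustrative sketch; any concrete operator behaves the same way)::

            from datetime import datetime

            from airflow.models.dag import DAG
            from airflow.operators.bash import BashOperator

            default_args = {"retries": 3, "owner": "data-eng"}

            with DAG(dag_id="example", start_date=datetime(2024, 1, 1), default_args=default_args):
                # ``retries`` and ``owner`` come from ``default_args`` because they
                # are not passed explicitly to the operator.
                BashOperator(task_id="t1", bash_command="echo hi")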
422 """
423 # Cache inspect.signature for the wrapper closure to avoid calling it
        # at every decorated invocation. A separate sig_cache is created
425 # per decoration, i.e. each function decorated using apply_defaults will
426 # have a different sig_cache.
427 sig_cache = inspect.signature(func)
428 non_variadic_params = {
429 name: param
430 for (name, param) in sig_cache.parameters.items()
431 if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD)
432 }
433 non_optional_args = {
434 name
435 for name, param in non_variadic_params.items()
436 if param.default == param.empty and name != "task_id"
437 }
438
439 fixup_decorator_warning_stack(func)
440
441 @wraps(func)
442 def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any:
443 from airflow.models.dag import DagContext
444 from airflow.utils.task_group import TaskGroupContext
445
446 if args:
447 raise AirflowException("Use keyword arguments when initializing operators")
448
449 instantiated_from_mapped = kwargs.pop(
450 "_airflow_from_mapped",
451 getattr(self, "_BaseOperator__from_mapped", False),
452 )
453
454 dag: DAG | None = kwargs.get("dag") or DagContext.get_current_dag()
455 task_group: TaskGroup | None = kwargs.get("task_group")
456 if dag and not task_group:
457 task_group = TaskGroupContext.get_current_task_group(dag)
458
459 default_args, merged_params = get_merged_defaults(
460 dag=dag,
461 task_group=task_group,
462 task_params=kwargs.pop("params", None),
463 task_default_args=kwargs.pop("default_args", None),
464 )
465
466 for arg in sig_cache.parameters:
467 if arg not in kwargs and arg in default_args:
468 kwargs[arg] = default_args[arg]
469
470 missing_args = non_optional_args.difference(kwargs)
471 if len(missing_args) == 1:
472 raise AirflowException(f"missing keyword argument {missing_args.pop()!r}")
473 elif missing_args:
474 display = ", ".join(repr(a) for a in sorted(missing_args))
475 raise AirflowException(f"missing keyword arguments {display}")
476
477 if merged_params:
478 kwargs["params"] = merged_params
479
480 hook = getattr(self, "_hook_apply_defaults", None)
481 if hook:
482 args, kwargs = hook(**kwargs, default_args=default_args)
483 default_args = kwargs.pop("default_args", {})
484
485 if not hasattr(self, "_BaseOperator__init_kwargs"):
486 self._BaseOperator__init_kwargs = {}
487 self._BaseOperator__from_mapped = instantiated_from_mapped
488
489 result = func(self, **kwargs, default_args=default_args)
490
491 # Store the args passed to init -- we need them to support task.map serialization!
492 self._BaseOperator__init_kwargs.update(kwargs) # type: ignore
493
494 # Set upstream task defined by XComArgs passed to template fields of the operator.
495 # BUT: only do this _ONCE_, not once for each class in the hierarchy
496 if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc]
497 self.set_xcomargs_dependencies()
498 # Mark instance as instantiated.
499 self._BaseOperator__instantiated = True
500
501 return result
502
503 apply_defaults.__non_optional_args = non_optional_args # type: ignore
504 apply_defaults.__param_names = set(non_variadic_params) # type: ignore
505
506 return cast(T, apply_defaults)
507
508 def __new__(cls, name, bases, namespace, **kwargs):
509 execute_method = namespace.get("execute")
510 if callable(execute_method) and not getattr(execute_method, "__isabstractmethod__", False):
511 namespace["execute"] = ExecutorSafeguard().decorator(execute_method)
512 new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
513 with contextlib.suppress(KeyError):
514 # Update the partial descriptor with the class method, so it calls the actual function
515 # (but let subclasses override it if they need to)
516 partial_desc = vars(new_cls)["partial"]
517 if isinstance(partial_desc, _PartialDescriptor):
518 partial_desc.class_method = classmethod(partial)
519 new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
520 return new_cls
521
522
523@total_ordering
524class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
525 r"""
526 Abstract base class for all operators.
527
528 Since operators create objects that become nodes in the DAG, BaseOperator
529 contains many recursive methods for DAG crawling behavior. To derive from
530 this class, you are expected to override the constructor and the 'execute'
531 method.
532
533 Operators derived from this class should perform or trigger certain tasks
    synchronously (wait for completion). Examples of operators include one
    that runs a Pig job (PigOperator), a sensor operator that
536 waits for a partition to land in Hive (HiveSensorOperator), or one that
537 moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these
538 operators (tasks) target specific operations, running specific scripts,
539 functions or data transfers.
540
541 This class is abstract and shouldn't be instantiated. Instantiating a
542 class derived from this one results in the creation of a task object,
543 which ultimately becomes a node in DAG objects. Task dependencies should
544 be set by using the set_upstream and/or set_downstream methods.
545
546 :param task_id: a unique, meaningful id for the task
547 :param owner: the owner of the task. Using a meaningful description
548 (e.g. user/person/team/role name) to clarify ownership is recommended.
549 :param email: the 'to' email address(es) used in email alerts. This can be a
550 single email or multiple ones. Multiple addresses can be specified as a
551 comma or semicolon separated string or by passing a list of strings.
552 :param email_on_retry: Indicates whether email alerts should be sent when a
553 task is retried
554 :param email_on_failure: Indicates whether email alerts should be sent when
        a task fails
556 :param retries: the number of retries that should be performed before
557 failing the task
558 :param retry_delay: delay between retries, can be set as ``timedelta`` or
559 ``float`` seconds, which will be converted into ``timedelta``,
560 the default is ``timedelta(seconds=300)``.
561 :param retry_exponential_backoff: allow progressively longer waits between
562 retries by using exponential backoff algorithm on retry delay (delay
563 will be converted into seconds)
564 :param max_retry_delay: maximum delay interval between retries, can be set as
565 ``timedelta`` or ``float`` seconds, which will be converted into ``timedelta``.
566 :param start_date: The ``start_date`` for the task, determines
567 the ``execution_date`` for the first task instance. The best practice
568 is to have the start_date rounded
569 to your DAG's ``schedule_interval``. Daily jobs have their start_date
570 some day at 00:00:00, hourly jobs have their start_date at 00:00
571 of a specific hour. Note that Airflow simply looks at the latest
572 ``execution_date`` and adds the ``schedule_interval`` to determine
573 the next ``execution_date``. It is also very important
574 to note that different tasks' dependencies
575 need to line up in time. If task A depends on task B and their
        start_dates are offset in a way that their execution_dates don't line
577 up, A's dependencies will never be met. If you are looking to delay
578 a task, for example running a daily task at 2AM, look into the
579 ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using
580 dynamic ``start_date`` and recommend using fixed ones. Read the
581 FAQ entry about start_date for more information.
582 :param end_date: if specified, the scheduler won't go beyond this date
583 :param depends_on_past: when set to true, task instances will run
584 sequentially and only if the previous instance has succeeded or has been skipped.
585 The task instance for the start_date is allowed to run.
586 :param wait_for_past_depends_before_skipping: when set to true, if the task instance
        should be marked as skipped and depends_on_past is true, the task instance will stay
        in the ``None`` state, waiting for the task of the previous run to complete
589 :param wait_for_downstream: when set to true, an instance of task
590 X will wait for tasks immediately downstream of the previous instance
591 of task X to finish successfully or be skipped before it runs. This is useful if the
592 different instances of a task X alter the same asset, and this asset
593 is used by tasks downstream of task X. Note that depends_on_past
594 is forced to True wherever wait_for_downstream is used. Also note that
595 only tasks *immediately* downstream of the previous task instance are waited
596 for; the statuses of any tasks further downstream are ignored.
597 :param dag: a reference to the dag the task is attached to (if any)
    :param priority_weight: priority weight of this task against other tasks.
599 This allows the executor to trigger higher priority tasks before
600 others when things get backed up. Set priority_weight as a higher
601 number for more important tasks.
602 :param weight_rule: weighting method used for the effective total
603 priority weight of the task. Options are:
604 ``{ downstream | upstream | absolute }`` default is ``downstream``
605 When set to ``downstream`` the effective weight of the task is the
606 aggregate sum of all downstream descendants. As a result, upstream
607 tasks will have higher weight and will be scheduled more aggressively
608 when using positive weight values. This is useful when you have
        multiple dag run instances and want all upstream tasks to
610 complete for all runs before each dag can continue processing
611 downstream tasks. When set to ``upstream`` the effective weight is the
612 aggregate sum of all upstream ancestors. This is the opposite where
613 downstream tasks have higher weight and will be scheduled more
614 aggressively when using positive weight values. This is useful when you
615 have multiple dag run instances and prefer to have each dag complete
616 before starting upstream tasks of other dags. When set to
617 ``absolute``, the effective weight is the exact ``priority_weight``
618 specified without additional weighting. You may want to do this when
619 you know exactly what priority weight each task should have.
        Additionally, when set to ``absolute``, there is a bonus effect of
        significantly speeding up the task creation process for very large
        DAGs. Options can be set as string or using the constants defined in
623 the static class ``airflow.utils.WeightRule``
624 |experimental|
        Since 2.9.0, Airflow allows defining a custom priority weight strategy
        by creating a subclass of
        ``airflow.task.priority_strategy.PriorityWeightStrategy``, registering
        it in a plugin, and then providing the class path or the class instance
        via the ``weight_rule`` parameter. The custom priority weight strategy
        will be used to calculate the effective total priority weight of the task instance.
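
        **Example** (an illustrative sketch of such a strategy; the subclass and its
        registration through an Airflow plugin are assumptions, not something shipped
        with Airflow)::

            from airflow.task.priority_strategy import PriorityWeightStrategy


            class DecreasingPriorityStrategy(PriorityWeightStrategy):
                # Priority decreases by one with every retry of the task instance.
                def get_weight(self, ti):
                    return max(3 - ti.try_number + 1, 1)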
631 :param queue: which queue to target when running this job. Not
        all executors implement queue management; the CeleryExecutor, for example,
        does support targeting specific queues.
634 :param pool: the slot pool this task should run in, slot pools are a
635 way to limit concurrency for certain tasks
636 :param pool_slots: the number of pool slots this task should use (>= 1)
637 Values less than 1 are not allowed.
638 :param sla: time by which the job is expected to succeed. Note that
639 this represents the ``timedelta`` after the period is closed. For
640 example if you set an SLA of 1 hour, the scheduler would send an email
641 soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
642 has not succeeded yet.
        The scheduler pays special attention to jobs with an SLA and
644 sends alert
645 emails for SLA misses. SLA misses are also recorded in the database
646 for future reference. All tasks that share the same SLA time
647 get bundled in a single email, sent soon after that time. SLA
        notifications are sent once and only once for each task instance.
649 :param execution_timeout: max time allowed for the execution of
650 this task instance, if it goes beyond it will raise and fail.
651 :param on_failure_callback: a function or list of functions to be called when a task instance
        of this task fails. A context dictionary is passed as a single
        parameter to this function. The context contains references to related
        objects of the task instance and is documented under the macros
655 section of the API.
656 :param on_execute_callback: much like the ``on_failure_callback`` except
657 that it is executed right before the task is executed.
658 :param on_retry_callback: much like the ``on_failure_callback`` except
659 that it is executed when retries occur.
660 :param on_success_callback: much like the ``on_failure_callback`` except
661 that it is executed when the task succeeds.
662 :param on_skipped_callback: much like the ``on_failure_callback`` except
        that it is executed when a skip occurs; this callback is called only if
        ``AirflowSkipException`` is raised. Explicitly, it is NOT called if a task is never
        started because of a preceding branching decision in the DAG or a trigger rule that
        causes execution to be skipped, so that the task is never scheduled.
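
        **Example** (a minimal sketch of a failure callback; ``notify_oncall`` is a
        hypothetical alerting helper, and ``BashOperator`` only stands in for any operator)::

            def notify_on_failure(context):
                ti = context["task_instance"]
                notify_oncall(f"Task {ti.task_id} failed in DAG {ti.dag_id}")


            task = BashOperator(
                task_id="risky_task",
                bash_command="exit 1",
                on_failure_callback=notify_on_failure,
            )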
667 :param pre_execute: a function to be called immediately before task
668 execution, receiving a context dictionary; raising an exception will
669 prevent the task from being executed.
670
671 |experimental|
672 :param post_execute: a function to be called immediately after task
673 execution, receiving a context dictionary and task result; raising an
674 exception will prevent the task from succeeding.
675
676 |experimental|
677 :param trigger_rule: defines the rule by which dependencies are applied
678 for the task to get triggered. Options are:
679 ``{ all_success | all_failed | all_done | all_skipped | one_success | one_done |
680 one_failed | none_failed | none_failed_min_one_success | none_skipped | always}``
681 default is ``all_success``. Options can be set as string or
682 using the constants defined in the static class
683 ``airflow.utils.TriggerRule``
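
        **Example** (illustrative only; ``EmptyOperator`` simply stands in for any operator)::

            from airflow.operators.empty import EmptyOperator
            from airflow.utils.trigger_rule import TriggerRule

            join = EmptyOperator(
                task_id="join",
                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
            )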
684 :param resources: A map of resource parameter names (the argument names of the
685 Resources constructor) to their values.
686 :param run_as_user: unix username to impersonate while running the task
687 :param max_active_tis_per_dag: When set, a task will be able to limit the concurrent
688 runs across execution_dates.
689 :param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent
690 task instances per DAG run.
691 :param executor: Which executor to target when running this task. NOT YET SUPPORTED
692 :param executor_config: Additional task-level configuration parameters that are
693 interpreted by a specific executor. Parameters are namespaced by the name of
        the executor.
695
696 **Example**: to run this task in a specific docker container through
697 the KubernetesExecutor ::
698
699 MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
700
701 :param do_xcom_push: if True, an XCom is pushed containing the Operator's
702 result
703 :param multiple_outputs: if True and do_xcom_push is True, pushes multiple XComs, one for each
704 key in the returned dictionary result. If False and do_xcom_push is True, pushes a single XCom.
705 :param task_group: The TaskGroup to which the task should belong. This is typically provided when not
706 using a TaskGroup as a context manager.
707 :param doc: Add documentation or notes to your Task objects that is visible in
708 Task Instance details View in the Webserver
709 :param doc_md: Add documentation (in Markdown format) or notes to your Task objects
710 that is visible in Task Instance details View in the Webserver
711 :param doc_rst: Add documentation (in RST format) or notes to your Task objects
712 that is visible in Task Instance details View in the Webserver
713 :param doc_json: Add documentation (in JSON format) or notes to your Task objects
714 that is visible in Task Instance details View in the Webserver
715 :param doc_yaml: Add documentation (in YAML format) or notes to your Task objects
716 that is visible in Task Instance details View in the Webserver
717 :param task_display_name: The display name of the task which appears on the UI.
718 :param logger_name: Name of the logger used by the Operator to emit logs.
719 If set to `None` (default), the logger name will fall back to
720 `airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
721 *airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator* as logger).
722 :param allow_nested_operators: if True, when an operator is executed within another one a warning message
723 will be logged. If False, then an exception will be raised if the operator is badly used (e.g. nested
724 within another one). In future releases of Airflow this parameter will be removed and an exception
725 will always be thrown when operators are nested within each other (default is True).
726
727 **Example**: example of a bad operator mixin usage::
728
729 @task(provide_context=True)
730 def say_hello_world(**context):
731 hello_world_task = BashOperator(
732 task_id="hello_world_task",
733 bash_command="python -c \"print('Hello, world!')\"",
734 dag=dag,
735 )
736 hello_world_task.execute(context)
737 """
738
739 # Implementing Operator.
740 template_fields: Sequence[str] = ()
741 template_ext: Sequence[str] = ()
742
743 template_fields_renderers: dict[str, str] = {}
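    # A subclass typically declares which of its attributes are templated, e.g. (an
    # illustrative sketch only, not an operator defined in this module):
    #
    #   class MyOperator(BaseOperator):
    #       template_fields: Sequence[str] = ("bash_command", "env")
    #       template_ext: Sequence[str] = (".sql",)
    #       template_fields_renderers = {"env": "json"}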
744
745 # Defines the color in the UI
746 ui_color: str = "#fff"
747 ui_fgcolor: str = "#000"
748
749 pool: str = ""
750
751 # base list which includes all the attrs that don't need deep copy.
752 _base_operator_shallow_copy_attrs: tuple[str, ...] = (
753 "user_defined_macros",
754 "user_defined_filters",
755 "params",
756 )
757
758 # each operator should override this class attr for shallow copy attrs.
759 shallow_copy_attrs: Sequence[str] = ()
760
761 # Defines the operator level extra links
762 operator_extra_links: Collection[BaseOperatorLink] = ()
763
764 # The _serialized_fields are lazily loaded when get_serialized_fields() method is called
765 __serialized_fields: frozenset[str] | None = None
766
767 partial: Callable[..., OperatorPartial] = _PartialDescriptor() # type: ignore
768
769 _comps = {
770 "task_id",
771 "dag_id",
772 "owner",
773 "email",
774 "email_on_retry",
775 "retry_delay",
776 "retry_exponential_backoff",
777 "max_retry_delay",
778 "start_date",
779 "end_date",
780 "depends_on_past",
781 "wait_for_downstream",
782 "priority_weight",
783 "sla",
784 "execution_timeout",
785 "on_execute_callback",
786 "on_failure_callback",
787 "on_success_callback",
788 "on_retry_callback",
789 "on_skipped_callback",
790 "do_xcom_push",
791 "multiple_outputs",
792 "allow_nested_operators",
793 "executor",
794 }
795
796 # Defines if the operator supports lineage without manual definitions
797 supports_lineage = False
798
799 # If True then the class constructor was called
800 __instantiated = False
    # List of args as passed to `init()`, after being updated by apply_defaults(). Used to "recreate" the task
802 # when mapping
803 __init_kwargs: dict[str, Any]
804
805 # Set to True before calling execute method
806 _lock_for_execution = False
807
808 _dag: DAG | None = None
809 task_group: TaskGroup | None = None
810
811 # subdag parameter is only set for SubDagOperator.
812 # Setting it to None by default as other Operators do not have that field
813 subdag: DAG | None = None
814
815 start_date: pendulum.DateTime | None = None
816 end_date: pendulum.DateTime | None = None
817
818 # Set to True for an operator instantiated by a mapped operator.
819 __from_mapped = False
820
821 start_trigger: BaseTrigger | None = None
822 next_method: str | None = None
823
824 def __init__(
825 self,
826 task_id: str,
827 owner: str = DEFAULT_OWNER,
828 email: str | Iterable[str] | None = None,
829 email_on_retry: bool = conf.getboolean("email", "default_email_on_retry", fallback=True),
830 email_on_failure: bool = conf.getboolean("email", "default_email_on_failure", fallback=True),
831 retries: int | None = DEFAULT_RETRIES,
832 retry_delay: timedelta | float = DEFAULT_RETRY_DELAY,
833 retry_exponential_backoff: bool = False,
834 max_retry_delay: timedelta | float | None = None,
835 start_date: datetime | None = None,
836 end_date: datetime | None = None,
837 depends_on_past: bool = False,
838 ignore_first_depends_on_past: bool = DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
839 wait_for_past_depends_before_skipping: bool = DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
840 wait_for_downstream: bool = False,
841 dag: DAG | None = None,
842 params: collections.abc.MutableMapping | None = None,
843 default_args: dict | None = None,
844 priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
845 weight_rule: str | PriorityWeightStrategy = DEFAULT_WEIGHT_RULE,
846 queue: str = DEFAULT_QUEUE,
847 pool: str | None = None,
848 pool_slots: int = DEFAULT_POOL_SLOTS,
849 sla: timedelta | None = None,
850 execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT,
851 on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None,
852 on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None,
853 on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None,
854 on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None,
855 on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None,
856 pre_execute: TaskPreExecuteHook | None = None,
857 post_execute: TaskPostExecuteHook | None = None,
858 trigger_rule: str = DEFAULT_TRIGGER_RULE,
859 resources: dict[str, Any] | None = None,
860 run_as_user: str | None = None,
861 task_concurrency: int | None = None,
862 map_index_template: str | None = None,
863 max_active_tis_per_dag: int | None = None,
864 max_active_tis_per_dagrun: int | None = None,
865 executor: str | None = None,
866 executor_config: dict | None = None,
867 do_xcom_push: bool = True,
868 multiple_outputs: bool = False,
869 inlets: Any | None = None,
870 outlets: Any | None = None,
871 task_group: TaskGroup | None = None,
872 doc: str | None = None,
873 doc_md: str | None = None,
874 doc_json: str | None = None,
875 doc_yaml: str | None = None,
876 doc_rst: str | None = None,
877 task_display_name: str | None = None,
878 logger_name: str | None = None,
879 allow_nested_operators: bool = True,
880 **kwargs,
881 ):
882 from airflow.models.dag import DagContext
883 from airflow.utils.task_group import TaskGroupContext
884
885 self.__init_kwargs = {}
886
887 super().__init__()
888
889 kwargs.pop("_airflow_mapped_validation_only", None)
890 if kwargs:
891 if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
892 raise AirflowException(
893 f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
894 f"Invalid arguments were:\n**kwargs: {kwargs}",
895 )
896 warnings.warn(
897 f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
898 "Support for passing such arguments will be dropped in future. "
899 f"Invalid arguments were:\n**kwargs: {kwargs}",
900 category=RemovedInAirflow3Warning,
901 stacklevel=3,
902 )
903 validate_key(task_id)
904
905 dag = dag or DagContext.get_current_dag()
906 task_group = task_group or TaskGroupContext.get_current_task_group(dag)
907
908 self.task_id = task_group.child_id(task_id) if task_group else task_id
909 if not self.__from_mapped and task_group:
910 task_group.add(self)
911
912 self.owner = owner
913 self.email = email
914 self.email_on_retry = email_on_retry
915 self.email_on_failure = email_on_failure
916
917 if execution_timeout is not None and not isinstance(execution_timeout, timedelta):
918 raise ValueError(
                f"execution_timeout must be a timedelta object but was passed as type: {type(execution_timeout)}"
920 )
921 self.execution_timeout = execution_timeout
922
923 self.on_execute_callback = on_execute_callback
924 self.on_failure_callback = on_failure_callback
925 self.on_success_callback = on_success_callback
926 self.on_retry_callback = on_retry_callback
927 self.on_skipped_callback = on_skipped_callback
928 self._pre_execute_hook = pre_execute
929 self._post_execute_hook = post_execute
930
931 if start_date and not isinstance(start_date, datetime):
932 self.log.warning("start_date for %s isn't datetime.datetime", self)
933 elif start_date:
934 self.start_date = timezone.convert_to_utc(start_date)
935
936 if end_date:
937 self.end_date = timezone.convert_to_utc(end_date)
938
939 if executor:
940 warnings.warn(
                "Specifying executors for operators is not yet "
                f"supported, the value {executor!r} will have no effect",
943 category=UserWarning,
944 stacklevel=2,
945 )
946 self.executor = executor
947 self.executor_config = executor_config or {}
948 self.run_as_user = run_as_user
949 self.retries = parse_retries(retries)
950 self.queue = queue
951 self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
952 self.pool_slots = pool_slots
953 if self.pool_slots < 1:
954 dag_str = f" in dag {dag.dag_id}" if dag else ""
955 raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1")
956 self.sla = sla
957
958 if trigger_rule == "dummy":
959 warnings.warn(
960 "dummy Trigger Rule is deprecated. Please use `TriggerRule.ALWAYS`.",
961 RemovedInAirflow3Warning,
962 stacklevel=2,
963 )
964 trigger_rule = TriggerRule.ALWAYS
965
966 if trigger_rule == "none_failed_or_skipped":
967 warnings.warn(
968 "none_failed_or_skipped Trigger Rule is deprecated. "
969 "Please use `none_failed_min_one_success`.",
970 RemovedInAirflow3Warning,
971 stacklevel=2,
972 )
973 trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
974
975 if not TriggerRule.is_valid(trigger_rule):
976 raise AirflowException(
                f"The trigger_rule must be one of {TriggerRule.all_triggers()}, "
978 f"'{dag.dag_id if dag else ''}.{task_id}'; received '{trigger_rule}'."
979 )
980
981 self.trigger_rule: TriggerRule = TriggerRule(trigger_rule)
982 FailStopDagInvalidTriggerRule.check(dag=dag, trigger_rule=self.trigger_rule)
983
984 self.depends_on_past: bool = depends_on_past
985 self.ignore_first_depends_on_past: bool = ignore_first_depends_on_past
986 self.wait_for_past_depends_before_skipping: bool = wait_for_past_depends_before_skipping
987 self.wait_for_downstream: bool = wait_for_downstream
988 if wait_for_downstream:
989 self.depends_on_past = True
990
991 self.retry_delay = coerce_timedelta(retry_delay, key="retry_delay")
992 self.retry_exponential_backoff = retry_exponential_backoff
993 self.max_retry_delay = (
994 max_retry_delay
995 if max_retry_delay is None
996 else coerce_timedelta(max_retry_delay, key="max_retry_delay")
997 )
998
        # At execution time this becomes a normal dict
1000 self.params: ParamsDict | dict = ParamsDict(params)
1001 if priority_weight is not None and not isinstance(priority_weight, int):
1002 raise AirflowException(
1003 f"`priority_weight` for task '{self.task_id}' only accepts integers, "
1004 f"received '{type(priority_weight)}'."
1005 )
1006 self.priority_weight = priority_weight
1007 self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule)
1008 self.resources = coerce_resources(resources)
1009 if task_concurrency and not max_active_tis_per_dag:
1010 # TODO: Remove in Airflow 3.0
1011 warnings.warn(
1012 "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
1013 RemovedInAirflow3Warning,
1014 stacklevel=2,
1015 )
1016 max_active_tis_per_dag = task_concurrency
1017 self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
1018 self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
1019 self.do_xcom_push: bool = do_xcom_push
1020 self.map_index_template: str | None = map_index_template
1021 self.multiple_outputs: bool = multiple_outputs
1022
1023 self.doc_md = doc_md
1024 self.doc_json = doc_json
1025 self.doc_yaml = doc_yaml
1026 self.doc_rst = doc_rst
1027 self.doc = doc
1028 # Populate the display field only if provided and different from task id
1029 self._task_display_property_value = (
1030 task_display_name if task_display_name and task_display_name != task_id else None
1031 )
1032
1033 self.upstream_task_ids: set[str] = set()
1034 self.downstream_task_ids: set[str] = set()
1035
1036 if dag:
1037 self.dag = dag
1038
1039 self._log_config_logger_name = "airflow.task.operators"
1040 self._logger_name = logger_name
1041 self.allow_nested_operators: bool = allow_nested_operators
1042
1043 # Lineage
1044 self.inlets: list = []
1045 self.outlets: list = []
1046
1047 if inlets:
1048 self.inlets = (
1049 inlets
1050 if isinstance(inlets, list)
1051 else [
1052 inlets,
1053 ]
1054 )
1055
1056 if outlets:
1057 self.outlets = (
1058 outlets
1059 if isinstance(outlets, list)
1060 else [
1061 outlets,
1062 ]
1063 )
1064
1065 if isinstance(self.template_fields, str):
1066 warnings.warn(
1067 f"The `template_fields` value for {self.task_type} is a string "
                "but should be a list or tuple of strings. Wrapping it in a list for execution. "
1069 f"Please update {self.task_type} accordingly.",
1070 UserWarning,
1071 stacklevel=2,
1072 )
1073 self.template_fields = [self.template_fields]
1074
1075 self._is_setup = False
1076 self._is_teardown = False
1077 if SetupTeardownContext.active:
1078 SetupTeardownContext.update_context_map(self)
1079
1080 def __eq__(self, other):
1081 if type(self) is type(other):
1082 # Use getattr() instead of __dict__ as __dict__ doesn't return
1083 # correct values for properties.
1084 return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)
1085 return False
1086
1087 def __ne__(self, other):
1088 return not self == other
1089
1090 def __hash__(self):
1091 hash_components = [type(self)]
1092 for component in self._comps:
1093 val = getattr(self, component, None)
1094 try:
1095 hash(val)
1096 hash_components.append(val)
1097 except TypeError:
1098 hash_components.append(repr(val))
1099 return hash(tuple(hash_components))
1100
1101 # including lineage information
1102 def __or__(self, other):
1103 """
1104 Return [This Operator] | [Operator].
1105
1106 The inlets of other will be set to pick up the outlets from this operator.
1107 Other will be set as a downstream task of this operator.
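
        A hedged sketch (``extract`` and ``load`` are assumed to be operator instances,
        with ``extract`` defining at least one outlet)::

            extract | load  # load picks up extract's outlets and becomes its downstream task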
1108 """
1109 if isinstance(other, BaseOperator):
1110 if not self.outlets and not self.supports_lineage:
1111 raise ValueError("No outlets defined for this operator")
1112 other.add_inlets([self.task_id])
1113 self.set_downstream(other)
1114 else:
1115 raise TypeError(f"Right hand side ({other}) is not an Operator")
1116
1117 return self
1118
1119 # /Composing Operators ---------------------------------------------
1120
1121 def __gt__(self, other):
1122 """
1123 Return [Operator] > [Outlet].
1124
1125 If other is an attr annotated object it is set as an outlet of this Operator.
1126 """
1127 if not isinstance(other, Iterable):
1128 other = [other]
1129
1130 for obj in other:
1131 if not attr.has(obj):
                raise TypeError(f"Right hand side ({obj}) is not an outlet")
1133 self.add_outlets(other)
1134
1135 return self
1136
1137 def __lt__(self, other):
1138 """
1139 Return [Inlet] > [Operator] or [Operator] < [Inlet].
1140
1141 If other is an attr annotated object it is set as an inlet to this operator.
1142 """
1143 if not isinstance(other, Iterable):
1144 other = [other]
1145
1146 for obj in other:
1147 if not attr.has(obj):
1148 raise TypeError(f"{obj} cannot be an inlet")
1149 self.add_inlets(other)
1150
1151 return self
1152
1153 def __setattr__(self, key, value):
1154 super().__setattr__(key, value)
1155 if self.__from_mapped or self._lock_for_execution:
1156 return # Skip any custom behavior for validation and during execute.
1157 if key in self.__init_kwargs:
1158 self.__init_kwargs[key] = value
1159 if self.__instantiated and key in self.template_fields:
1160 # Resolve upstreams set by assigning an XComArg after initializing
1161 # an operator, example:
1162 # op = BashOperator()
1163 # op.bash_command = "sleep 1"
1164 self.set_xcomargs_dependencies()
1165
1166 def add_inlets(self, inlets: Iterable[Any]):
1167 """Set inlets to this operator."""
1168 self.inlets.extend(inlets)
1169
1170 def add_outlets(self, outlets: Iterable[Any]):
1171 """Define the outlets of this operator."""
1172 self.outlets.extend(outlets)
1173
1174 def get_inlet_defs(self):
1175 """Get inlet definitions on this task.
1176
1177 :meta private:
1178 """
1179 return self.inlets
1180
1181 def get_outlet_defs(self):
1182 """Get outlet definitions on this task.
1183
1184 :meta private:
1185 """
1186 return self.outlets
1187
1188 def get_dag(self) -> DAG | None:
1189 return self._dag
1190
1191 @property # type: ignore[override]
1192 def dag(self) -> DAG: # type: ignore[override]
1193 """Returns the Operator's DAG if set, otherwise raises an error."""
1194 if self._dag:
1195 return self._dag
1196 else:
1197 raise AirflowException(f"Operator {self} has not been assigned to a DAG yet")
1198
1199 @dag.setter
1200 def dag(self, dag: DAG | None):
1201 """Operators can be assigned to one DAG, one time. Repeat assignments to that same DAG are ok."""
1202 from airflow.models.dag import DAG
1203
1204 if dag is None:
1205 self._dag = None
1206 return
1207 if not isinstance(dag, DAG):
1208 raise TypeError(f"Expected DAG; received {dag.__class__.__name__}")
1209 elif self.has_dag() and self.dag is not dag:
1210 raise AirflowException(f"The DAG assigned to {self} can not be changed.")
1211
1212 if self.__from_mapped:
1213 pass # Don't add to DAG -- the mapped task takes the place.
1214 elif dag.task_dict.get(self.task_id) is not self:
1215 dag.add_task(self)
1216
1217 self._dag = dag
1218
1219 @property
1220 def task_display_name(self) -> str:
1221 return self._task_display_property_value or self.task_id
1222
1223 def has_dag(self):
1224 """Return True if the Operator has been assigned to a DAG."""
1225 return self._dag is not None
1226
1227 deps: frozenset[BaseTIDep] = frozenset(
1228 {
1229 NotInRetryPeriodDep(),
1230 PrevDagrunDep(),
1231 TriggerRuleDep(),
1232 NotPreviouslySkippedDep(),
1233 MappedTaskUpstreamDep(),
1234 }
1235 )
1236 """
1237 Returns the set of dependencies for the operator. These differ from execution
1238 context dependencies in that they are specific to tasks and can be
1239 extended/overridden by subclasses.
1240 """
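    # A subclass can extend these dependencies, e.g. (an illustrative sketch only;
    # ``MyCustomDep`` is a hypothetical BaseTIDep implementation):
    #
    #   class MyOperator(BaseOperator):
    #       deps = BaseOperator.deps | {MyCustomDep()}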
1241
1242 def prepare_for_execution(self) -> BaseOperator:
1243 """Lock task for execution to disable custom action in ``__setattr__`` and return a copy."""
1244 other = copy.copy(self)
1245 other._lock_for_execution = True
1246 return other
1247
1248 def set_xcomargs_dependencies(self) -> None:
1249 """
1250 Resolve upstream dependencies of a task.
1251
        In this way, passing an ``XComArg`` as the value for a template field
        will result in creating an upstream relation between the two tasks.
1254
1255 **Example**: ::
1256
1257 with DAG(...):
1258 generate_content = GenerateContentOperator(task_id="generate_content")
1259 send_email = EmailOperator(..., html_content=generate_content.output)
1260
1261 # This is equivalent to
1262 with DAG(...):
1263 generate_content = GenerateContentOperator(task_id="generate_content")
1264 send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}")
1265 generate_content >> send_email
1266
1267 """
1268 from airflow.models.xcom_arg import XComArg
1269
1270 for field in self.template_fields:
1271 if hasattr(self, field):
1272 arg = getattr(self, field)
1273 XComArg.apply_upstream_relationship(self, arg)
1274
1275 @prepare_lineage
1276 def pre_execute(self, context: Any):
1277 """Execute right before self.execute() is called."""
1278 if self._pre_execute_hook is None:
1279 return
1280 ExecutionCallableRunner(
1281 self._pre_execute_hook,
1282 context_get_outlet_events(context),
1283 logger=self.log,
1284 ).run(context)
1285
1286 def execute(self, context: Context) -> Any:
1287 """
1288 Derive when creating an operator.
1289
1290 Context is the same dictionary used as when rendering jinja templates.
1291
1292 Refer to get_template_context for more context.
1293 """
1294 raise NotImplementedError()
1295
1296 @apply_lineage
1297 def post_execute(self, context: Any, result: Any = None):
1298 """
1299 Execute right after self.execute() is called.
1300
1301 It is passed the execution context and any results returned by the operator.
1302 """
1303 if self._post_execute_hook is None:
1304 return
1305 ExecutionCallableRunner(
1306 self._post_execute_hook,
1307 context_get_outlet_events(context),
1308 logger=self.log,
1309 ).run(context, result)
1310
1311 def on_kill(self) -> None:
1312 """
1313 Override this method to clean up subprocesses when a task instance gets killed.
1314
1315 Any use of the threading, subprocess or multiprocessing module within an
1316 operator needs to be cleaned up, or it will leave ghost processes behind.
1317 """
1318
1319 def __deepcopy__(self, memo):
1320 # Hack sorting double chained task lists by task_id to avoid hitting
1321 # max_depth on deepcopy operations.
1322 sys.setrecursionlimit(5000) # TODO fix this in a better way
1323
1324 cls = self.__class__
1325 result = cls.__new__(cls)
1326 memo[id(self)] = result
1327
1328 shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
1329
1330 for k, v in self.__dict__.items():
1331 if k == "_BaseOperator__instantiated":
1332 # Don't set this until the _end_, as it changes behaviour of __setattr__
1333 continue
1334 if k not in shallow_copy:
1335 setattr(result, k, copy.deepcopy(v, memo))
1336 else:
1337 setattr(result, k, copy.copy(v))
1338 result.__instantiated = self.__instantiated
1339 return result
1340
1341 def __getstate__(self):
1342 state = dict(self.__dict__)
1343 if self._log:
1344 del state["_log"]
1345
1346 return state
1347
1348 def __setstate__(self, state):
1349 self.__dict__ = state
1350
1351 def render_template_fields(
1352 self,
1353 context: Context,
1354 jinja_env: jinja2.Environment | None = None,
1355 ) -> None:
1356 """Template all attributes listed in *self.template_fields*.
1357
1358 This mutates the attributes in-place and is irreversible.
1359
1360 :param context: Context dict with values to apply on content.
1361 :param jinja_env: Jinja's environment to use for rendering.
1362 """
1363 if not jinja_env:
1364 jinja_env = self.get_template_env()
1365 self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
1366
1367 @provide_session
1368 def clear(
1369 self,
1370 start_date: datetime | None = None,
1371 end_date: datetime | None = None,
1372 upstream: bool = False,
1373 downstream: bool = False,
1374 session: Session = NEW_SESSION,
1375 ):
1376 """Clear the state of task instances associated with the task, following the parameters specified."""
1377 qry = select(TaskInstance).where(TaskInstance.dag_id == self.dag_id)
1378
1379 if start_date:
1380 qry = qry.where(TaskInstance.execution_date >= start_date)
1381 if end_date:
1382 qry = qry.where(TaskInstance.execution_date <= end_date)
1383
1384 tasks = [self.task_id]
1385
1386 if upstream:
1387 tasks += [t.task_id for t in self.get_flat_relatives(upstream=True)]
1388
1389 if downstream:
1390 tasks += [t.task_id for t in self.get_flat_relatives(upstream=False)]
1391
1392 qry = qry.where(TaskInstance.task_id.in_(tasks))
1393 results = session.scalars(qry).all()
1394 count = len(results)
1395 clear_task_instances(results, session, dag=self.dag)
1396 session.commit()
1397 return count
1398
1399 @provide_session
1400 def get_task_instances(
1401 self,
1402 start_date: datetime | None = None,
1403 end_date: datetime | None = None,
1404 session: Session = NEW_SESSION,
1405 ) -> list[TaskInstance]:
1406 """Get task instances related to this task for a specific date range."""
1407 from airflow.models import DagRun
1408
1409 query = (
1410 select(TaskInstance)
1411 .join(TaskInstance.dag_run)
1412 .where(TaskInstance.dag_id == self.dag_id)
1413 .where(TaskInstance.task_id == self.task_id)
1414 )
1415 if start_date:
1416 query = query.where(DagRun.execution_date >= start_date)
1417 if end_date:
1418 query = query.where(DagRun.execution_date <= end_date)
1419 return session.scalars(query.order_by(DagRun.execution_date)).all()
1420
1421 @provide_session
1422 def run(
1423 self,
1424 start_date: datetime | None = None,
1425 end_date: datetime | None = None,
1426 ignore_first_depends_on_past: bool = True,
1427 wait_for_past_depends_before_skipping: bool = False,
1428 ignore_ti_state: bool = False,
1429 mark_success: bool = False,
1430 test_mode: bool = False,
1431 session: Session = NEW_SESSION,
1432 ) -> None:
1433 """Run a set of task instances for a date range."""
1434 from airflow.models import DagRun
1435 from airflow.utils.types import DagRunType
1436
        # Assertions for typing -- this function requires a DAG, and when the task
        # has a DAG we are _guaranteed_ to have a start_date (otherwise it could not
        # have been added to the DAG in the first place).
1439 if TYPE_CHECKING:
1440 assert self.start_date
1441
1442 start_date = pendulum.instance(start_date or self.start_date)
1443 end_date = pendulum.instance(end_date or self.end_date or timezone.utcnow())
1444
1445 for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False):
1446 ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past
1447 try:
1448 dag_run = session.scalars(
1449 select(DagRun).where(
1450 DagRun.dag_id == self.dag_id,
1451 DagRun.execution_date == info.logical_date,
1452 )
1453 ).one()
1454 ti = TaskInstance(self, run_id=dag_run.run_id)
1455 except NoResultFound:
1456 # This is _mostly_ only used in tests
1457 dr = DagRun(
1458 dag_id=self.dag_id,
1459 run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date),
1460 run_type=DagRunType.MANUAL,
1461 execution_date=info.logical_date,
1462 data_interval=info.data_interval,
1463 )
1464 ti = TaskInstance(self, run_id=dr.run_id)
1465 ti.dag_run = dr
1466 session.add(dr)
1467 session.flush()
1468
1469 ti.run(
1470 mark_success=mark_success,
1471 ignore_depends_on_past=ignore_depends_on_past,
1472 wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping,
1473 ignore_ti_state=ignore_ti_state,
1474 test_mode=test_mode,
1475 session=session,
1476 )
1477
1478 def dry_run(self) -> None:
        """Perform a dry run of the operator - just render the template fields."""
1480 self.log.info("Dry run")
1481 for field in self.template_fields:
1482 try:
1483 content = getattr(self, field)
1484 except AttributeError:
1485 raise AttributeError(
1486 f"{field!r} is configured as a template field "
1487 f"but {self.task_type} does not have this attribute."
1488 )
1489
1490 if content and isinstance(content, str):
1491 self.log.info("Rendering template for %s", field)
1492 self.log.info(content)
1493
1494 def get_direct_relatives(self, upstream: bool = False) -> Iterable[Operator]:
1495 """Get list of the direct relatives to the current task, upstream or downstream."""
1496 if upstream:
1497 return self.upstream_list
1498 else:
1499 return self.downstream_list
1500
1501 def __repr__(self):
1502 return f"<Task({self.task_type}): {self.task_id}>"
1503
1504 @property
1505 def operator_class(self) -> type[BaseOperator]: # type: ignore[override]
1506 return self.__class__
1507
1508 @property
1509 def task_type(self) -> str:
1510 """@property: type of the task."""
1511 return self.__class__.__name__
1512
1513 @property
1514 def operator_name(self) -> str:
1515 """@property: use a more friendly display name for the operator, if set."""
1516 try:
1517 return self.custom_operator_name # type: ignore
1518 except AttributeError:
1519 return self.task_type
1520
1521 @property
1522 def roots(self) -> list[BaseOperator]:
1523 """Required by DAGNode."""
1524 return [self]
1525
1526 @property
1527 def leaves(self) -> list[BaseOperator]:
1528 """Required by DAGNode."""
1529 return [self]
1530
1531 @property
1532 def output(self) -> XComArg:
        """Return a reference to the XCom pushed by the current operator.
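
        A minimal sketch (the operator classes and task ids are illustrative); the
        reference is resolved to the pushed value at runtime:

        .. code-block:: python

            extract = ExtractOperator(task_id="extract")
            load = LoadOperator(task_id="load", data=extract.output)
        """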
1534 from airflow.models.xcom_arg import XComArg
1535
1536 return XComArg(operator=self)
1537
1538 @property
1539 def is_setup(self) -> bool:
1540 """Whether the operator is a setup task.
1541
1542 :meta private:
1543 """
1544 return self._is_setup
1545
1546 @is_setup.setter
1547 def is_setup(self, value: bool) -> None:
1548 """Setter for is_setup property.
1549
1550 :meta private:
1551 """
1552 if self.is_teardown and value:
1553 raise ValueError(f"Cannot mark task '{self.task_id}' as setup; task is already a teardown.")
1554 self._is_setup = value
1555
1556 @property
1557 def is_teardown(self) -> bool:
1558 """Whether the operator is a teardown task.
1559
1560 :meta private:
1561 """
1562 return self._is_teardown
1563
1564 @is_teardown.setter
1565 def is_teardown(self, value: bool) -> None:
1566 """
1567 Setter for is_teardown property.
1568
1569 :meta private:
1570 """
1571 if self.is_setup and value:
1572 raise ValueError(f"Cannot mark task '{self.task_id}' as teardown; task is already a setup.")
1573 self._is_teardown = value
1574
1575 @staticmethod
1576 def xcom_push(
1577 context: Any,
1578 key: str,
1579 value: Any,
1580 execution_date: datetime | None = None,
1581 ) -> None:
1582 """
1583 Make an XCom available for tasks to pull.
1584
1585 :param context: Execution Context Dictionary
1586 :param key: A key for the XCom
        :param value: A value for the XCom. The value is serialized and stored
            in the database.
1589 :param execution_date: if provided, the XCom will not be visible until
1590 this date. This can be used, for example, to send a message to a
1591 task on a future date without it being immediately visible.
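
        A usage sketch from inside an operator's ``execute`` method (the key name is
        illustrative):

        .. code-block:: python

            def execute(self, context):
                self.xcom_push(context, key="my_key", value=42)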
1592 """
1593 context["ti"].xcom_push(key=key, value=value, execution_date=execution_date)
1594
1595 @staticmethod
1596 @provide_session
1597 def xcom_pull(
1598 context: Any,
1599 task_ids: str | list[str] | None = None,
1600 dag_id: str | None = None,
1601 key: str = XCOM_RETURN_KEY,
1602 include_prior_dates: bool | None = None,
1603 session: Session = NEW_SESSION,
1604 ) -> Any:
1605 """
1606 Pull XComs that optionally meet certain criteria.
1607
1608 The default value for `key` limits the search to XComs
1609 that were returned by other tasks (as opposed to those that were pushed
1610 manually). To remove this filter, pass key=None (or any desired value).
1611
1612 If a single task_id string is provided, the result is the value of the
1613 most recent matching XCom from that task_id. If multiple task_ids are
1614 provided, a tuple of matching values is returned. None is returned
1615 whenever no matches are found.
1616
1617 :param context: Execution Context Dictionary
1618 :param key: A key for the XCom. If provided, only XComs with matching
1619 keys will be returned. The default key is 'return_value', also
1620 available as a constant XCOM_RETURN_KEY. This key is automatically
1621 given to XComs returned by tasks (as opposed to being pushed
1622 manually). To remove the filter, pass key=None.
1623 :param task_ids: Only XComs from tasks with matching ids will be
1624 pulled. Can pass None to remove the filter.
1625 :param dag_id: If provided, only pulls XComs from this DAG.
1626 If None (default), the DAG of the calling task is used.
1627 :param include_prior_dates: If False, only XComs from the current
1628 execution_date are returned. If True, XComs from previous dates
1629 are returned as well.
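
        A usage sketch from inside an operator's ``execute`` method (the task id is
        illustrative):

        .. code-block:: python

            def execute(self, context):
                value = self.xcom_pull(context, task_ids="producer_task")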
1630 """
1631 return context["ti"].xcom_pull(
1632 key=key,
1633 task_ids=task_ids,
1634 dag_id=dag_id,
1635 include_prior_dates=include_prior_dates,
1636 session=session,
1637 )
1638
1639 @classmethod
1640 def get_serialized_fields(cls):
1641 """Stringified DAGs and operators contain exactly these fields."""
1642 if not cls.__serialized_fields:
1643 from airflow.models.dag import DagContext
1644
            # Make sure the following dummy task is not added to the currently active
            # DAG in context; otherwise it results in a
            # `RuntimeError: dictionary changed size during iteration`
            # exception in the SerializedDAG.serialize_dag() call.
1649 DagContext.push_context_managed_dag(None)
1650 cls.__serialized_fields = frozenset(
1651 vars(BaseOperator(task_id="test")).keys()
1652 - {
1653 "upstream_task_ids",
1654 "default_args",
1655 "dag",
1656 "_dag",
1657 "label",
1658 "_BaseOperator__instantiated",
1659 "_BaseOperator__init_kwargs",
1660 "_BaseOperator__from_mapped",
1661 "_is_setup",
1662 "_is_teardown",
1663 "_on_failure_fail_dagrun",
1664 }
1665 | { # Class level defaults need to be added to this list
1666 "start_date",
1667 "end_date",
1668 "_task_type",
1669 "_operator_name",
1670 "subdag",
1671 "ui_color",
1672 "ui_fgcolor",
1673 "template_ext",
1674 "template_fields",
1675 "template_fields_renderers",
1676 "params",
1677 "is_setup",
1678 "is_teardown",
1679 "on_failure_fail_dagrun",
1680 "map_index_template",
1681 "start_trigger",
1682 "next_method",
1683 "_needs_expansion",
1684 }
1685 )
1686 DagContext.pop_context_managed_dag()
1687
1688 return cls.__serialized_fields
1689
1690 def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]:
1691 """Serialize; required by DAGNode."""
1692 return DagAttributeTypes.OP, self.task_id
1693
1694 @property
1695 def inherits_from_empty_operator(self):
        """Used to determine if an Operator inherits from EmptyOperator."""
        # This looks like `isinstance(self, EmptyOperator)` would work, but this also
        # needs to cope when `self` is a Serialized instance of an EmptyOperator or one
        # of its subclasses (which don't inherit from anything but BaseOperator).
1700 return getattr(self, "_is_empty", False)
1701
1702 def defer(
1703 self,
1704 *,
1705 trigger: BaseTrigger,
1706 method_name: str,
1707 kwargs: dict[str, Any] | None = None,
1708 timeout: timedelta | None = None,
1709 ):
1710 """
1711 Mark this Operator "deferred", suspending its execution until the provided trigger fires an event.
1712
1713 This is achieved by raising a special exception (TaskDeferred)
1714 which is caught in the main _execute_task wrapper.
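
        A minimal sketch using the built-in ``TimeDeltaTrigger`` (the resume method
        name is illustrative):

        .. code-block:: python

            from datetime import timedelta

            from airflow.triggers.temporal import TimeDeltaTrigger


            def execute(self, context):
                self.defer(
                    trigger=TimeDeltaTrigger(timedelta(minutes=5)),
                    method_name="execute_complete",
                )

            def execute_complete(self, context, event=None):
                self.log.info("Trigger fired, resuming execution")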
1715 """
1716 raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)
1717
1718 def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
1719 """Call this method when a deferred task is resumed."""
1720 # __fail__ is a special signal value for next_method that indicates
1721 # this task was scheduled specifically to fail.
1722 if next_method == "__fail__":
1723 next_kwargs = next_kwargs or {}
1724 traceback = next_kwargs.get("traceback")
1725 if traceback is not None:
1726 self.log.error("Trigger failed:\n%s", "\n".join(traceback))
1727 raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
1728 # Grab the callable off the Operator/Task and add in any kwargs
1729 execute_callable = getattr(self, next_method)
1730 if next_kwargs:
1731 execute_callable = functools.partial(execute_callable, **next_kwargs)
1732 return execute_callable(context)
1733
1734 def unmap(self, resolve: None | dict[str, Any] | tuple[Context, Session]) -> BaseOperator:
1735 """Get the "normal" operator from the current operator.
1736
1737 Since a BaseOperator is not mapped to begin with, this simply returns
1738 the original operator.
1739
1740 :meta private:
1741 """
1742 return self
1743
1744
1745# TODO: Deprecate for Airflow 3.0
1746Chainable = Union[DependencyMixin, Sequence[DependencyMixin]]
1747
1748
1749def chain(*tasks: DependencyMixin | Sequence[DependencyMixin]) -> None:
1750 r"""
1751 Given a number of tasks, builds a dependency chain.
1752
    This function accepts values of BaseOperator (aka tasks), EdgeModifiers (aka Labels), XComArg, TaskGroups,
    or lists containing any mix of these types. If you want to chain between two lists, they must be of the
    same length.
1756
1757 Using classic operators/sensors:
1758
1759 .. code-block:: python
1760
1761 chain(t1, [t2, t3], [t4, t5], t6)
1762
1763 is equivalent to::
1764
1765 / -> t2 -> t4 \
1766 t1 -> t6
1767 \ -> t3 -> t5 /
1768
1769 .. code-block:: python
1770
1771 t1.set_downstream(t2)
1772 t1.set_downstream(t3)
1773 t2.set_downstream(t4)
1774 t3.set_downstream(t5)
1775 t4.set_downstream(t6)
1776 t5.set_downstream(t6)
1777
1778 Using task-decorated functions aka XComArgs:
1779
1780 .. code-block:: python
1781
1782 chain(x1(), [x2(), x3()], [x4(), x5()], x6())
1783
1784 is equivalent to::
1785
1786 / -> x2 -> x4 \
1787 x1 -> x6
1788 \ -> x3 -> x5 /
1789
1790 .. code-block:: python
1791
1792 x1 = x1()
1793 x2 = x2()
1794 x3 = x3()
1795 x4 = x4()
1796 x5 = x5()
1797 x6 = x6()
1798 x1.set_downstream(x2)
1799 x1.set_downstream(x3)
1800 x2.set_downstream(x4)
1801 x3.set_downstream(x5)
1802 x4.set_downstream(x6)
1803 x5.set_downstream(x6)
1804
1805 Using TaskGroups:
1806
1807 .. code-block:: python
1808
        chain(t1, task_group1, task_group2, t2)

    is equivalent to:

    .. code-block:: python

        t1.set_downstream(task_group1)
        task_group1.set_downstream(task_group2)
        task_group2.set_downstream(t2)
1814
1815
1816 It is also possible to mix between classic operator/sensor, EdgeModifiers, XComArg, and TaskGroups:
1817
1818 .. code-block:: python
1819
1820 chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3())
1821
1822 is equivalent to::
1823
1824 / "branch one" -> x1 \
1825 t1 -> task_group1 -> x3
1826 \ "branch two" -> x2 /
1827
1828 .. code-block:: python
1829
1830 x1 = x1()
1831 x2 = x2()
1832 x3 = x3()
1833 label1 = Label("branch one")
1834 label2 = Label("branch two")
1835 t1.set_downstream(label1)
1836 label1.set_downstream(x1)
        t1.set_downstream(label2)
1838 label2.set_downstream(x2)
1839 x1.set_downstream(task_group1)
1840 x2.set_downstream(task_group1)
1841 task_group1.set_downstream(x3)
1842
1843 # or
1844
1845 x1 = x1()
1846 x2 = x2()
1847 x3 = x3()
1848 t1.set_downstream(x1, edge_modifier=Label("branch one"))
1849 t1.set_downstream(x2, edge_modifier=Label("branch two"))
1850 x1.set_downstream(task_group1)
1851 x2.set_downstream(task_group1)
1852 task_group1.set_downstream(x3)
1853
1854
    :param tasks: Individual tasks and/or lists of tasks, EdgeModifiers, XComArgs, or TaskGroups to set dependencies between.
1856 """
1857 for up_task, down_task in zip(tasks, tasks[1:]):
1858 if isinstance(up_task, DependencyMixin):
1859 up_task.set_downstream(down_task)
1860 continue
1861 if isinstance(down_task, DependencyMixin):
1862 down_task.set_upstream(up_task)
1863 continue
1864 if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
1865 raise TypeError(f"Chain not supported between instances of {type(up_task)} and {type(down_task)}")
1866 up_task_list = up_task
1867 down_task_list = down_task
1868 if len(up_task_list) != len(down_task_list):
1869 raise AirflowException(
1870 f"Chain not supported for different length Iterable. "
1871 f"Got {len(up_task_list)} and {len(down_task_list)}."
1872 )
1873 for up_t, down_t in zip(up_task_list, down_task_list):
1874 up_t.set_downstream(down_t)
1875
1876
1877def cross_downstream(
1878 from_tasks: Sequence[DependencyMixin],
1879 to_tasks: DependencyMixin | Sequence[DependencyMixin],
1880):
1881 r"""
1882 Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks.
1883
1884 Using classic operators/sensors:
1885
1886 .. code-block:: python
1887
1888 cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])
1889
1890 is equivalent to::
1891
1892 t1 ---> t4
1893 \ /
1894 t2 -X -> t5
1895 / \
1896 t3 ---> t6
1897
1898 .. code-block:: python
1899
1900 t1.set_downstream(t4)
1901 t1.set_downstream(t5)
1902 t1.set_downstream(t6)
1903 t2.set_downstream(t4)
1904 t2.set_downstream(t5)
1905 t2.set_downstream(t6)
1906 t3.set_downstream(t4)
1907 t3.set_downstream(t5)
1908 t3.set_downstream(t6)
1909
1910 Using task-decorated functions aka XComArgs:
1911
1912 .. code-block:: python
1913
1914 cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()])
1915
1916 is equivalent to::
1917
1918 x1 ---> x4
1919 \ /
1920 x2 -X -> x5
1921 / \
1922 x3 ---> x6
1923
1924 .. code-block:: python
1925
1926 x1 = x1()
1927 x2 = x2()
1928 x3 = x3()
1929 x4 = x4()
1930 x5 = x5()
1931 x6 = x6()
1932 x1.set_downstream(x4)
1933 x1.set_downstream(x5)
1934 x1.set_downstream(x6)
1935 x2.set_downstream(x4)
1936 x2.set_downstream(x5)
1937 x2.set_downstream(x6)
1938 x3.set_downstream(x4)
1939 x3.set_downstream(x5)
1940 x3.set_downstream(x6)
1941
1942 It is also possible to mix between classic operator/sensor and XComArg tasks:
1943
1944 .. code-block:: python
1945
1946 cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()])
1947
1948 is equivalent to::
1949
1950 t1 ---> x1
1951 \ /
1952 x2 -X -> t2
1953 / \
1954 t3 ---> x3
1955
1956 .. code-block:: python
1957
1958 x1 = x1()
1959 x2 = x2()
1960 x3 = x3()
1961 t1.set_downstream(x1)
1962 t1.set_downstream(t2)
1963 t1.set_downstream(x3)
1964 x2.set_downstream(x1)
1965 x2.set_downstream(t2)
1966 x2.set_downstream(x3)
1967 t3.set_downstream(x1)
1968 t3.set_downstream(t2)
1969 t3.set_downstream(x3)
1970
1971 :param from_tasks: List of tasks or XComArgs to start from.
1972 :param to_tasks: List of tasks or XComArgs to set as downstream dependencies.
1973 """
1974 for task in from_tasks:
1975 task.set_downstream(to_tasks)
1976
1977
1978def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]):
1979 """
1980 Simplify task dependency definition.
1981
1982 E.g.: suppose you want precedence like so::
1983
            ╭─op2─╮ ╭─op4─╮
        op1─┤     ├─┤─op5─├─op7
            ╰─op3─╯ ╰─op6─╯
1987
1988 Then you can accomplish like so::
1989
1990 chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
1991
1992 :param elements: a list of operators / lists of operators
1993 """
1994 if not elements:
1995 raise ValueError("No tasks provided; nothing to do.")
1996 prev_elem = None
1997 deps_set = False
1998 for curr_elem in elements:
1999 if isinstance(curr_elem, EdgeModifier):
2000 raise ValueError("Labels are not supported by chain_linear")
2001 if prev_elem is not None:
2002 for task in prev_elem:
2003 task >> curr_elem
2004 if not deps_set:
2005 deps_set = True
2006 prev_elem = [curr_elem] if isinstance(curr_elem, DependencyMixin) else curr_elem
2007 if not deps_set:
2008 raise ValueError("No dependencies were set. Did you forget to expand with `*`?")
2009
2010
2011def __getattr__(name):
2012 """
2013 PEP-562: Lazy loaded attributes on python modules.
2014
2015 :meta private:
2016 """
2017 path = __deprecated_imports.get(name)
2018 if not path:
2019 raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
2020
2021 from airflow.utils.module_loading import import_string
2022
2023 warnings.warn(
2024 f"Import `{__name__}.{name}` is deprecated. Please use `{path}.{name}`.",
2025 RemovedInAirflow3Warning,
2026 stacklevel=2,
2027 )
2028 val = import_string(f"{path}.{name}")
2029
2030 # Store for next time
2031 globals()[name] = val
2032 return val
2033
2034
2035__deprecated_imports = {
2036 "BaseOperatorLink": "airflow.models.baseoperatorlink",
2037}
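

# Illustrative note on the lazy-import shim above: importing ``BaseOperatorLink``
# from this module still works, but emits a ``RemovedInAirflow3Warning`` and
# resolves to the new location declared in ``__deprecated_imports``, e.g.
#
#   from airflow.models.baseoperatorlink import BaseOperatorLink  # preferred import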