Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/baseoperator.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

665 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18""" 

19Base operator for all operators. 

20 

21:sphinx-autoapi-skip: 

22""" 

23 

24from __future__ import annotations 

25 

26import abc 

27import collections.abc 

28import contextlib 

29import copy 

30import functools 

31import inspect 

32import logging 

33import sys 

34import warnings 

35from datetime import datetime, timedelta 

36from functools import total_ordering, wraps 

37from types import FunctionType 

38from typing import ( 

39 TYPE_CHECKING, 

40 Any, 

41 Callable, 

42 Collection, 

43 Iterable, 

44 Sequence, 

45 TypeVar, 

46 Union, 

47 cast, 

48) 

49 

50import attr 

51import pendulum 

52from dateutil.relativedelta import relativedelta 

53from sqlalchemy import select 

54from sqlalchemy.orm.exc import NoResultFound 

55 

56from airflow.configuration import conf 

57from airflow.exceptions import ( 

58 AirflowException, 

59 FailStopDagInvalidTriggerRule, 

60 RemovedInAirflow3Warning, 

61 TaskDeferralError, 

62 TaskDeferred, 

63) 

64from airflow.lineage import apply_lineage, prepare_lineage 

65from airflow.models.abstractoperator import ( 

66 DEFAULT_EXECUTOR, 

67 DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, 

68 DEFAULT_OWNER, 

69 DEFAULT_POOL_SLOTS, 

70 DEFAULT_PRIORITY_WEIGHT, 

71 DEFAULT_QUEUE, 

72 DEFAULT_RETRIES, 

73 DEFAULT_RETRY_DELAY, 

74 DEFAULT_TASK_EXECUTION_TIMEOUT, 

75 DEFAULT_TRIGGER_RULE, 

76 DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, 

77 DEFAULT_WEIGHT_RULE, 

78 AbstractOperator, 

79) 

80from airflow.models.base import _sentinel 

81from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs 

82from airflow.models.param import ParamsDict 

83from airflow.models.pool import Pool 

84from airflow.models.taskinstance import TaskInstance, clear_task_instances 

85from airflow.models.taskmixin import DependencyMixin 

86from airflow.serialization.enums import DagAttributeTypes 

87from airflow.task.priority_strategy import PriorityWeightStrategy, validate_and_load_priority_weight_strategy 

88from airflow.ti_deps.deps.mapped_task_upstream_dep import MappedTaskUpstreamDep 

89from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep 

90from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep 

91from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep 

92from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep 

93from airflow.utils import timezone 

94from airflow.utils.context import Context, context_get_outlet_events 

95from airflow.utils.decorators import fixup_decorator_warning_stack 

96from airflow.utils.edgemodifier import EdgeModifier 

97from airflow.utils.helpers import validate_key 

98from airflow.utils.operator_helpers import ExecutionCallableRunner 

99from airflow.utils.operator_resources import Resources 

100from airflow.utils.session import NEW_SESSION, provide_session 

101from airflow.utils.setup_teardown import SetupTeardownContext 

102from airflow.utils.trigger_rule import TriggerRule 

103from airflow.utils.types import NOTSET 

104from airflow.utils.xcom import XCOM_RETURN_KEY 

105 

106if TYPE_CHECKING: 

107 from types import ClassMethodDescriptorType 

108 

109 import jinja2 # Slow import. 

110 from sqlalchemy.orm import Session 

111 

112 from airflow.models.abstractoperator import TaskStateChangeCallback 

113 from airflow.models.baseoperatorlink import BaseOperatorLink 

114 from airflow.models.dag import DAG 

115 from airflow.models.operator import Operator 

116 from airflow.models.xcom_arg import XComArg 

117 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep 

118 from airflow.triggers.base import BaseTrigger 

119 from airflow.utils.task_group import TaskGroup 

120 from airflow.utils.types import ArgNotSet 

121 

122ScheduleInterval = Union[str, timedelta, relativedelta] 

123 

124TaskPreExecuteHook = Callable[[Context], None] 

125TaskPostExecuteHook = Callable[[Context, Any], None] 

126 

127T = TypeVar("T", bound=FunctionType) 

128 

129logger = logging.getLogger("airflow.models.baseoperator.BaseOperator") 

130 

131 

132def parse_retries(retries: Any) -> int | None: 

133 if retries is None or type(retries) == int: # noqa: E721 

134 return retries 

135 try: 

136 parsed_retries = int(retries) 

137 except (TypeError, ValueError): 

138 raise AirflowException(f"'retries' type must be int, not {type(retries).__name__}") 

139 logger.warning("Implicitly converting 'retries' from %r to int", retries) 

140 return parsed_retries 

141 

142 

143def coerce_timedelta(value: float | timedelta, *, key: str) -> timedelta: 

144 if isinstance(value, timedelta): 

145 return value 

146 logger.debug("%s isn't a timedelta object, assuming secs", key) 

147 return timedelta(seconds=value) 

148 

149 

150def coerce_resources(resources: dict[str, Any] | None) -> Resources | None: 

151 if resources is None: 

152 return None 

153 return Resources(**resources) 

154 

155 

156def _get_parent_defaults(dag: DAG | None, task_group: TaskGroup | None) -> tuple[dict, ParamsDict]: 

157 if not dag: 

158 return {}, ParamsDict() 

159 dag_args = copy.copy(dag.default_args) 

160 dag_params = copy.deepcopy(dag.params) 

161 if task_group: 

162 if task_group.default_args and not isinstance(task_group.default_args, collections.abc.Mapping): 

163 raise TypeError("default_args must be a mapping") 

164 dag_args.update(task_group.default_args) 

165 return dag_args, dag_params 

166 

167 

168def get_merged_defaults( 

169 dag: DAG | None, 

170 task_group: TaskGroup | None, 

171 task_params: collections.abc.MutableMapping | None, 

172 task_default_args: dict | None, 

173) -> tuple[dict, ParamsDict]: 

174 args, params = _get_parent_defaults(dag, task_group) 

175 if task_params: 

176 if not isinstance(task_params, collections.abc.Mapping): 

177 raise TypeError("params must be a mapping") 

178 params.update(task_params) 

179 if task_default_args: 

180 if not isinstance(task_default_args, collections.abc.Mapping): 

181 raise TypeError("default_args must be a mapping") 

182 args.update(task_default_args) 

183 with contextlib.suppress(KeyError): 

184 params.update(task_default_args["params"] or {}) 

185 return args, params 

186 

187 

188class _PartialDescriptor: 

189 """A descriptor that guards against ``.partial`` being called on Task objects.""" 

190 

191 class_method: ClassMethodDescriptorType | None = None 

192 

193 def __get__( 

194 self, obj: BaseOperator, cls: type[BaseOperator] | None = None 

195 ) -> Callable[..., OperatorPartial]: 

196 # Call this "partial" so it looks nicer in stack traces. 

197 def partial(**kwargs): 

198 raise TypeError("partial can only be called on Operator classes, not Tasks themselves") 

199 

200 if obj is not None: 

201 return partial 

202 return self.class_method.__get__(cls, cls) 

203 

204 

205_PARTIAL_DEFAULTS: dict[str, Any] = { 

206 "map_index_template": None, 

207 "owner": DEFAULT_OWNER, 

208 "trigger_rule": DEFAULT_TRIGGER_RULE, 

209 "depends_on_past": False, 

210 "ignore_first_depends_on_past": DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, 

211 "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, 

212 "wait_for_downstream": False, 

213 "retries": DEFAULT_RETRIES, 

214 "executor": DEFAULT_EXECUTOR, 

215 "queue": DEFAULT_QUEUE, 

216 "pool_slots": DEFAULT_POOL_SLOTS, 

217 "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT, 

218 "retry_delay": DEFAULT_RETRY_DELAY, 

219 "retry_exponential_backoff": False, 

220 "priority_weight": DEFAULT_PRIORITY_WEIGHT, 

221 "weight_rule": DEFAULT_WEIGHT_RULE, 

222 "inlets": [], 

223 "outlets": [], 

224 "allow_nested_operators": True, 

225} 

226 

227 

228# This is what handles the actual mapping. 

229def partial( 

230 operator_class: type[BaseOperator], 

231 *, 

232 task_id: str, 

233 dag: DAG | None = None, 

234 task_group: TaskGroup | None = None, 

235 start_date: datetime | ArgNotSet = NOTSET, 

236 end_date: datetime | ArgNotSet = NOTSET, 

237 owner: str | ArgNotSet = NOTSET, 

238 email: None | str | Iterable[str] | ArgNotSet = NOTSET, 

239 params: collections.abc.MutableMapping | None = None, 

240 resources: dict[str, Any] | None | ArgNotSet = NOTSET, 

241 trigger_rule: str | ArgNotSet = NOTSET, 

242 depends_on_past: bool | ArgNotSet = NOTSET, 

243 ignore_first_depends_on_past: bool | ArgNotSet = NOTSET, 

244 wait_for_past_depends_before_skipping: bool | ArgNotSet = NOTSET, 

245 wait_for_downstream: bool | ArgNotSet = NOTSET, 

246 retries: int | None | ArgNotSet = NOTSET, 

247 queue: str | ArgNotSet = NOTSET, 

248 pool: str | ArgNotSet = NOTSET, 

249 pool_slots: int | ArgNotSet = NOTSET, 

250 execution_timeout: timedelta | None | ArgNotSet = NOTSET, 

251 max_retry_delay: None | timedelta | float | ArgNotSet = NOTSET, 

252 retry_delay: timedelta | float | ArgNotSet = NOTSET, 

253 retry_exponential_backoff: bool | ArgNotSet = NOTSET, 

254 priority_weight: int | ArgNotSet = NOTSET, 

255 weight_rule: str | PriorityWeightStrategy | ArgNotSet = NOTSET, 

256 sla: timedelta | None | ArgNotSet = NOTSET, 

257 map_index_template: str | None | ArgNotSet = NOTSET, 

258 max_active_tis_per_dag: int | None | ArgNotSet = NOTSET, 

259 max_active_tis_per_dagrun: int | None | ArgNotSet = NOTSET, 

260 on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET, 

261 on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET, 

262 on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET, 

263 on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET, 

264 on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET, 

265 run_as_user: str | None | ArgNotSet = NOTSET, 

266 executor: str | None | ArgNotSet = NOTSET, 

267 executor_config: dict | None | ArgNotSet = NOTSET, 

268 inlets: Any | None | ArgNotSet = NOTSET, 

269 outlets: Any | None | ArgNotSet = NOTSET, 

270 doc: str | None | ArgNotSet = NOTSET, 

271 doc_md: str | None | ArgNotSet = NOTSET, 

272 doc_json: str | None | ArgNotSet = NOTSET, 

273 doc_yaml: str | None | ArgNotSet = NOTSET, 

274 doc_rst: str | None | ArgNotSet = NOTSET, 

275 task_display_name: str | None | ArgNotSet = NOTSET, 

276 logger_name: str | None | ArgNotSet = NOTSET, 

277 allow_nested_operators: bool = True, 

278 **kwargs, 

279) -> OperatorPartial: 

280 from airflow.models.dag import DagContext 

281 from airflow.utils.task_group import TaskGroupContext 

282 

283 validate_mapping_kwargs(operator_class, "partial", kwargs) 

284 

285 dag = dag or DagContext.get_current_dag() 

286 if dag: 

287 task_group = task_group or TaskGroupContext.get_current_task_group(dag) 

288 if task_group: 

289 task_id = task_group.child_id(task_id) 

290 

291 # Merge DAG and task group level defaults into user-supplied values. 

292 dag_default_args, partial_params = get_merged_defaults( 

293 dag=dag, 

294 task_group=task_group, 

295 task_params=params, 

296 task_default_args=kwargs.pop("default_args", None), 

297 ) 

298 

299 # Create partial_kwargs from args and kwargs 

300 partial_kwargs: dict[str, Any] = { 

301 **kwargs, 

302 "dag": dag, 

303 "task_group": task_group, 

304 "task_id": task_id, 

305 "map_index_template": map_index_template, 

306 "start_date": start_date, 

307 "end_date": end_date, 

308 "owner": owner, 

309 "email": email, 

310 "trigger_rule": trigger_rule, 

311 "depends_on_past": depends_on_past, 

312 "ignore_first_depends_on_past": ignore_first_depends_on_past, 

313 "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping, 

314 "wait_for_downstream": wait_for_downstream, 

315 "retries": retries, 

316 "queue": queue, 

317 "pool": pool, 

318 "pool_slots": pool_slots, 

319 "execution_timeout": execution_timeout, 

320 "max_retry_delay": max_retry_delay, 

321 "retry_delay": retry_delay, 

322 "retry_exponential_backoff": retry_exponential_backoff, 

323 "priority_weight": priority_weight, 

324 "weight_rule": weight_rule, 

325 "sla": sla, 

326 "max_active_tis_per_dag": max_active_tis_per_dag, 

327 "max_active_tis_per_dagrun": max_active_tis_per_dagrun, 

328 "on_execute_callback": on_execute_callback, 

329 "on_failure_callback": on_failure_callback, 

330 "on_retry_callback": on_retry_callback, 

331 "on_success_callback": on_success_callback, 

332 "on_skipped_callback": on_skipped_callback, 

333 "run_as_user": run_as_user, 

334 "executor": executor, 

335 "executor_config": executor_config, 

336 "inlets": inlets, 

337 "outlets": outlets, 

338 "resources": resources, 

339 "doc": doc, 

340 "doc_json": doc_json, 

341 "doc_md": doc_md, 

342 "doc_rst": doc_rst, 

343 "doc_yaml": doc_yaml, 

344 "task_display_name": task_display_name, 

345 "logger_name": logger_name, 

346 "allow_nested_operators": allow_nested_operators, 

347 } 

348 

349 # Inject DAG-level default args into args provided to this function. 

350 partial_kwargs.update((k, v) for k, v in dag_default_args.items() if partial_kwargs.get(k) is NOTSET) 

351 

352 # Fill fields not provided by the user with default values. 

353 partial_kwargs = {k: _PARTIAL_DEFAULTS.get(k) if v is NOTSET else v for k, v in partial_kwargs.items()} 

354 

355 # Post-process arguments. Should be kept in sync with _TaskDecorator.expand(). 

356 if "task_concurrency" in kwargs: # Reject deprecated option. 

357 raise TypeError("unexpected argument: task_concurrency") 

358 if partial_kwargs["wait_for_downstream"]: 

359 partial_kwargs["depends_on_past"] = True 

360 partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"]) 

361 partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"]) 

362 if partial_kwargs["pool"] is None: 

363 partial_kwargs["pool"] = Pool.DEFAULT_POOL_NAME 

364 partial_kwargs["retries"] = parse_retries(partial_kwargs["retries"]) 

365 partial_kwargs["retry_delay"] = coerce_timedelta(partial_kwargs["retry_delay"], key="retry_delay") 

366 if partial_kwargs["max_retry_delay"] is not None: 

367 partial_kwargs["max_retry_delay"] = coerce_timedelta( 

368 partial_kwargs["max_retry_delay"], 

369 key="max_retry_delay", 

370 ) 

371 partial_kwargs["executor_config"] = partial_kwargs["executor_config"] or {} 

372 partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"]) 

373 

374 return OperatorPartial( 

375 operator_class=operator_class, 

376 kwargs=partial_kwargs, 

377 params=partial_params, 

378 ) 

379 

380 

381class ExecutorSafeguard: 

382 """ 

383 The ExecutorSafeguard decorator. 

384 

385 Checks if the execute method of an operator isn't manually called outside 

386 the TaskInstance as we want to avoid bad mixing between decorated and 

387 classic operators. 

388 """ 

389 

390 test_mode = conf.getboolean("core", "unit_test_mode") 

391 

392 @classmethod 

393 def decorator(cls, func): 

394 @wraps(func) 

395 def wrapper(self, *args, **kwargs): 

396 from airflow.decorators.base import DecoratedOperator 

397 

398 sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None) 

399 

400 if not cls.test_mode and not sentinel == _sentinel and not isinstance(self, DecoratedOperator): 

401 message = f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!" 

402 if not self.allow_nested_operators: 

403 raise AirflowException(message) 

404 self.log.warning(message) 

405 return func(self, *args, **kwargs) 

406 

407 return wrapper 

408 

409 

410class BaseOperatorMeta(abc.ABCMeta): 

411 """Metaclass of BaseOperator.""" 

412 

413 @classmethod 

414 def _apply_defaults(cls, func: T) -> T: 

415 """ 

416 Look for an argument named "default_args", and fill the unspecified arguments from it. 

417 

418 Since python2.* isn't clear about which arguments are missing when 

419 calling a function, and that this can be quite confusing with multi-level 

420 inheritance and argument defaults, this decorator also alerts with 

421 specific information about the missing arguments. 

422 """ 

423 # Cache inspect.signature for the wrapper closure to avoid calling it 

424 # at every decorated invocation. This is separate sig_cache created 

425 # per decoration, i.e. each function decorated using apply_defaults will 

426 # have a different sig_cache. 

427 sig_cache = inspect.signature(func) 

428 non_variadic_params = { 

429 name: param 

430 for (name, param) in sig_cache.parameters.items() 

431 if param.name != "self" and param.kind not in (param.VAR_POSITIONAL, param.VAR_KEYWORD) 

432 } 

433 non_optional_args = { 

434 name 

435 for name, param in non_variadic_params.items() 

436 if param.default == param.empty and name != "task_id" 

437 } 

438 

439 fixup_decorator_warning_stack(func) 

440 

441 @wraps(func) 

442 def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any: 

443 from airflow.models.dag import DagContext 

444 from airflow.utils.task_group import TaskGroupContext 

445 

446 if args: 

447 raise AirflowException("Use keyword arguments when initializing operators") 

448 

449 instantiated_from_mapped = kwargs.pop( 

450 "_airflow_from_mapped", 

451 getattr(self, "_BaseOperator__from_mapped", False), 

452 ) 

453 

454 dag: DAG | None = kwargs.get("dag") or DagContext.get_current_dag() 

455 task_group: TaskGroup | None = kwargs.get("task_group") 

456 if dag and not task_group: 

457 task_group = TaskGroupContext.get_current_task_group(dag) 

458 

459 default_args, merged_params = get_merged_defaults( 

460 dag=dag, 

461 task_group=task_group, 

462 task_params=kwargs.pop("params", None), 

463 task_default_args=kwargs.pop("default_args", None), 

464 ) 

465 

466 for arg in sig_cache.parameters: 

467 if arg not in kwargs and arg in default_args: 

468 kwargs[arg] = default_args[arg] 

469 

470 missing_args = non_optional_args.difference(kwargs) 

471 if len(missing_args) == 1: 

472 raise AirflowException(f"missing keyword argument {missing_args.pop()!r}") 

473 elif missing_args: 

474 display = ", ".join(repr(a) for a in sorted(missing_args)) 

475 raise AirflowException(f"missing keyword arguments {display}") 

476 

477 if merged_params: 

478 kwargs["params"] = merged_params 

479 

480 hook = getattr(self, "_hook_apply_defaults", None) 

481 if hook: 

482 args, kwargs = hook(**kwargs, default_args=default_args) 

483 default_args = kwargs.pop("default_args", {}) 

484 

485 if not hasattr(self, "_BaseOperator__init_kwargs"): 

486 self._BaseOperator__init_kwargs = {} 

487 self._BaseOperator__from_mapped = instantiated_from_mapped 

488 

489 result = func(self, **kwargs, default_args=default_args) 

490 

491 # Store the args passed to init -- we need them to support task.map serialization! 

492 self._BaseOperator__init_kwargs.update(kwargs) # type: ignore 

493 

494 # Set upstream task defined by XComArgs passed to template fields of the operator. 

495 # BUT: only do this _ONCE_, not once for each class in the hierarchy 

496 if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc] 

497 self.set_xcomargs_dependencies() 

498 # Mark instance as instantiated. 

499 self._BaseOperator__instantiated = True 

500 

501 return result 

502 

503 apply_defaults.__non_optional_args = non_optional_args # type: ignore 

504 apply_defaults.__param_names = set(non_variadic_params) # type: ignore 

505 

506 return cast(T, apply_defaults) 

507 

508 def __new__(cls, name, bases, namespace, **kwargs): 

509 execute_method = namespace.get("execute") 

510 if callable(execute_method) and not getattr(execute_method, "__isabstractmethod__", False): 

511 namespace["execute"] = ExecutorSafeguard().decorator(execute_method) 

512 new_cls = super().__new__(cls, name, bases, namespace, **kwargs) 

513 with contextlib.suppress(KeyError): 

514 # Update the partial descriptor with the class method, so it calls the actual function 

515 # (but let subclasses override it if they need to) 

516 partial_desc = vars(new_cls)["partial"] 

517 if isinstance(partial_desc, _PartialDescriptor): 

518 partial_desc.class_method = classmethod(partial) 

519 new_cls.__init__ = cls._apply_defaults(new_cls.__init__) 

520 return new_cls 

521 

522 

523@total_ordering 

524class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta): 

525 r""" 

526 Abstract base class for all operators. 

527 

528 Since operators create objects that become nodes in the DAG, BaseOperator 

529 contains many recursive methods for DAG crawling behavior. To derive from 

530 this class, you are expected to override the constructor and the 'execute' 

531 method. 

532 

533 Operators derived from this class should perform or trigger certain tasks 

534 synchronously (wait for completion). Example of operators could be an 

535 operator that runs a Pig job (PigOperator), a sensor operator that 

536 waits for a partition to land in Hive (HiveSensorOperator), or one that 

537 moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these 

538 operators (tasks) target specific operations, running specific scripts, 

539 functions or data transfers. 

540 

541 This class is abstract and shouldn't be instantiated. Instantiating a 

542 class derived from this one results in the creation of a task object, 

543 which ultimately becomes a node in DAG objects. Task dependencies should 

544 be set by using the set_upstream and/or set_downstream methods. 

545 

546 :param task_id: a unique, meaningful id for the task 

547 :param owner: the owner of the task. Using a meaningful description 

548 (e.g. user/person/team/role name) to clarify ownership is recommended. 

549 :param email: the 'to' email address(es) used in email alerts. This can be a 

550 single email or multiple ones. Multiple addresses can be specified as a 

551 comma or semicolon separated string or by passing a list of strings. 

552 :param email_on_retry: Indicates whether email alerts should be sent when a 

553 task is retried 

554 :param email_on_failure: Indicates whether email alerts should be sent when 

555 a task failed 

556 :param retries: the number of retries that should be performed before 

557 failing the task 

558 :param retry_delay: delay between retries, can be set as ``timedelta`` or 

559 ``float`` seconds, which will be converted into ``timedelta``, 

560 the default is ``timedelta(seconds=300)``. 

561 :param retry_exponential_backoff: allow progressively longer waits between 

562 retries by using exponential backoff algorithm on retry delay (delay 

563 will be converted into seconds) 

564 :param max_retry_delay: maximum delay interval between retries, can be set as 

565 ``timedelta`` or ``float`` seconds, which will be converted into ``timedelta``. 

566 :param start_date: The ``start_date`` for the task, determines 

567 the ``execution_date`` for the first task instance. The best practice 

568 is to have the start_date rounded 

569 to your DAG's ``schedule_interval``. Daily jobs have their start_date 

570 some day at 00:00:00, hourly jobs have their start_date at 00:00 

571 of a specific hour. Note that Airflow simply looks at the latest 

572 ``execution_date`` and adds the ``schedule_interval`` to determine 

573 the next ``execution_date``. It is also very important 

574 to note that different tasks' dependencies 

575 need to line up in time. If task A depends on task B and their 

576 start_date are offset in a way that their execution_date don't line 

577 up, A's dependencies will never be met. If you are looking to delay 

578 a task, for example running a daily task at 2AM, look into the 

579 ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using 

580 dynamic ``start_date`` and recommend using fixed ones. Read the 

581 FAQ entry about start_date for more information. 

582 :param end_date: if specified, the scheduler won't go beyond this date 

583 :param depends_on_past: when set to true, task instances will run 

584 sequentially and only if the previous instance has succeeded or has been skipped. 

585 The task instance for the start_date is allowed to run. 

586 :param wait_for_past_depends_before_skipping: when set to true, if the task instance 

587 should be marked as skipped, and depends_on_past is true, the ti will stay on None state 

588 waiting the task of the previous run 

589 :param wait_for_downstream: when set to true, an instance of task 

590 X will wait for tasks immediately downstream of the previous instance 

591 of task X to finish successfully or be skipped before it runs. This is useful if the 

592 different instances of a task X alter the same asset, and this asset 

593 is used by tasks downstream of task X. Note that depends_on_past 

594 is forced to True wherever wait_for_downstream is used. Also note that 

595 only tasks *immediately* downstream of the previous task instance are waited 

596 for; the statuses of any tasks further downstream are ignored. 

597 :param dag: a reference to the dag the task is attached to (if any) 

598 :param priority_weight: priority weight of this task against other task. 

599 This allows the executor to trigger higher priority tasks before 

600 others when things get backed up. Set priority_weight as a higher 

601 number for more important tasks. 

602 :param weight_rule: weighting method used for the effective total 

603 priority weight of the task. Options are: 

604 ``{ downstream | upstream | absolute }`` default is ``downstream`` 

605 When set to ``downstream`` the effective weight of the task is the 

606 aggregate sum of all downstream descendants. As a result, upstream 

607 tasks will have higher weight and will be scheduled more aggressively 

608 when using positive weight values. This is useful when you have 

609 multiple dag run instances and desire to have all upstream tasks to 

610 complete for all runs before each dag can continue processing 

611 downstream tasks. When set to ``upstream`` the effective weight is the 

612 aggregate sum of all upstream ancestors. This is the opposite where 

613 downstream tasks have higher weight and will be scheduled more 

614 aggressively when using positive weight values. This is useful when you 

615 have multiple dag run instances and prefer to have each dag complete 

616 before starting upstream tasks of other dags. When set to 

617 ``absolute``, the effective weight is the exact ``priority_weight`` 

618 specified without additional weighting. You may want to do this when 

619 you know exactly what priority weight each task should have. 

620 Additionally, when set to ``absolute``, there is bonus effect of 

621 significantly speeding up the task creation process as for very large 

622 DAGs. Options can be set as string or using the constants defined in 

623 the static class ``airflow.utils.WeightRule`` 

624 |experimental| 

625 Since 2.9.0, Airflow allows to define custom priority weight strategy, 

626 by creating a subclass of 

627 ``airflow.task.priority_strategy.PriorityWeightStrategy`` and registering 

628 in a plugin, then providing the class path or the class instance via 

629 ``weight_rule`` parameter. The custom priority weight strategy will be 

630 used to calculate the effective total priority weight of the task instance. 

631 :param queue: which queue to target when running this job. Not 

632 all executors implement queue management, the CeleryExecutor 

633 does support targeting specific queues. 

634 :param pool: the slot pool this task should run in, slot pools are a 

635 way to limit concurrency for certain tasks 

636 :param pool_slots: the number of pool slots this task should use (>= 1) 

637 Values less than 1 are not allowed. 

638 :param sla: time by which the job is expected to succeed. Note that 

639 this represents the ``timedelta`` after the period is closed. For 

640 example if you set an SLA of 1 hour, the scheduler would send an email 

641 soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance 

642 has not succeeded yet. 

643 The scheduler pays special attention for jobs with an SLA and 

644 sends alert 

645 emails for SLA misses. SLA misses are also recorded in the database 

646 for future reference. All tasks that share the same SLA time 

647 get bundled in a single email, sent soon after that time. SLA 

648 notification are sent once and only once for each task instance. 

649 :param execution_timeout: max time allowed for the execution of 

650 this task instance, if it goes beyond it will raise and fail. 

651 :param on_failure_callback: a function or list of functions to be called when a task instance 

652 of this task fails. a context dictionary is passed as a single 

653 parameter to this function. Context contains references to related 

654 objects to the task instance and is documented under the macros 

655 section of the API. 

656 :param on_execute_callback: much like the ``on_failure_callback`` except 

657 that it is executed right before the task is executed. 

658 :param on_retry_callback: much like the ``on_failure_callback`` except 

659 that it is executed when retries occur. 

660 :param on_success_callback: much like the ``on_failure_callback`` except 

661 that it is executed when the task succeeds. 

662 :param on_skipped_callback: much like the ``on_failure_callback`` except 

663 that it is executed when skipped occur; this callback will be called only if AirflowSkipException get raised. 

664 Explicitly it is NOT called if a task is not started to be executed because of a preceding branching 

665 decision in the DAG or a trigger rule which causes execution to skip so that the task execution 

666 is never scheduled. 

667 :param pre_execute: a function to be called immediately before task 

668 execution, receiving a context dictionary; raising an exception will 

669 prevent the task from being executed. 

670 

671 |experimental| 

672 :param post_execute: a function to be called immediately after task 

673 execution, receiving a context dictionary and task result; raising an 

674 exception will prevent the task from succeeding. 

675 

676 |experimental| 

677 :param trigger_rule: defines the rule by which dependencies are applied 

678 for the task to get triggered. Options are: 

679 ``{ all_success | all_failed | all_done | all_skipped | one_success | one_done | 

680 one_failed | none_failed | none_failed_min_one_success | none_skipped | always}`` 

681 default is ``all_success``. Options can be set as string or 

682 using the constants defined in the static class 

683 ``airflow.utils.TriggerRule`` 

684 :param resources: A map of resource parameter names (the argument names of the 

685 Resources constructor) to their values. 

686 :param run_as_user: unix username to impersonate while running the task 

687 :param max_active_tis_per_dag: When set, a task will be able to limit the concurrent 

688 runs across execution_dates. 

689 :param max_active_tis_per_dagrun: When set, a task will be able to limit the concurrent 

690 task instances per DAG run. 

691 :param executor: Which executor to target when running this task. NOT YET SUPPORTED 

692 :param executor_config: Additional task-level configuration parameters that are 

693 interpreted by a specific executor. Parameters are namespaced by the name of 

694 executor. 

695 

696 **Example**: to run this task in a specific docker container through 

697 the KubernetesExecutor :: 

698 

699 MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}}) 

700 

701 :param do_xcom_push: if True, an XCom is pushed containing the Operator's 

702 result 

703 :param multiple_outputs: if True and do_xcom_push is True, pushes multiple XComs, one for each 

704 key in the returned dictionary result. If False and do_xcom_push is True, pushes a single XCom. 

705 :param task_group: The TaskGroup to which the task should belong. This is typically provided when not 

706 using a TaskGroup as a context manager. 

707 :param doc: Add documentation or notes to your Task objects that is visible in 

708 Task Instance details View in the Webserver 

709 :param doc_md: Add documentation (in Markdown format) or notes to your Task objects 

710 that is visible in Task Instance details View in the Webserver 

711 :param doc_rst: Add documentation (in RST format) or notes to your Task objects 

712 that is visible in Task Instance details View in the Webserver 

713 :param doc_json: Add documentation (in JSON format) or notes to your Task objects 

714 that is visible in Task Instance details View in the Webserver 

715 :param doc_yaml: Add documentation (in YAML format) or notes to your Task objects 

716 that is visible in Task Instance details View in the Webserver 

717 :param task_display_name: The display name of the task which appears on the UI. 

718 :param logger_name: Name of the logger used by the Operator to emit logs. 

719 If set to `None` (default), the logger name will fall back to 

720 `airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have 

721 *airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator* as logger). 

722 :param allow_nested_operators: if True, when an operator is executed within another one a warning message 

723 will be logged. If False, then an exception will be raised if the operator is badly used (e.g. nested 

724 within another one). In future releases of Airflow this parameter will be removed and an exception 

725 will always be thrown when operators are nested within each other (default is True). 

726 

727 **Example**: example of a bad operator mixin usage:: 

728 

729 @task(provide_context=True) 

730 def say_hello_world(**context): 

731 hello_world_task = BashOperator( 

732 task_id="hello_world_task", 

733 bash_command="python -c \"print('Hello, world!')\"", 

734 dag=dag, 

735 ) 

736 hello_world_task.execute(context) 

737 """ 

738 

739 # Implementing Operator. 

740 template_fields: Sequence[str] = () 

741 template_ext: Sequence[str] = () 

742 

743 template_fields_renderers: dict[str, str] = {} 

744 

745 # Defines the color in the UI 

746 ui_color: str = "#fff" 

747 ui_fgcolor: str = "#000" 

748 

749 pool: str = "" 

750 

751 # base list which includes all the attrs that don't need deep copy. 

752 _base_operator_shallow_copy_attrs: tuple[str, ...] = ( 

753 "user_defined_macros", 

754 "user_defined_filters", 

755 "params", 

756 ) 

757 

758 # each operator should override this class attr for shallow copy attrs. 

759 shallow_copy_attrs: Sequence[str] = () 

760 

761 # Defines the operator level extra links 

762 operator_extra_links: Collection[BaseOperatorLink] = () 

763 

764 # The _serialized_fields are lazily loaded when get_serialized_fields() method is called 

765 __serialized_fields: frozenset[str] | None = None 

766 

767 partial: Callable[..., OperatorPartial] = _PartialDescriptor() # type: ignore 

768 

769 _comps = { 

770 "task_id", 

771 "dag_id", 

772 "owner", 

773 "email", 

774 "email_on_retry", 

775 "retry_delay", 

776 "retry_exponential_backoff", 

777 "max_retry_delay", 

778 "start_date", 

779 "end_date", 

780 "depends_on_past", 

781 "wait_for_downstream", 

782 "priority_weight", 

783 "sla", 

784 "execution_timeout", 

785 "on_execute_callback", 

786 "on_failure_callback", 

787 "on_success_callback", 

788 "on_retry_callback", 

789 "on_skipped_callback", 

790 "do_xcom_push", 

791 "multiple_outputs", 

792 "allow_nested_operators", 

793 "executor", 

794 } 

795 

796 # Defines if the operator supports lineage without manual definitions 

797 supports_lineage = False 

798 

799 # If True then the class constructor was called 

800 __instantiated = False 

801 # List of args as passed to `init()`, after apply_defaults() has been updated. Used to "recreate" the task 

802 # when mapping 

803 __init_kwargs: dict[str, Any] 

804 

805 # Set to True before calling execute method 

806 _lock_for_execution = False 

807 

808 _dag: DAG | None = None 

809 task_group: TaskGroup | None = None 

810 

811 # subdag parameter is only set for SubDagOperator. 

812 # Setting it to None by default as other Operators do not have that field 

813 subdag: DAG | None = None 

814 

815 start_date: pendulum.DateTime | None = None 

816 end_date: pendulum.DateTime | None = None 

817 

818 # Set to True for an operator instantiated by a mapped operator. 

819 __from_mapped = False 

820 

821 start_trigger: BaseTrigger | None = None 

822 next_method: str | None = None 

823 

824 def __init__( 

825 self, 

826 task_id: str, 

827 owner: str = DEFAULT_OWNER, 

828 email: str | Iterable[str] | None = None, 

829 email_on_retry: bool = conf.getboolean("email", "default_email_on_retry", fallback=True), 

830 email_on_failure: bool = conf.getboolean("email", "default_email_on_failure", fallback=True), 

831 retries: int | None = DEFAULT_RETRIES, 

832 retry_delay: timedelta | float = DEFAULT_RETRY_DELAY, 

833 retry_exponential_backoff: bool = False, 

834 max_retry_delay: timedelta | float | None = None, 

835 start_date: datetime | None = None, 

836 end_date: datetime | None = None, 

837 depends_on_past: bool = False, 

838 ignore_first_depends_on_past: bool = DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, 

839 wait_for_past_depends_before_skipping: bool = DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, 

840 wait_for_downstream: bool = False, 

841 dag: DAG | None = None, 

842 params: collections.abc.MutableMapping | None = None, 

843 default_args: dict | None = None, 

844 priority_weight: int = DEFAULT_PRIORITY_WEIGHT, 

845 weight_rule: str | PriorityWeightStrategy = DEFAULT_WEIGHT_RULE, 

846 queue: str = DEFAULT_QUEUE, 

847 pool: str | None = None, 

848 pool_slots: int = DEFAULT_POOL_SLOTS, 

849 sla: timedelta | None = None, 

850 execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT, 

851 on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, 

852 on_failure_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, 

853 on_success_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, 

854 on_retry_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, 

855 on_skipped_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] = None, 

856 pre_execute: TaskPreExecuteHook | None = None, 

857 post_execute: TaskPostExecuteHook | None = None, 

858 trigger_rule: str = DEFAULT_TRIGGER_RULE, 

859 resources: dict[str, Any] | None = None, 

860 run_as_user: str | None = None, 

861 task_concurrency: int | None = None, 

862 map_index_template: str | None = None, 

863 max_active_tis_per_dag: int | None = None, 

864 max_active_tis_per_dagrun: int | None = None, 

865 executor: str | None = None, 

866 executor_config: dict | None = None, 

867 do_xcom_push: bool = True, 

868 multiple_outputs: bool = False, 

869 inlets: Any | None = None, 

870 outlets: Any | None = None, 

871 task_group: TaskGroup | None = None, 

872 doc: str | None = None, 

873 doc_md: str | None = None, 

874 doc_json: str | None = None, 

875 doc_yaml: str | None = None, 

876 doc_rst: str | None = None, 

877 task_display_name: str | None = None, 

878 logger_name: str | None = None, 

879 allow_nested_operators: bool = True, 

880 **kwargs, 

881 ): 

882 from airflow.models.dag import DagContext 

883 from airflow.utils.task_group import TaskGroupContext 

884 

885 self.__init_kwargs = {} 

886 

887 super().__init__() 

888 

889 kwargs.pop("_airflow_mapped_validation_only", None) 

890 if kwargs: 

891 if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"): 

892 raise AirflowException( 

893 f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). " 

894 f"Invalid arguments were:\n**kwargs: {kwargs}", 

895 ) 

896 warnings.warn( 

897 f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). " 

898 "Support for passing such arguments will be dropped in future. " 

899 f"Invalid arguments were:\n**kwargs: {kwargs}", 

900 category=RemovedInAirflow3Warning, 

901 stacklevel=3, 

902 ) 

903 validate_key(task_id) 

904 

905 dag = dag or DagContext.get_current_dag() 

906 task_group = task_group or TaskGroupContext.get_current_task_group(dag) 

907 

908 self.task_id = task_group.child_id(task_id) if task_group else task_id 

909 if not self.__from_mapped and task_group: 

910 task_group.add(self) 

911 

912 self.owner = owner 

913 self.email = email 

914 self.email_on_retry = email_on_retry 

915 self.email_on_failure = email_on_failure 

916 

917 if execution_timeout is not None and not isinstance(execution_timeout, timedelta): 

918 raise ValueError( 

919 f"execution_timeout must be timedelta object but passed as type: {type(execution_timeout)}" 

920 ) 

921 self.execution_timeout = execution_timeout 

922 

923 self.on_execute_callback = on_execute_callback 

924 self.on_failure_callback = on_failure_callback 

925 self.on_success_callback = on_success_callback 

926 self.on_retry_callback = on_retry_callback 

927 self.on_skipped_callback = on_skipped_callback 

928 self._pre_execute_hook = pre_execute 

929 self._post_execute_hook = post_execute 

930 

931 if start_date and not isinstance(start_date, datetime): 

932 self.log.warning("start_date for %s isn't datetime.datetime", self) 

933 elif start_date: 

934 self.start_date = timezone.convert_to_utc(start_date) 

935 

936 if end_date: 

937 self.end_date = timezone.convert_to_utc(end_date) 

938 

939 if executor: 

940 warnings.warn( 

941 "Specifying executors for operators is not yet" 

942 f"supported, the value {executor!r} will have no effect", 

943 category=UserWarning, 

944 stacklevel=2, 

945 ) 

946 self.executor = executor 

947 self.executor_config = executor_config or {} 

948 self.run_as_user = run_as_user 

949 self.retries = parse_retries(retries) 

950 self.queue = queue 

951 self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool 

952 self.pool_slots = pool_slots 

953 if self.pool_slots < 1: 

954 dag_str = f" in dag {dag.dag_id}" if dag else "" 

955 raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1") 

956 self.sla = sla 

957 

958 if trigger_rule == "dummy": 

959 warnings.warn( 

960 "dummy Trigger Rule is deprecated. Please use `TriggerRule.ALWAYS`.", 

961 RemovedInAirflow3Warning, 

962 stacklevel=2, 

963 ) 

964 trigger_rule = TriggerRule.ALWAYS 

965 

966 if trigger_rule == "none_failed_or_skipped": 

967 warnings.warn( 

968 "none_failed_or_skipped Trigger Rule is deprecated. " 

969 "Please use `none_failed_min_one_success`.", 

970 RemovedInAirflow3Warning, 

971 stacklevel=2, 

972 ) 

973 trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS 

974 

975 if not TriggerRule.is_valid(trigger_rule): 

976 raise AirflowException( 

977 f"The trigger_rule must be one of {TriggerRule.all_triggers()}," 

978 f"'{dag.dag_id if dag else ''}.{task_id}'; received '{trigger_rule}'." 

979 ) 

980 

981 self.trigger_rule: TriggerRule = TriggerRule(trigger_rule) 

982 FailStopDagInvalidTriggerRule.check(dag=dag, trigger_rule=self.trigger_rule) 

983 

984 self.depends_on_past: bool = depends_on_past 

985 self.ignore_first_depends_on_past: bool = ignore_first_depends_on_past 

986 self.wait_for_past_depends_before_skipping: bool = wait_for_past_depends_before_skipping 

987 self.wait_for_downstream: bool = wait_for_downstream 

988 if wait_for_downstream: 

989 self.depends_on_past = True 

990 

991 self.retry_delay = coerce_timedelta(retry_delay, key="retry_delay") 

992 self.retry_exponential_backoff = retry_exponential_backoff 

993 self.max_retry_delay = ( 

994 max_retry_delay 

995 if max_retry_delay is None 

996 else coerce_timedelta(max_retry_delay, key="max_retry_delay") 

997 ) 

998 

999 # At execution_time this becomes a normal dict 

1000 self.params: ParamsDict | dict = ParamsDict(params) 

1001 if priority_weight is not None and not isinstance(priority_weight, int): 

1002 raise AirflowException( 

1003 f"`priority_weight` for task '{self.task_id}' only accepts integers, " 

1004 f"received '{type(priority_weight)}'." 

1005 ) 

1006 self.priority_weight = priority_weight 

1007 self.weight_rule = validate_and_load_priority_weight_strategy(weight_rule) 

1008 self.resources = coerce_resources(resources) 

1009 if task_concurrency and not max_active_tis_per_dag: 

1010 # TODO: Remove in Airflow 3.0 

1011 warnings.warn( 

1012 "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.", 

1013 RemovedInAirflow3Warning, 

1014 stacklevel=2, 

1015 ) 

1016 max_active_tis_per_dag = task_concurrency 

1017 self.max_active_tis_per_dag: int | None = max_active_tis_per_dag 

1018 self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun 

1019 self.do_xcom_push: bool = do_xcom_push 

1020 self.map_index_template: str | None = map_index_template 

1021 self.multiple_outputs: bool = multiple_outputs 

1022 

1023 self.doc_md = doc_md 

1024 self.doc_json = doc_json 

1025 self.doc_yaml = doc_yaml 

1026 self.doc_rst = doc_rst 

1027 self.doc = doc 

1028 # Populate the display field only if provided and different from task id 

1029 self._task_display_property_value = ( 

1030 task_display_name if task_display_name and task_display_name != task_id else None 

1031 ) 

1032 

1033 self.upstream_task_ids: set[str] = set() 

1034 self.downstream_task_ids: set[str] = set() 

1035 

1036 if dag: 

1037 self.dag = dag 

1038 

1039 self._log_config_logger_name = "airflow.task.operators" 

1040 self._logger_name = logger_name 

1041 self.allow_nested_operators: bool = allow_nested_operators 

1042 

1043 # Lineage 

1044 self.inlets: list = [] 

1045 self.outlets: list = [] 

1046 

1047 if inlets: 

1048 self.inlets = ( 

1049 inlets 

1050 if isinstance(inlets, list) 

1051 else [ 

1052 inlets, 

1053 ] 

1054 ) 

1055 

1056 if outlets: 

1057 self.outlets = ( 

1058 outlets 

1059 if isinstance(outlets, list) 

1060 else [ 

1061 outlets, 

1062 ] 

1063 ) 

1064 

1065 if isinstance(self.template_fields, str): 

1066 warnings.warn( 

1067 f"The `template_fields` value for {self.task_type} is a string " 

1068 "but should be a list or tuple of string. Wrapping it in a list for execution. " 

1069 f"Please update {self.task_type} accordingly.", 

1070 UserWarning, 

1071 stacklevel=2, 

1072 ) 

1073 self.template_fields = [self.template_fields] 

1074 

1075 self._is_setup = False 

1076 self._is_teardown = False 

1077 if SetupTeardownContext.active: 

1078 SetupTeardownContext.update_context_map(self) 

1079 

1080 def __eq__(self, other): 

1081 if type(self) is type(other): 

1082 # Use getattr() instead of __dict__ as __dict__ doesn't return 

1083 # correct values for properties. 

1084 return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps) 

1085 return False 

1086 

1087 def __ne__(self, other): 

1088 return not self == other 

1089 

1090 def __hash__(self): 

1091 hash_components = [type(self)] 

1092 for component in self._comps: 

1093 val = getattr(self, component, None) 

1094 try: 

1095 hash(val) 

1096 hash_components.append(val) 

1097 except TypeError: 

1098 hash_components.append(repr(val)) 

1099 return hash(tuple(hash_components)) 

1100 

1101 # including lineage information 

1102 def __or__(self, other): 

1103 """ 

1104 Return [This Operator] | [Operator]. 

1105 

1106 The inlets of other will be set to pick up the outlets from this operator. 

1107 Other will be set as a downstream task of this operator. 

1108 """ 

1109 if isinstance(other, BaseOperator): 

1110 if not self.outlets and not self.supports_lineage: 

1111 raise ValueError("No outlets defined for this operator") 

1112 other.add_inlets([self.task_id]) 

1113 self.set_downstream(other) 

1114 else: 

1115 raise TypeError(f"Right hand side ({other}) is not an Operator") 

1116 

1117 return self 

1118 

1119 # /Composing Operators --------------------------------------------- 

1120 

1121 def __gt__(self, other): 

1122 """ 

1123 Return [Operator] > [Outlet]. 

1124 

1125 If other is an attr annotated object it is set as an outlet of this Operator. 

1126 """ 

1127 if not isinstance(other, Iterable): 

1128 other = [other] 

1129 

1130 for obj in other: 

1131 if not attr.has(obj): 

1132 raise TypeError(f"Left hand side ({obj}) is not an outlet") 

1133 self.add_outlets(other) 

1134 

1135 return self 

1136 

1137 def __lt__(self, other): 

1138 """ 

1139 Return [Inlet] > [Operator] or [Operator] < [Inlet]. 

1140 

1141 If other is an attr annotated object it is set as an inlet to this operator. 

1142 """ 

1143 if not isinstance(other, Iterable): 

1144 other = [other] 

1145 

1146 for obj in other: 

1147 if not attr.has(obj): 

1148 raise TypeError(f"{obj} cannot be an inlet") 

1149 self.add_inlets(other) 

1150 

1151 return self 

1152 

1153 def __setattr__(self, key, value): 

1154 super().__setattr__(key, value) 

1155 if self.__from_mapped or self._lock_for_execution: 

1156 return # Skip any custom behavior for validation and during execute. 

1157 if key in self.__init_kwargs: 

1158 self.__init_kwargs[key] = value 

1159 if self.__instantiated and key in self.template_fields: 

1160 # Resolve upstreams set by assigning an XComArg after initializing 

1161 # an operator, example: 

1162 # op = BashOperator() 

1163 # op.bash_command = "sleep 1" 

1164 self.set_xcomargs_dependencies() 

1165 

1166 def add_inlets(self, inlets: Iterable[Any]): 

1167 """Set inlets to this operator.""" 

1168 self.inlets.extend(inlets) 

1169 

1170 def add_outlets(self, outlets: Iterable[Any]): 

1171 """Define the outlets of this operator.""" 

1172 self.outlets.extend(outlets) 

1173 

1174 def get_inlet_defs(self): 

1175 """Get inlet definitions on this task. 

1176 

1177 :meta private: 

1178 """ 

1179 return self.inlets 

1180 

1181 def get_outlet_defs(self): 

1182 """Get outlet definitions on this task. 

1183 

1184 :meta private: 

1185 """ 

1186 return self.outlets 

1187 

1188 def get_dag(self) -> DAG | None: 

1189 return self._dag 

1190 

1191 @property # type: ignore[override] 

1192 def dag(self) -> DAG: # type: ignore[override] 

1193 """Returns the Operator's DAG if set, otherwise raises an error.""" 

1194 if self._dag: 

1195 return self._dag 

1196 else: 

1197 raise AirflowException(f"Operator {self} has not been assigned to a DAG yet") 

1198 

1199 @dag.setter 

1200 def dag(self, dag: DAG | None): 

1201 """Operators can be assigned to one DAG, one time. Repeat assignments to that same DAG are ok.""" 

1202 from airflow.models.dag import DAG 

1203 

1204 if dag is None: 

1205 self._dag = None 

1206 return 

1207 if not isinstance(dag, DAG): 

1208 raise TypeError(f"Expected DAG; received {dag.__class__.__name__}") 

1209 elif self.has_dag() and self.dag is not dag: 

1210 raise AirflowException(f"The DAG assigned to {self} can not be changed.") 

1211 

1212 if self.__from_mapped: 

1213 pass # Don't add to DAG -- the mapped task takes the place. 

1214 elif dag.task_dict.get(self.task_id) is not self: 

1215 dag.add_task(self) 

1216 

1217 self._dag = dag 

1218 

1219 @property 

1220 def task_display_name(self) -> str: 

1221 return self._task_display_property_value or self.task_id 

1222 

1223 def has_dag(self): 

1224 """Return True if the Operator has been assigned to a DAG.""" 

1225 return self._dag is not None 

1226 

1227 deps: frozenset[BaseTIDep] = frozenset( 

1228 { 

1229 NotInRetryPeriodDep(), 

1230 PrevDagrunDep(), 

1231 TriggerRuleDep(), 

1232 NotPreviouslySkippedDep(), 

1233 MappedTaskUpstreamDep(), 

1234 } 

1235 ) 

1236 """ 

1237 Returns the set of dependencies for the operator. These differ from execution 

1238 context dependencies in that they are specific to tasks and can be 

1239 extended/overridden by subclasses. 

1240 """ 

1241 

1242 def prepare_for_execution(self) -> BaseOperator: 

1243 """Lock task for execution to disable custom action in ``__setattr__`` and return a copy.""" 

1244 other = copy.copy(self) 

1245 other._lock_for_execution = True 

1246 return other 

1247 

1248 def set_xcomargs_dependencies(self) -> None: 

1249 """ 

1250 Resolve upstream dependencies of a task. 

1251 

1252 In this way passing an ``XComArg`` as value for a template field 

1253 will result in creating upstream relation between two tasks. 

1254 

1255 **Example**: :: 

1256 

1257 with DAG(...): 

1258 generate_content = GenerateContentOperator(task_id="generate_content") 

1259 send_email = EmailOperator(..., html_content=generate_content.output) 

1260 

1261 # This is equivalent to 

1262 with DAG(...): 

1263 generate_content = GenerateContentOperator(task_id="generate_content") 

1264 send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}") 

1265 generate_content >> send_email 

1266 

1267 """ 

1268 from airflow.models.xcom_arg import XComArg 

1269 

1270 for field in self.template_fields: 

1271 if hasattr(self, field): 

1272 arg = getattr(self, field) 

1273 XComArg.apply_upstream_relationship(self, arg) 

1274 

1275 @prepare_lineage 

1276 def pre_execute(self, context: Any): 

1277 """Execute right before self.execute() is called.""" 

1278 if self._pre_execute_hook is None: 

1279 return 

1280 ExecutionCallableRunner( 

1281 self._pre_execute_hook, 

1282 context_get_outlet_events(context), 

1283 logger=self.log, 

1284 ).run(context) 

1285 

1286 def execute(self, context: Context) -> Any: 

1287 """ 

1288 Derive when creating an operator. 

1289 

1290 Context is the same dictionary used as when rendering jinja templates. 

1291 

1292 Refer to get_template_context for more context. 

1293 """ 

1294 raise NotImplementedError() 

1295 

1296 @apply_lineage 

1297 def post_execute(self, context: Any, result: Any = None): 

1298 """ 

1299 Execute right after self.execute() is called. 

1300 

1301 It is passed the execution context and any results returned by the operator. 

1302 """ 

1303 if self._post_execute_hook is None: 

1304 return 

1305 ExecutionCallableRunner( 

1306 self._post_execute_hook, 

1307 context_get_outlet_events(context), 

1308 logger=self.log, 

1309 ).run(context, result) 

1310 

1311 def on_kill(self) -> None: 

1312 """ 

1313 Override this method to clean up subprocesses when a task instance gets killed. 

1314 

1315 Any use of the threading, subprocess or multiprocessing module within an 

1316 operator needs to be cleaned up, or it will leave ghost processes behind. 

1317 """ 

1318 

1319 def __deepcopy__(self, memo): 

1320 # Hack sorting double chained task lists by task_id to avoid hitting 

1321 # max_depth on deepcopy operations. 

1322 sys.setrecursionlimit(5000) # TODO fix this in a better way 

1323 

1324 cls = self.__class__ 

1325 result = cls.__new__(cls) 

1326 memo[id(self)] = result 

1327 

1328 shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs 

1329 

1330 for k, v in self.__dict__.items(): 

1331 if k == "_BaseOperator__instantiated": 

1332 # Don't set this until the _end_, as it changes behaviour of __setattr__ 

1333 continue 

1334 if k not in shallow_copy: 

1335 setattr(result, k, copy.deepcopy(v, memo)) 

1336 else: 

1337 setattr(result, k, copy.copy(v)) 

1338 result.__instantiated = self.__instantiated 

1339 return result 

1340 

1341 def __getstate__(self): 

1342 state = dict(self.__dict__) 

1343 if self._log: 

1344 del state["_log"] 

1345 

1346 return state 

1347 

1348 def __setstate__(self, state): 

1349 self.__dict__ = state 

1350 

1351 def render_template_fields( 

1352 self, 

1353 context: Context, 

1354 jinja_env: jinja2.Environment | None = None, 

1355 ) -> None: 

1356 """Template all attributes listed in *self.template_fields*. 

1357 

1358 This mutates the attributes in-place and is irreversible. 

1359 

1360 :param context: Context dict with values to apply on content. 

1361 :param jinja_env: Jinja's environment to use for rendering. 

1362 """ 

1363 if not jinja_env: 

1364 jinja_env = self.get_template_env() 

1365 self._do_render_template_fields(self, self.template_fields, context, jinja_env, set()) 

1366 

1367 @provide_session 

1368 def clear( 

1369 self, 

1370 start_date: datetime | None = None, 

1371 end_date: datetime | None = None, 

1372 upstream: bool = False, 

1373 downstream: bool = False, 

1374 session: Session = NEW_SESSION, 

1375 ): 

1376 """Clear the state of task instances associated with the task, following the parameters specified.""" 

1377 qry = select(TaskInstance).where(TaskInstance.dag_id == self.dag_id) 

1378 

1379 if start_date: 

1380 qry = qry.where(TaskInstance.execution_date >= start_date) 

1381 if end_date: 

1382 qry = qry.where(TaskInstance.execution_date <= end_date) 

1383 

1384 tasks = [self.task_id] 

1385 

1386 if upstream: 

1387 tasks += [t.task_id for t in self.get_flat_relatives(upstream=True)] 

1388 

1389 if downstream: 

1390 tasks += [t.task_id for t in self.get_flat_relatives(upstream=False)] 

1391 

1392 qry = qry.where(TaskInstance.task_id.in_(tasks)) 

1393 results = session.scalars(qry).all() 

1394 count = len(results) 

1395 clear_task_instances(results, session, dag=self.dag) 

1396 session.commit() 

1397 return count 

1398 

1399 @provide_session 

1400 def get_task_instances( 

1401 self, 

1402 start_date: datetime | None = None, 

1403 end_date: datetime | None = None, 

1404 session: Session = NEW_SESSION, 

1405 ) -> list[TaskInstance]: 

1406 """Get task instances related to this task for a specific date range.""" 

1407 from airflow.models import DagRun 

1408 

1409 query = ( 

1410 select(TaskInstance) 

1411 .join(TaskInstance.dag_run) 

1412 .where(TaskInstance.dag_id == self.dag_id) 

1413 .where(TaskInstance.task_id == self.task_id) 

1414 ) 

1415 if start_date: 

1416 query = query.where(DagRun.execution_date >= start_date) 

1417 if end_date: 

1418 query = query.where(DagRun.execution_date <= end_date) 

1419 return session.scalars(query.order_by(DagRun.execution_date)).all() 

1420 

1421 @provide_session 

1422 def run( 

1423 self, 

1424 start_date: datetime | None = None, 

1425 end_date: datetime | None = None, 

1426 ignore_first_depends_on_past: bool = True, 

1427 wait_for_past_depends_before_skipping: bool = False, 

1428 ignore_ti_state: bool = False, 

1429 mark_success: bool = False, 

1430 test_mode: bool = False, 

1431 session: Session = NEW_SESSION, 

1432 ) -> None: 

1433 """Run a set of task instances for a date range.""" 

1434 from airflow.models import DagRun 

1435 from airflow.utils.types import DagRunType 

1436 

1437 # Assertions for typing -- we need a dag, for this function, and when we have a DAG we are 

1438 # _guaranteed_ to have start_date (else we couldn't have been added to a DAG) 

1439 if TYPE_CHECKING: 

1440 assert self.start_date 

1441 

1442 start_date = pendulum.instance(start_date or self.start_date) 

1443 end_date = pendulum.instance(end_date or self.end_date or timezone.utcnow()) 

1444 

1445 for info in self.dag.iter_dagrun_infos_between(start_date, end_date, align=False): 

1446 ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past 

1447 try: 

1448 dag_run = session.scalars( 

1449 select(DagRun).where( 

1450 DagRun.dag_id == self.dag_id, 

1451 DagRun.execution_date == info.logical_date, 

1452 ) 

1453 ).one() 

1454 ti = TaskInstance(self, run_id=dag_run.run_id) 

1455 except NoResultFound: 

1456 # This is _mostly_ only used in tests 

1457 dr = DagRun( 

1458 dag_id=self.dag_id, 

1459 run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date), 

1460 run_type=DagRunType.MANUAL, 

1461 execution_date=info.logical_date, 

1462 data_interval=info.data_interval, 

1463 ) 

1464 ti = TaskInstance(self, run_id=dr.run_id) 

1465 ti.dag_run = dr 

1466 session.add(dr) 

1467 session.flush() 

1468 

1469 ti.run( 

1470 mark_success=mark_success, 

1471 ignore_depends_on_past=ignore_depends_on_past, 

1472 wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, 

1473 ignore_ti_state=ignore_ti_state, 

1474 test_mode=test_mode, 

1475 session=session, 

1476 ) 

1477 

1478 def dry_run(self) -> None: 

1479 """Perform dry run for the operator - just render template fields.""" 

1480 self.log.info("Dry run") 

1481 for field in self.template_fields: 

1482 try: 

1483 content = getattr(self, field) 

1484 except AttributeError: 

1485 raise AttributeError( 

1486 f"{field!r} is configured as a template field " 

1487 f"but {self.task_type} does not have this attribute." 

1488 ) 

1489 

1490 if content and isinstance(content, str): 

1491 self.log.info("Rendering template for %s", field) 

1492 self.log.info(content) 

1493 

1494 def get_direct_relatives(self, upstream: bool = False) -> Iterable[Operator]: 

1495 """Get list of the direct relatives to the current task, upstream or downstream.""" 

1496 if upstream: 

1497 return self.upstream_list 

1498 else: 

1499 return self.downstream_list 

1500 

1501 def __repr__(self): 

1502 return f"<Task({self.task_type}): {self.task_id}>" 

1503 

1504 @property 

1505 def operator_class(self) -> type[BaseOperator]: # type: ignore[override] 

1506 return self.__class__ 

1507 

1508 @property 

1509 def task_type(self) -> str: 

1510 """@property: type of the task.""" 

1511 return self.__class__.__name__ 

1512 

1513 @property 

1514 def operator_name(self) -> str: 

1515 """@property: use a more friendly display name for the operator, if set.""" 

1516 try: 

1517 return self.custom_operator_name # type: ignore 

1518 except AttributeError: 

1519 return self.task_type 

1520 

1521 @property 

1522 def roots(self) -> list[BaseOperator]: 

1523 """Required by DAGNode.""" 

1524 return [self] 

1525 

1526 @property 

1527 def leaves(self) -> list[BaseOperator]: 

1528 """Required by DAGNode.""" 

1529 return [self] 

1530 

1531 @property 

1532 def output(self) -> XComArg: 

1533 """Returns reference to XCom pushed by current operator.""" 

1534 from airflow.models.xcom_arg import XComArg 

1535 

1536 return XComArg(operator=self) 

1537 

1538 @property 

1539 def is_setup(self) -> bool: 

1540 """Whether the operator is a setup task. 

1541 

1542 :meta private: 

1543 """ 

1544 return self._is_setup 

1545 

1546 @is_setup.setter 

1547 def is_setup(self, value: bool) -> None: 

1548 """Setter for is_setup property. 

1549 

1550 :meta private: 

1551 """ 

1552 if self.is_teardown and value: 

1553 raise ValueError(f"Cannot mark task '{self.task_id}' as setup; task is already a teardown.") 

1554 self._is_setup = value 

1555 

1556 @property 

1557 def is_teardown(self) -> bool: 

1558 """Whether the operator is a teardown task. 

1559 

1560 :meta private: 

1561 """ 

1562 return self._is_teardown 

1563 

1564 @is_teardown.setter 

1565 def is_teardown(self, value: bool) -> None: 

1566 """ 

1567 Setter for is_teardown property. 

1568 

1569 :meta private: 

1570 """ 

1571 if self.is_setup and value: 

1572 raise ValueError(f"Cannot mark task '{self.task_id}' as teardown; task is already a setup.") 

1573 self._is_teardown = value 

1574 

1575 @staticmethod 

1576 def xcom_push( 

1577 context: Any, 

1578 key: str, 

1579 value: Any, 

1580 execution_date: datetime | None = None, 

1581 ) -> None: 

1582 """ 

1583 Make an XCom available for tasks to pull. 

1584 

1585 :param context: Execution Context Dictionary 

1586 :param key: A key for the XCom 

1587 :param value: A value for the XCom. The value is pickled and stored 

1588 in the database. 

1589 :param execution_date: if provided, the XCom will not be visible until 

1590 this date. This can be used, for example, to send a message to a 

1591 task on a future date without it being immediately visible. 

1592 """ 

1593 context["ti"].xcom_push(key=key, value=value, execution_date=execution_date) 

1594 

1595 @staticmethod 

1596 @provide_session 

1597 def xcom_pull( 

1598 context: Any, 

1599 task_ids: str | list[str] | None = None, 

1600 dag_id: str | None = None, 

1601 key: str = XCOM_RETURN_KEY, 

1602 include_prior_dates: bool | None = None, 

1603 session: Session = NEW_SESSION, 

1604 ) -> Any: 

1605 """ 

1606 Pull XComs that optionally meet certain criteria. 

1607 

1608 The default value for `key` limits the search to XComs 

1609 that were returned by other tasks (as opposed to those that were pushed 

1610 manually). To remove this filter, pass key=None (or any desired value). 

1611 

1612 If a single task_id string is provided, the result is the value of the 

1613 most recent matching XCom from that task_id. If multiple task_ids are 

1614 provided, a tuple of matching values is returned. None is returned 

1615 whenever no matches are found. 

1616 

1617 :param context: Execution Context Dictionary 

1618 :param key: A key for the XCom. If provided, only XComs with matching 

1619 keys will be returned. The default key is 'return_value', also 

1620 available as a constant XCOM_RETURN_KEY. This key is automatically 

1621 given to XComs returned by tasks (as opposed to being pushed 

1622 manually). To remove the filter, pass key=None. 

1623 :param task_ids: Only XComs from tasks with matching ids will be 

1624 pulled. Can pass None to remove the filter. 

1625 :param dag_id: If provided, only pulls XComs from this DAG. 

1626 If None (default), the DAG of the calling task is used. 

1627 :param include_prior_dates: If False, only XComs from the current 

1628 execution_date are returned. If True, XComs from previous dates 

1629 are returned as well. 

1630 """ 

1631 return context["ti"].xcom_pull( 

1632 key=key, 

1633 task_ids=task_ids, 

1634 dag_id=dag_id, 

1635 include_prior_dates=include_prior_dates, 

1636 session=session, 

1637 ) 

1638 

1639 @classmethod 

1640 def get_serialized_fields(cls): 

1641 """Stringified DAGs and operators contain exactly these fields.""" 

1642 if not cls.__serialized_fields: 

1643 from airflow.models.dag import DagContext 

1644 

1645 # make sure the following dummy task is not added to current active 

1646 # dag in context, otherwise, it will result in 

1647 # `RuntimeError: dictionary changed size during iteration` 

1648 # Exception in SerializedDAG.serialize_dag() call. 

1649 DagContext.push_context_managed_dag(None) 

1650 cls.__serialized_fields = frozenset( 

1651 vars(BaseOperator(task_id="test")).keys() 

1652 - { 

1653 "upstream_task_ids", 

1654 "default_args", 

1655 "dag", 

1656 "_dag", 

1657 "label", 

1658 "_BaseOperator__instantiated", 

1659 "_BaseOperator__init_kwargs", 

1660 "_BaseOperator__from_mapped", 

1661 "_is_setup", 

1662 "_is_teardown", 

1663 "_on_failure_fail_dagrun", 

1664 } 

1665 | { # Class level defaults need to be added to this list 

1666 "start_date", 

1667 "end_date", 

1668 "_task_type", 

1669 "_operator_name", 

1670 "subdag", 

1671 "ui_color", 

1672 "ui_fgcolor", 

1673 "template_ext", 

1674 "template_fields", 

1675 "template_fields_renderers", 

1676 "params", 

1677 "is_setup", 

1678 "is_teardown", 

1679 "on_failure_fail_dagrun", 

1680 "map_index_template", 

1681 "start_trigger", 

1682 "next_method", 

1683 "_needs_expansion", 

1684 } 

1685 ) 

1686 DagContext.pop_context_managed_dag() 

1687 

1688 return cls.__serialized_fields 

1689 

1690 def serialize_for_task_group(self) -> tuple[DagAttributeTypes, Any]: 

1691 """Serialize; required by DAGNode.""" 

1692 return DagAttributeTypes.OP, self.task_id 

1693 

1694 @property 

1695 def inherits_from_empty_operator(self): 

1696 """Used to determine if an Operator is inherited from EmptyOperator.""" 

1697 # This looks like `isinstance(self, EmptyOperator) would work, but this also 

1698 # needs to cope when `self` is a Serialized instance of a EmptyOperator or one 

1699 # of its subclasses (which don't inherit from anything but BaseOperator). 

1700 return getattr(self, "_is_empty", False) 

1701 

1702 def defer( 

1703 self, 

1704 *, 

1705 trigger: BaseTrigger, 

1706 method_name: str, 

1707 kwargs: dict[str, Any] | None = None, 

1708 timeout: timedelta | None = None, 

1709 ): 

1710 """ 

1711 Mark this Operator "deferred", suspending its execution until the provided trigger fires an event. 

1712 

1713 This is achieved by raising a special exception (TaskDeferred) 

1714 which is caught in the main _execute_task wrapper. 

1715 """ 

1716 raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout) 

1717 

1718 def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context): 

1719 """Call this method when a deferred task is resumed.""" 

1720 # __fail__ is a special signal value for next_method that indicates 

1721 # this task was scheduled specifically to fail. 

1722 if next_method == "__fail__": 

1723 next_kwargs = next_kwargs or {} 

1724 traceback = next_kwargs.get("traceback") 

1725 if traceback is not None: 

1726 self.log.error("Trigger failed:\n%s", "\n".join(traceback)) 

1727 raise TaskDeferralError(next_kwargs.get("error", "Unknown")) 

1728 # Grab the callable off the Operator/Task and add in any kwargs 

1729 execute_callable = getattr(self, next_method) 

1730 if next_kwargs: 

1731 execute_callable = functools.partial(execute_callable, **next_kwargs) 

1732 return execute_callable(context) 

1733 

1734 def unmap(self, resolve: None | dict[str, Any] | tuple[Context, Session]) -> BaseOperator: 

1735 """Get the "normal" operator from the current operator. 

1736 

1737 Since a BaseOperator is not mapped to begin with, this simply returns 

1738 the original operator. 

1739 

1740 :meta private: 

1741 """ 

1742 return self 

1743 

1744 

1745# TODO: Deprecate for Airflow 3.0 

1746Chainable = Union[DependencyMixin, Sequence[DependencyMixin]] 

1747 

1748 

1749def chain(*tasks: DependencyMixin | Sequence[DependencyMixin]) -> None: 

1750 r""" 

1751 Given a number of tasks, builds a dependency chain. 

1752 

1753 This function accepts values of BaseOperator (aka tasks), EdgeModifiers (aka Labels), XComArg, TaskGroups, 

1754 or lists containing any mix of these types (or a mix in the same list). If you want to chain between two 

1755 lists you must ensure they have the same length. 

1756 

1757 Using classic operators/sensors: 

1758 

1759 .. code-block:: python 

1760 

1761 chain(t1, [t2, t3], [t4, t5], t6) 

1762 

1763 is equivalent to:: 

1764 

1765 / -> t2 -> t4 \ 

1766 t1 -> t6 

1767 \ -> t3 -> t5 / 

1768 

1769 .. code-block:: python 

1770 

1771 t1.set_downstream(t2) 

1772 t1.set_downstream(t3) 

1773 t2.set_downstream(t4) 

1774 t3.set_downstream(t5) 

1775 t4.set_downstream(t6) 

1776 t5.set_downstream(t6) 

1777 

1778 Using task-decorated functions aka XComArgs: 

1779 

1780 .. code-block:: python 

1781 

1782 chain(x1(), [x2(), x3()], [x4(), x5()], x6()) 

1783 

1784 is equivalent to:: 

1785 

1786 / -> x2 -> x4 \ 

1787 x1 -> x6 

1788 \ -> x3 -> x5 / 

1789 

1790 .. code-block:: python 

1791 

1792 x1 = x1() 

1793 x2 = x2() 

1794 x3 = x3() 

1795 x4 = x4() 

1796 x5 = x5() 

1797 x6 = x6() 

1798 x1.set_downstream(x2) 

1799 x1.set_downstream(x3) 

1800 x2.set_downstream(x4) 

1801 x3.set_downstream(x5) 

1802 x4.set_downstream(x6) 

1803 x5.set_downstream(x6) 

1804 

1805 Using TaskGroups: 

1806 

1807 .. code-block:: python 

1808 

1809 chain(t1, task_group1, task_group2, t2) 

1810 

1811 t1.set_downstream(task_group1) 

1812 task_group1.set_downstream(task_group2) 

1813 task_group2.set_downstream(t2) 

1814 

1815 

1816 It is also possible to mix between classic operator/sensor, EdgeModifiers, XComArg, and TaskGroups: 

1817 

1818 .. code-block:: python 

1819 

1820 chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3()) 

1821 

1822 is equivalent to:: 

1823 

1824 / "branch one" -> x1 \ 

1825 t1 -> task_group1 -> x3 

1826 \ "branch two" -> x2 / 

1827 

1828 .. code-block:: python 

1829 

1830 x1 = x1() 

1831 x2 = x2() 

1832 x3 = x3() 

1833 label1 = Label("branch one") 

1834 label2 = Label("branch two") 

1835 t1.set_downstream(label1) 

1836 label1.set_downstream(x1) 

1837 t2.set_downstream(label2) 

1838 label2.set_downstream(x2) 

1839 x1.set_downstream(task_group1) 

1840 x2.set_downstream(task_group1) 

1841 task_group1.set_downstream(x3) 

1842 

1843 # or 

1844 

1845 x1 = x1() 

1846 x2 = x2() 

1847 x3 = x3() 

1848 t1.set_downstream(x1, edge_modifier=Label("branch one")) 

1849 t1.set_downstream(x2, edge_modifier=Label("branch two")) 

1850 x1.set_downstream(task_group1) 

1851 x2.set_downstream(task_group1) 

1852 task_group1.set_downstream(x3) 

1853 

1854 

1855 :param tasks: Individual and/or list of tasks, EdgeModifiers, XComArgs, or TaskGroups to set dependencies 

1856 """ 

1857 for up_task, down_task in zip(tasks, tasks[1:]): 

1858 if isinstance(up_task, DependencyMixin): 

1859 up_task.set_downstream(down_task) 

1860 continue 

1861 if isinstance(down_task, DependencyMixin): 

1862 down_task.set_upstream(up_task) 

1863 continue 

1864 if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence): 

1865 raise TypeError(f"Chain not supported between instances of {type(up_task)} and {type(down_task)}") 

1866 up_task_list = up_task 

1867 down_task_list = down_task 

1868 if len(up_task_list) != len(down_task_list): 

1869 raise AirflowException( 

1870 f"Chain not supported for different length Iterable. " 

1871 f"Got {len(up_task_list)} and {len(down_task_list)}." 

1872 ) 

1873 for up_t, down_t in zip(up_task_list, down_task_list): 

1874 up_t.set_downstream(down_t) 

1875 

1876 

1877def cross_downstream( 

1878 from_tasks: Sequence[DependencyMixin], 

1879 to_tasks: DependencyMixin | Sequence[DependencyMixin], 

1880): 

1881 r""" 

1882 Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks. 

1883 

1884 Using classic operators/sensors: 

1885 

1886 .. code-block:: python 

1887 

1888 cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6]) 

1889 

1890 is equivalent to:: 

1891 

1892 t1 ---> t4 

1893 \ / 

1894 t2 -X -> t5 

1895 / \ 

1896 t3 ---> t6 

1897 

1898 .. code-block:: python 

1899 

1900 t1.set_downstream(t4) 

1901 t1.set_downstream(t5) 

1902 t1.set_downstream(t6) 

1903 t2.set_downstream(t4) 

1904 t2.set_downstream(t5) 

1905 t2.set_downstream(t6) 

1906 t3.set_downstream(t4) 

1907 t3.set_downstream(t5) 

1908 t3.set_downstream(t6) 

1909 

1910 Using task-decorated functions aka XComArgs: 

1911 

1912 .. code-block:: python 

1913 

1914 cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()]) 

1915 

1916 is equivalent to:: 

1917 

1918 x1 ---> x4 

1919 \ / 

1920 x2 -X -> x5 

1921 / \ 

1922 x3 ---> x6 

1923 

1924 .. code-block:: python 

1925 

1926 x1 = x1() 

1927 x2 = x2() 

1928 x3 = x3() 

1929 x4 = x4() 

1930 x5 = x5() 

1931 x6 = x6() 

1932 x1.set_downstream(x4) 

1933 x1.set_downstream(x5) 

1934 x1.set_downstream(x6) 

1935 x2.set_downstream(x4) 

1936 x2.set_downstream(x5) 

1937 x2.set_downstream(x6) 

1938 x3.set_downstream(x4) 

1939 x3.set_downstream(x5) 

1940 x3.set_downstream(x6) 

1941 

1942 It is also possible to mix between classic operator/sensor and XComArg tasks: 

1943 

1944 .. code-block:: python 

1945 

1946 cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()]) 

1947 

1948 is equivalent to:: 

1949 

1950 t1 ---> x1 

1951 \ / 

1952 x2 -X -> t2 

1953 / \ 

1954 t3 ---> x3 

1955 

1956 .. code-block:: python 

1957 

1958 x1 = x1() 

1959 x2 = x2() 

1960 x3 = x3() 

1961 t1.set_downstream(x1) 

1962 t1.set_downstream(t2) 

1963 t1.set_downstream(x3) 

1964 x2.set_downstream(x1) 

1965 x2.set_downstream(t2) 

1966 x2.set_downstream(x3) 

1967 t3.set_downstream(x1) 

1968 t3.set_downstream(t2) 

1969 t3.set_downstream(x3) 

1970 

1971 :param from_tasks: List of tasks or XComArgs to start from. 

1972 :param to_tasks: List of tasks or XComArgs to set as downstream dependencies. 

1973 """ 

1974 for task in from_tasks: 

1975 task.set_downstream(to_tasks) 

1976 

1977 

1978def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]): 

1979 """ 

1980 Simplify task dependency definition. 

1981 

1982 E.g.: suppose you want precedence like so:: 

1983 

1984 ╭─op2─╮ ╭─op4─╮ 

1985 op1─┤ ├─├─op5─┤─op7 

1986 ╰-op3─╯ ╰-op6─╯ 

1987 

1988 Then you can accomplish like so:: 

1989 

1990 chain_linear(op1, [op2, op3], [op4, op5, op6], op7) 

1991 

1992 :param elements: a list of operators / lists of operators 

1993 """ 

1994 if not elements: 

1995 raise ValueError("No tasks provided; nothing to do.") 

1996 prev_elem = None 

1997 deps_set = False 

1998 for curr_elem in elements: 

1999 if isinstance(curr_elem, EdgeModifier): 

2000 raise ValueError("Labels are not supported by chain_linear") 

2001 if prev_elem is not None: 

2002 for task in prev_elem: 

2003 task >> curr_elem 

2004 if not deps_set: 

2005 deps_set = True 

2006 prev_elem = [curr_elem] if isinstance(curr_elem, DependencyMixin) else curr_elem 

2007 if not deps_set: 

2008 raise ValueError("No dependencies were set. Did you forget to expand with `*`?") 

2009 

2010 

2011def __getattr__(name): 

2012 """ 

2013 PEP-562: Lazy loaded attributes on python modules. 

2014 

2015 :meta private: 

2016 """ 

2017 path = __deprecated_imports.get(name) 

2018 if not path: 

2019 raise AttributeError(f"module {__name__!r} has no attribute {name!r}") 

2020 

2021 from airflow.utils.module_loading import import_string 

2022 

2023 warnings.warn( 

2024 f"Import `{__name__}.{name}` is deprecated. Please use `{path}.{name}`.", 

2025 RemovedInAirflow3Warning, 

2026 stacklevel=2, 

2027 ) 

2028 val = import_string(f"{path}.{name}") 

2029 

2030 # Store for next time 

2031 globals()[name] = val 

2032 return val 

2033 

2034 

2035__deprecated_imports = { 

2036 "BaseOperatorLink": "airflow.models.baseoperatorlink", 

2037}