1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18
19from __future__ import annotations
20
21from typing import TYPE_CHECKING
22
23from pluggy import HookspecMarker
24
25if TYPE_CHECKING:
26 # These imports are for type checking only - no runtime dependency
27 from airflow.models.taskinstance import TaskInstance
28 from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
29 from airflow.utils.state import TaskInstanceState
30
31hookspec = HookspecMarker("airflow")
32
33
34@hookspec
35def on_task_instance_running(
36 previous_state: TaskInstanceState | None,
37 task_instance: RuntimeTaskInstance | TaskInstance,
38):
39 """Execute when task state changes to RUNNING. previous_state can be None."""
40
41
42@hookspec
43def on_task_instance_success(
44 previous_state: TaskInstanceState | None,
45 task_instance: RuntimeTaskInstance | TaskInstance,
46):
47 """Execute when task state changes to SUCCESS. previous_state can be None."""
48
49
50@hookspec
51def on_task_instance_failed(
52 previous_state: TaskInstanceState | None,
53 task_instance: RuntimeTaskInstance | TaskInstance,
54 error: None | str | BaseException,
55):
56 """Execute when task state changes to FAIL. previous_state can be None."""
57
58
59@hookspec
60def on_task_instance_skipped(
61 previous_state: TaskInstanceState | None,
62 task_instance: RuntimeTaskInstance | TaskInstance,
63):
64 """
65 Execute when a task instance skips itself during execution.
66
67 This hook is called only when a task has started execution and then
68 intentionally skips itself (e.g., by raising AirflowSkipException).
69
70 Note: This function will NOT cover tasks that were skipped by scheduler, before execution began, such as:
71 - Skips due to trigger rules (e.g., upstream failures)
72 - Skips from operators like BranchPythonOperator, ShortCircuitOperator, or similar mechanisms
73 - Any other situation in which the scheduler decides not to schedule a task for execution
74
75 For comprehensive tracking of skipped tasks, use DAG-level listeners
76 (on_dag_run_success/on_dag_run_failed) which may have access to all task states.
77
78 :param previous_state: Previous state of the task instance (can be None)
79 :param task_instance: The task instance object (RuntimeTaskInstance when called
80 from task execution context, TaskInstance when called from API server)
81 """