Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/policies.py: 42%
64 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18from __future__ import annotations
20from typing import TYPE_CHECKING
22import pluggy
# Marker used to declare the hook *specifications* — the "template" policy
# functions defined below in this module ("airflow.policy" is the pluggy
# project name shared by specs and implementations).
local_settings_hookspec = pluggy.HookspecMarker("airflow.policy")
# Marker used by plugins / local settings to declare hook *implementations*.
hookimpl = pluggy.HookimplMarker("airflow.policy")

# Only the implementation marker is public API for plugin authors.
__all__: list[str] = ["hookimpl"]

if TYPE_CHECKING:
    # Imported only for type annotations to avoid import cycles at runtime.
    from airflow.models.baseoperator import BaseOperator
    from airflow.models.dag import DAG
    from airflow.models.taskinstance import TaskInstance
@local_settings_hookspec
def task_policy(task: BaseOperator) -> None:
    """
    Allow altering tasks after they are loaded in the DagBag.

    An administrator can use this hook to rewire a task's parameters, or raise
    ``AirflowClusterPolicyViolation`` to prevent the DAG from being executed.

    Typical uses:

    * Force tasks that use the ``SparkOperator`` onto a specific queue (say ``spark``)
      so they are routed to the right workers
    * Enforce a cluster-wide task timeout policy, e.g. no task may run longer than 48 hours

    :param task: task to be mutated
    """
@local_settings_hookspec
def dag_policy(dag: DAG) -> None:
    """
    Allow altering DAGs after they are loaded in the DagBag.

    An administrator can use this hook to rewire a DAG's parameters, or raise
    ``AirflowClusterPolicyViolation`` to prevent the DAG from being executed.

    Typical uses:

    * Enforce a default user on every DAG
    * Verify that every DAG has tags configured

    :param dag: dag to be mutated
    """
@local_settings_hookspec
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    """
    Allow altering task instances before the Airflow scheduler queues them.

    One use case is adjusting a task instance while it is being retried.

    :param task_instance: task instance to be mutated
    """
@local_settings_hookspec
def pod_mutation_hook(pod) -> None:
    """
    Mutate pod before scheduling.

    Allows altering a ``kubernetes.client.models.V1Pod`` object before it is handed
    to the Kubernetes client for scheduling.

    For example, this can add sidecar or init containers to every worker pod
    launched by KubernetesExecutor or KubernetesPodOperator.
    """
@local_settings_hookspec(firstresult=True)
def get_airflow_context_vars(context) -> dict[str, str]:  # type: ignore[empty-body]
    """
    Provide extra airflow context vars as key/value pairs.

    These are merged into the default airflow context vars and ultimately exposed
    as environment variables when tasks run. The keys dag_id, task_id,
    execution_date, dag_run_id and try_number are reserved.

    :param context: The context for the task_instance of interest.
    """
@local_settings_hookspec(firstresult=True)
def get_dagbag_import_timeout(dag_file_path: str) -> int | float:  # type: ignore[empty-body]
    """
    Control the DAG file parsing timeout dynamically, per DAG file path.

    Handy when a handful of DAG files need longer parsing times than the rest:
    each file can get its own value instead of one global timeout.

    A return value less than or equal to 0 disables the timeout for that
    DAG file's parsing.
    """
class DefaultPolicy:
    """Built-in fallback implementations of the policy hook functions.

    :meta private:
    """

    # These run last (first registered), so any local-settings or plugin
    # implementation takes precedence.
    @staticmethod
    @hookimpl
    def get_dagbag_import_timeout(dag_file_path: str):
        # Deferred import to avoid touching configuration at module import time.
        from airflow.configuration import conf

        # Fall back to the globally configured parsing timeout.
        return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

    @staticmethod
    @hookimpl
    def get_airflow_context_vars(context):
        # No extra context vars by default.
        return dict()
def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names: list[str]):
    """
    Turn the functions from airflow_local_settings module into a custom/local plugin, so that
    plugin-registered functions can co-operate with pluggy/setuptool entrypoint plugins of the same methods.

    Airflow local settings will be "win" (i.e. they have the final say) as they are the last plugin
    registered.

    :param pm: the pluggy PluginManager to register the synthesized plugin with
    :param module: the airflow_local_settings module (or any object holding policy callables)
    :param names: attribute names on ``module`` to consider as policy implementations

    :meta private:
    """
    # Local imports: this function is called rarely (at settings initialization),
    # so keep these out of module import time.
    import inspect
    import textwrap

    import attr

    # Names that were actually turned into hook implementations.
    hook_methods = set()

    def _make_shim_fn(name, desired_sig, target):
        # Functions defined in airflow_local_settings are called by positional parameters, so the names don't
        # have to match what we define in the "template" policy.
        #
        # However Pluggy validates the names match (and will raise an error if they don't!)
        #
        # To maintain compat, if we detect the names don't match, we will wrap it with a dynamically created
        # shim function that looks somewhat like this:
        #
        #     def dag_policy_name_mismatch_shim(dag):
        #         airflow_local_settings.dag_policy(dag)
        #
        codestr = textwrap.dedent(
            f"""
            def {name}_name_mismatch_shim{str(desired_sig)}:
                return __target({' ,'.join(desired_sig.parameters)})
            """
        )
        # Compile and exec the generated source; `scope` supplies the wrapped
        # callable under the name ``__target`` and receives the shim function.
        code = compile(codestr, "<policy-shim>", "single")
        scope = {"__target": target}
        exec(code, scope, scope)
        return scope[f"{name}_name_mismatch_shim"]

    # Frozen attrs class acting as the plugin object; hook impls are attached to
    # it dynamically below via setattr.
    @attr.define(frozen=True)
    class AirflowLocalSettingsPolicy:
        hook_methods: tuple[str, ...]

        __name__ = "AirflowLocalSettingsPolicy"

        def __dir__(self):
            # Pluggy discovers hook implementations via dir(); expose exactly
            # the methods we attached.
            return self.hook_methods

    for name in names:
        # Skip names that are not declared hookspecs on this plugin manager.
        if not hasattr(pm.hook, name):
            continue

        policy = getattr(module, name)

        # Falsy attribute (e.g. None) means "no policy defined" — skip it.
        if not policy:
            continue

        local_sig = inspect.signature(policy)
        # globals()[name] looks up the hookspec "template" function defined in
        # this module to compare parameter names against.
        policy_sig = inspect.signature(globals()[name])
        # We only care if names/order/number of parameters match, not type hints
        if local_sig.parameters.keys() != policy_sig.parameters.keys():
            policy = _make_shim_fn(name, policy_sig, target=policy)

        # specname ensures pluggy matches the impl to the right spec even when
        # a shim renamed the function.
        setattr(AirflowLocalSettingsPolicy, name, staticmethod(hookimpl(policy, specname=name)))
        hook_methods.add(name)

    # Register last so local settings override earlier-registered plugins.
    if hook_methods:
        pm.register(AirflowLocalSettingsPolicy(hook_methods=tuple(hook_methods)))

    return hook_methods