
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

import pluggy

local_settings_hookspec = pluggy.HookspecMarker("airflow.policy")
hookimpl = pluggy.HookimplMarker("airflow.policy")

__all__: list[str] = ["hookimpl"]

if TYPE_CHECKING:
    from airflow.models.taskinstance import TaskInstance


@local_settings_hookspec
def task_policy(task) -> None:
    """
    Allow altering tasks after they are loaded in the DagBag.

    It allows administrators to rewire a task's parameters. Alternatively, you can raise an
    ``AirflowClusterPolicyViolation`` exception to stop the DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce a specific queue (say the ``spark`` queue) for tasks using the ``SparkOperator`` to
      make sure that these tasks get wired to the right workers
    * You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours

    :param task: task to be mutated
    """

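# Illustrative sketch, not part of this module: a ``task_policy`` in
# ``airflow_local_settings.py`` covering both docstring examples above. The
# ``spark`` queue and the 48-hour cap come from the docstring; everything else
# is an assumption about the deployment.
#
#     from datetime import timedelta
#
#     def task_policy(task):
#         # Route Spark work to the dedicated queue.
#         if task.task_type == "SparkOperator":
#             task.queue = "spark"
#         # Enforce a cluster-wide 48-hour timeout.
#         if task.execution_timeout is None or task.execution_timeout > timedelta(hours=48):
#             task.execution_timeout = timedelta(hours=48)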

@local_settings_hookspec
def dag_policy(dag) -> None:
    """
    Allow altering DAGs after they are loaded in the DagBag.

    It allows administrators to rewire a DAG's parameters.
    Alternatively, you can raise an ``AirflowClusterPolicyViolation`` exception
    to stop the DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce a default user for DAGs
    * You could check that every DAG has tags configured

    :param dag: dag to be mutated
    """

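# Illustrative sketch, not part of this module: a ``dag_policy`` in
# ``airflow_local_settings.py`` enforcing the tag check described above.
#
#     from airflow.exceptions import AirflowClusterPolicyViolation
#
#     def dag_policy(dag):
#         if not dag.tags:
#             raise AirflowClusterPolicyViolation(
#                 f"DAG {dag.dag_id} has no tags; every DAG must be tagged."
#             )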

@local_settings_hookspec
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    """
    Allow altering task instances before being queued by the Airflow scheduler.

    This could be used, for instance, to modify the task instance during retries.

    :param task_instance: task instance to be mutated
    """

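# Illustrative sketch, not part of this module: route retries to a separate
# queue, assuming the deployment defines a hypothetical ``retry`` queue.
#
#     def task_instance_mutation_hook(task_instance):
#         if task_instance.try_number >= 2:
#             task_instance.queue = "retry"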

@local_settings_hookspec
def pod_mutation_hook(pod) -> None:
    """
    Mutate pod before scheduling.

    This setting allows altering ``kubernetes.client.models.V1Pod`` objects before they are passed to the
    Kubernetes client for scheduling.

    This could be used, for instance, to add sidecar or init containers to every worker pod launched by
    KubernetesExecutor or KubernetesPodOperator.
    """

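# Illustrative sketch, not part of this module: attach a logging sidecar to
# every worker pod. The container name and image are hypothetical.
#
#     from kubernetes.client import models as k8s
#
#     def pod_mutation_hook(pod):
#         sidecar = k8s.V1Container(name="log-shipper", image="fluent/fluent-bit:latest")
#         pod.spec.containers.append(sidecar)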

@local_settings_hookspec(firstresult=True)
def get_airflow_context_vars(context) -> dict[str, str]:  # type: ignore[empty-body]
    """
    Inject airflow context vars into default airflow context vars.

    This setting allows getting the airflow context vars, which are key-value pairs. They are then injected
    into the default airflow context vars, which in the end are available as environment variables when
    running tasks. ``dag_id``, ``task_id``, ``logical_date``, ``dag_run_id`` and ``try_number`` are
    reserved keys.

    :param context: The context for the task_instance of interest.
    """

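# Illustrative sketch, not part of this module: expose a deployment-wide value
# to every task's environment. The ``airflow_cluster`` key is hypothetical.
#
#     def get_airflow_context_vars(context):
#         return {"airflow_cluster": "prod-eu-1"}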

@local_settings_hookspec(firstresult=True)
def get_dagbag_import_timeout(dag_file_path: str) -> int | float:  # type: ignore[empty-body]
    """
    Allow for dynamic control of the DAG file parsing timeout based on the DAG file path.

    It is useful when a few DAG files require longer parsing times than the rest: you can control
    them separately instead of having one value for all DAG files.

    If the return value is less than or equal to 0, there is no timeout during the DAG parsing.
    """

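# Illustrative sketch, not part of this module: give slow-parsing DAG files a
# larger budget. The ``/slow/`` path convention and the numbers are assumptions.
#
#     def get_dagbag_import_timeout(dag_file_path):
#         if "/slow/" in dag_file_path:
#             return 180.0
#         return 30.0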

class DefaultPolicy:
    """
    Default implementations of the policy functions.

    :meta private:
    """

    # Default implementations of the policy functions

    @staticmethod
    @hookimpl
    def get_dagbag_import_timeout(dag_file_path: str):
        from airflow.configuration import conf

        return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

    @staticmethod
    @hookimpl
    def get_airflow_context_vars(context):
        return {}


def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names: set[str]):
    """
    Turn the functions from the airflow_local_settings module into a custom/local plugin.

    Allows plugin-registered functions to co-operate with pluggy/setuptools
    entrypoint plugins that implement the same methods.

    Airflow local settings will "win" (i.e. they have the final say), as they are the last plugin
    registered.

    :meta private:
    """
    import inspect
    import textwrap

    import attr

    hook_methods = set()

    def _make_shim_fn(name, desired_sig, target):
        # Functions defined in airflow_local_settings are called by positional parameters, so the names don't
        # have to match what we define in the "template" policy.
        #
        # However, pluggy validates that the names match (and will raise an error if they don't!)
        #
        # To maintain compat, if we detect the names don't match, we will wrap it with a dynamically created
        # shim function that looks somewhat like this:
        #
        #   def dag_policy_name_mismatch_shim(dag):
        #       airflow_local_settings.dag_policy(dag)
        #
        codestr = textwrap.dedent(
            f"""
            def {name}_name_mismatch_shim{desired_sig}:
                return __target({', '.join(desired_sig.parameters)})
            """
        )
        code = compile(codestr, "<policy-shim>", "single")
        scope = {"__target": target}
        exec(code, scope, scope)
        return scope[f"{name}_name_mismatch_shim"]

    @attr.define(frozen=True)
    class AirflowLocalSettingsPolicy:
        hook_methods: tuple[str, ...]

        __name__ = "AirflowLocalSettingsPolicy"

        def __dir__(self):
            return self.hook_methods

    for name in names:
        if not hasattr(pm.hook, name):
            continue

        policy = getattr(module, name)

        if not policy:
            continue

        local_sig = inspect.signature(policy)
        policy_sig = inspect.signature(globals()[name])
        # We only care whether the names/order/number of parameters match, not type hints
        if local_sig.parameters.keys() != policy_sig.parameters.keys():
            policy = _make_shim_fn(name, policy_sig, target=policy)

        setattr(AirflowLocalSettingsPolicy, name, staticmethod(hookimpl(policy, specname=name)))
        hook_methods.add(name)

    if hook_methods:
        pm.register(AirflowLocalSettingsPolicy(hook_methods=tuple(hook_methods)))

    return hook_methods
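
# Illustrative sketch, not part of this module, of the shim at work: a local
# settings function whose parameter name differs from the hookspec,
#
#     def dag_policy(my_dag):  # the hookspec declares the parameter as ``dag``
#         my_dag.tags.append("audited")
#
# would fail pluggy's parameter-name validation, so it is wrapped in a generated
# function roughly equivalent to
#
#     def dag_policy_name_mismatch_shim(dag):
#         return __target(dag)  # __target is the original function above
#
# and registered under ``specname="dag_policy"``. Because this plugin is
# registered last, pluggy calls it ahead of entrypoint plugins, which is how
# local settings get the final say.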