Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/policies.py: 42%


64 statements  

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

17 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING 

21 

22import pluggy 

23 

24local_settings_hookspec = pluggy.HookspecMarker("airflow.policy") 

25hookimpl = pluggy.HookimplMarker("airflow.policy") 

26 

27__all__: list[str] = ["hookimpl"] 

28 

29if TYPE_CHECKING: 

30 from airflow.models.baseoperator import BaseOperator 

31 from airflow.models.dag import DAG 

32 from airflow.models.taskinstance import TaskInstance 

33 

34 


@local_settings_hookspec
def task_policy(task: BaseOperator) -> None:
    """
    Allow altering tasks after they are loaded in the DagBag.

    It allows an administrator to rewire some of the task's parameters. Alternatively you can raise an
    ``AirflowClusterPolicyViolation`` exception to stop the DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce a specific queue (say the ``spark`` queue) for tasks using the ``SparkOperator``,
      to make sure that these tasks get wired to the right workers
    * You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours

    :param task: task to be mutated
    """
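
# A minimal sketch of a matching implementation. In a real deployment this function
# would be named ``task_policy`` and live in ``airflow_local_settings.py``; the
# underscore-prefixed name here only avoids shadowing the hookspec above, and the
# operator and queue names are illustrative assumptions.
def _example_task_policy(task: BaseOperator) -> None:
    from datetime import timedelta

    # Route Spark tasks to a dedicated queue so they land on the right workers.
    if task.task_type == "SparkSubmitOperator":
        task.queue = "spark"
    # Enforce a cluster-wide policy that no task may run for more than 48 hours.
    if task.execution_timeout is None or task.execution_timeout > timedelta(hours=48):
        task.execution_timeout = timedelta(hours=48)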


@local_settings_hookspec
def dag_policy(dag: DAG) -> None:
    """
    Allow altering DAGs after they are loaded in the DagBag.

    It allows an administrator to rewire some of the DAG's parameters.
    Alternatively you can raise an ``AirflowClusterPolicyViolation`` exception
    to stop the DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce a default user for DAGs
    * You could check that every DAG has tags configured

    :param dag: dag to be mutated
    """
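
# A minimal sketch of a matching ``dag_policy`` (again, underscore-named only to avoid
# shadowing the hookspec; the tag requirement is an illustrative assumption).
def _example_dag_policy(dag: DAG) -> None:
    from airflow.exceptions import AirflowClusterPolicyViolation

    # Reject DAGs that have no tags configured.
    if not dag.tags:
        raise AirflowClusterPolicyViolation(f"DAG {dag.dag_id} has no tags configured")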


@local_settings_hookspec
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    """
    Allow altering task instances before they are queued by the Airflow scheduler.

    This could be used, for instance, to modify the task instance during retries.

    :param task_instance: task instance to be mutated
    """
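
# A minimal sketch of a matching ``task_instance_mutation_hook``; the queue name is an
# illustrative assumption.
def _example_task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    # Send retries to a separate queue so first attempts are not starved by them.
    if task_instance.try_number > 1:
        task_instance.queue = "retry_queue"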


@local_settings_hookspec
def pod_mutation_hook(pod) -> None:
    """
    Mutate pod before scheduling.

    This setting allows altering ``kubernetes.client.models.V1Pod`` objects before they are passed to the
    Kubernetes client for scheduling.

    This could be used, for instance, to add sidecar or init containers to every worker pod launched by
    KubernetesExecutor or KubernetesPodOperator.
    """
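
# A minimal sketch of a matching ``pod_mutation_hook`` that appends a logging sidecar
# to every worker pod (the container name and image are illustrative assumptions).
def _example_pod_mutation_hook(pod) -> None:
    from kubernetes.client import models as k8s

    sidecar = k8s.V1Container(name="log-shipper", image="fluent/fluent-bit:latest")
    pod.spec.containers.append(sidecar)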


@local_settings_hookspec(firstresult=True)
def get_airflow_context_vars(context) -> dict[str, str]:  # type: ignore[empty-body]
    """
    Inject airflow context vars into default airflow context vars.

    This setting allows getting the airflow context vars, which are key-value pairs. They are then injected
    into the default airflow context vars, which in the end are available as environment variables when
    running tasks. ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id``, and ``try_number`` are
    reserved keys.

    :param context: The context for the task_instance of interest.
    """
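
# A minimal sketch of a matching ``get_airflow_context_vars``; the key and value are
# illustrative assumptions.
def _example_get_airflow_context_vars(context) -> dict[str, str]:
    # Each returned key/value pair becomes an environment variable for the task.
    return {"deployment_env": "production"}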


@local_settings_hookspec(firstresult=True)
def get_dagbag_import_timeout(dag_file_path: str) -> int | float:  # type: ignore[empty-body]
    """
    Allow for dynamic control of the DAG file parsing timeout based on the DAG file path.

    It is useful when there are a few DAG files requiring longer parsing times, while others do not.
    You can control them separately instead of having one value for all DAG files.

    If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
    """
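
# A minimal sketch of a matching ``get_dagbag_import_timeout``; the path fragment and
# timeout values are illustrative assumptions.
def _example_get_dagbag_import_timeout(dag_file_path: str) -> int | float:
    # Give known-slow DAG files more time to parse; everything else gets the default.
    if "slow_dags" in dag_file_path:
        return 180.0
    return 30.0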


class DefaultPolicy:
    """Default implementations of the policy functions.

    :meta private:
    """

    # Default implementations of the policy functions

    @staticmethod
    @hookimpl
    def get_dagbag_import_timeout(dag_file_path: str):
        from airflow.configuration import conf

        return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

    @staticmethod
    @hookimpl
    def get_airflow_context_vars(context):
        return {}


def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names: set[str]):
    """
    Turn the functions from the airflow_local_settings module into a custom/local plugin.

    Allows plugin-registered functions to co-operate with pluggy/setuptools
    entrypoint plugins that implement the same methods.

    Airflow local settings will "win" (i.e. they have the final say), as they are the last plugin
    registered.

    :meta private:
    """
    import inspect
    import textwrap

    import attr

    hook_methods = set()

    def _make_shim_fn(name, desired_sig, target):
        # Functions defined in airflow_local_settings are called by positional parameters, so the names
        # don't have to match what we define in the "template" policy.
        #
        # However, pluggy validates that the names match (and will raise an error if they don't!).
        #
        # To maintain compatibility, if we detect that the names don't match, we wrap the function with a
        # dynamically created shim that looks somewhat like this:
        #
        #     def dag_policy_name_mismatch_shim(dag):
        #         airflow_local_settings.dag_policy(dag)
        #
        codestr = textwrap.dedent(
            f"""
            def {name}_name_mismatch_shim{desired_sig}:
                return __target({', '.join(desired_sig.parameters)})
            """
        )
        code = compile(codestr, "<policy-shim>", "single")
        scope = {"__target": target}
        exec(code, scope, scope)
        return scope[f"{name}_name_mismatch_shim"]

    @attr.define(frozen=True)
    class AirflowLocalSettingsPolicy:
        hook_methods: tuple[str, ...]

        __name__ = "AirflowLocalSettingsPolicy"

        def __dir__(self):
            return self.hook_methods

    for name in names:
        if not hasattr(pm.hook, name):
            continue

        policy = getattr(module, name)

        if not policy:
            continue

        local_sig = inspect.signature(policy)
        policy_sig = inspect.signature(globals()[name])
        # We only care whether the names/order/number of parameters match, not the type hints.
        if local_sig.parameters.keys() != policy_sig.parameters.keys():
            policy = _make_shim_fn(name, policy_sig, target=policy)

        setattr(AirflowLocalSettingsPolicy, name, staticmethod(hookimpl(policy, specname=name)))
        hook_methods.add(name)

    if hook_methods:
        pm.register(AirflowLocalSettingsPolicy(hook_methods=tuple(hook_methods)))

    return hook_methods
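
# A minimal sketch of how the pieces above fit together at startup. The fake settings
# module and its mismatched-name ``dag_policy`` are illustrative assumptions standing
# in for a real ``airflow_local_settings.py``.
def _example_wire_up_policies() -> pluggy.PluginManager:
    import sys
    import types

    pm = pluggy.PluginManager("airflow.policy")
    pm.add_hookspecs(sys.modules[__name__])  # the hookspecs defined in this module
    pm.register(DefaultPolicy)

    # A local-settings dag_policy whose parameter name ("my_dag") differs from the
    # hookspec's ("dag"); make_plugin_from_local_settings wraps it in a shim.
    fake_settings = types.ModuleType("airflow_local_settings")
    fake_settings.dag_policy = lambda my_dag: None
    make_plugin_from_local_settings(pm, fake_settings, {"dag_policy"})

    pm.hook.dag_policy(dag=None)  # invokes the local-settings implementation via the shim
    return pm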