Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/log/file_task_handler.py: 22%

169 statements  

coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
18"""File logging handler for tasks.""" 

19from __future__ import annotations 

20 

21import logging 

22import os 

23import warnings 

24from pathlib import Path 

25from typing import TYPE_CHECKING, Any 

26from urllib.parse import urljoin 

27 

28from airflow.configuration import AirflowConfigException, conf 

29from airflow.exceptions import RemovedInAirflow3Warning 

30from airflow.utils.context import Context 

31from airflow.utils.helpers import parse_template_string, render_template_to_string 

32from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler 

33from airflow.utils.session import create_session 

34from airflow.utils.state import State 

35 

36if TYPE_CHECKING: 

37 from airflow.models import TaskInstance 

38 from airflow.utils.log.logging_mixin import SetContextPropagate 

39 

40 

class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a Python log handler that handles and reads
    task instance logs. It creates and delegates log handling
    to `logging.FileHandler` after receiving task instance context.
    It reads logs from the task instance's host machine.

    :param base_log_folder: base log folder in which to place logs
    :param filename_template: template filename string (deprecated)
    """

    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        self.handler: logging.FileHandler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super().__init__.
                stacklevel=(2 if type(self) == FileTaskHandler else 3),
            )

    def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
        """
        Provide task_instance context to the Airflow task handler.

        :param ti: task instance object
        """
        local_loc = self._init_file(ti)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
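        # Returning None opts into the default set_context behavior; a handler
        # may instead return SetContextPropagate.MAINTAIN_PROPAGATION (see
        # airflow.utils.log.logging_mixin) to keep log records propagating to
        # parent loggers.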
        return None

    def emit(self, record):
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        if self.handler:
            self.handler.flush()

    def close(self):
        if self.handler:
            self.handler.close()

    def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            template = dag_run.get_log_template(session=session).filename
        str_tpl, jinja_tpl = parse_template_string(template)

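        # The configured log filename template is either a Jinja template
        # (anything containing "{{") or a plain str.format() string;
        # parse_template_string returns one of the two, never both.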
        if jinja_tpl:
            if hasattr(ti, "task"):
                context = ti.get_template_context()
            else:
                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
            context["try_number"] = try_number
            return render_template_to_string(jinja_tpl, context)
        elif str_tpl:
            try:
                dag = ti.task.dag
            except AttributeError:  # ti.task is not always set.
                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
            else:
                if TYPE_CHECKING:
                    assert dag is not None
                data_interval = dag.get_run_data_interval(dag_run)
            if data_interval[0]:
                data_interval_start = data_interval[0].isoformat()
            else:
                data_interval_start = ""
            if data_interval[1]:
                data_interval_end = data_interval[1].isoformat()
            else:
                data_interval_end = ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")

    def _read_grouped_logs(self):
        return False

    @staticmethod
    def _should_check_k8s(queue):
        """
        Return True if the task is running through the Kubernetes executor.

        When logs aren't available locally, they are then read from the k8s pod logs.
        """
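        # The hybrid executors run only a subset of tasks on Kubernetes: a task
        # is routed there when its queue matches the configured kubernetes_queue.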
        executor = conf.get("core", "executor")
        if executor == "KubernetesExecutor":
            return True
        elif executor == "LocalKubernetesExecutor":
            if queue == conf.get("local_kubernetes_executor", "kubernetes_queue"):
                return True
        elif executor == "CeleryKubernetesExecutor":
            if queue == conf.get("celery_kubernetes_executor", "kubernetes_queue"):
                return True
        return False

    def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
        """
        Template method that contains custom logic of reading
        logs given the try_number.

        :param ti: task instance record
        :param try_number: current try_number to read log from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
            The following attributes are used:
            log_pos: (absolute) char position up to which the log was
                retrieved in previous calls; this part will be skipped
                and only the following text returned, to be added to
                the tail.

        :return: log message as a string and metadata.
            The following attributes are used in metadata:
            end_of_log: Boolean, True if the end of the log is reached, or False
                if further calls might get more log text.
                This is determined by the status of the TaskInstance.
            log_pos: (absolute) char position up to which the log is retrieved.
        """
        from airflow.utils.jwt_signer import JWTSigner

        # The task instance here might be different from the task instance used
        # when initializing the handler, so the log location must be computed
        # explicitly to get the correct log path.
        log_relative_path = self._render_filename(ti, try_number)
        location = os.path.join(self.local_base, log_relative_path)

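        # Log retrieval falls back in order: the local log file, then (for
        # tasks run via a Kubernetes executor) the worker pod's logs, then the
        # worker's HTTP log-serving endpoint.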
        log = ""
        if os.path.exists(location):
            try:
                with open(location, encoding="utf-8", errors="surrogateescape") as file:
                    log += f"*** Reading local file: {location}\n"
                    log += "".join(file.readlines())
            except Exception as e:
                log = f"*** Failed to load local log file: {location}\n"
                log += f"*** {str(e)}\n"
                return log, {"end_of_log": True}
        elif self._should_check_k8s(ti.queue):
            pod_override = ti.executor_config.get("pod_override")
            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
                namespace = pod_override.metadata.namespace
            else:
                namespace = conf.get("kubernetes_executor", "namespace")
            try:
                from airflow.kubernetes.kube_client import get_kube_client

                kube_client = get_kube_client()

                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
                res = kube_client.read_namespaced_pod_log(
                    name=ti.hostname,
                    namespace=namespace,
                    container="base",
                    follow=False,
                    tail_lines=100,
                    _preload_content=False,
                )

                for line in res:
                    log += line.decode()

            except Exception as f:
                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
                return log, {"end_of_log": True}
        else:
            import httpx

            url = self._get_log_retrieval_url(ti, log_relative_path)
            log += f"*** Log file does not exist: {location}\n"
            log += f"*** Fetching from: {url}\n"
            try:
                timeout = None  # No timeout
                try:
                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
                except (AirflowConfigException, ValueError):
                    pass

                signer = JWTSigner(
                    secret_key=conf.get("webserver", "secret_key"),
                    expiration_time_in_seconds=conf.getint(
                        "webserver", "log_request_clock_grace", fallback=30
                    ),
                    audience="task-instance-logs",
                )
                response = httpx.get(
                    url,
                    timeout=timeout,
                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
                )
                response.encoding = "utf-8"

                if response.status_code == 403:
                    log += (
                        "*** !!!! Please make sure that all your Airflow components (e.g. "
                        "schedulers, webservers and workers) have "
                        "the same 'secret_key' configured in 'webserver' section and "
                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
                    )
                    log += (
                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
                        "stable/configurations-ref.html#secret-key\n***"
                    )
                # Check if the resource was properly fetched
                response.raise_for_status()

                log += "\n" + response.text
            except Exception as e:
                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
                return log, {"end_of_log": True}

        # Process tailing if the log is not at its end
        end_of_log = ti.try_number != try_number or ti.state not in State.running
        log_pos = len(log)
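        # Example: if a previous call returned log_pos=1000 and this fetch
        # produced 1500 characters, only characters 1000..1500 are returned
        # as the new tail, and log_pos is reported as 1500.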
        if metadata and "log_pos" in metadata:
            previous_chars = metadata["log_pos"]
            log = log[previous_chars:]  # Cut off previously passed log text as new tail

        return log, {"end_of_log": end_of_log, "log_pos": log_pos}

    @staticmethod
    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
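        # Example of the resulting URL, assuming the default worker log server
        # port (8793) and the default log filename template:
        #   http://<hostname>:8793/log/dag_id=<dag>/run_id=<run>/task_id=<task>/attempt=<n>.log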
        url = urljoin(
            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
            log_relative_path,
        )
        return url

    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of a given task instance from the local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None,
            it returns all logs separated by try_number.
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
        :return: a list of lists of (hostname, log) tuples, one inner list per
            try_number, plus a matching list of metadata dicts
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when
        # the try number gets incremented in the DB, i.e. logs produced in the
        # window after the CLI run starts and before try_number + 1 is set in
        # the DB will not be displayed.

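        # Example: a task that has completed two tries has next_try_number == 3,
        # so try_numbers becomes [1, 2] and logs for both attempts are returned.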
        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        logs = [""] * len(try_numbers)
        metadata_array = [{}] * len(try_numbers)
        for i, try_number_element in enumerate(try_numbers):
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler returns logs grouped by host; wrap other handlers'
            # log strings with a default/empty host so that the UI can render
            # the response in the same way.
            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
            metadata_array[i] = out_metadata

        return logs, metadata_array

    def _init_file(self, ti):
        """
        Create the log directory and give it the correct permissions.

        :param ti: task instance object
        :return: absolute log path of the given task instance
        """
        # To handle log writing when tasks are impersonated, the log files need to
        # be writable by the user that runs the Airflow command and the user
        # that is impersonated. This is mainly to handle corner cases with the
        # SubDagOperator. When the SubDagOperator is run, all of the operators
        # run under the impersonated user and create appropriate log files
        # as the impersonated user. However, if the user manually runs tasks
        # of the SubDagOperator through the UI, then the log files are created
        # by the user that runs the Airflow command. For example, the Airflow
        # run command may be run by the `airflow_sudoable` user, but the Airflow
        # tasks may be run by the `airflow` user. If the log files are not
        # writable by both users, then it's possible that re-running a task
        # via the UI (or vice versa) results in a permission error as the task
        # tries to write to a log file created by the other user.
        relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, relative_path)
        directory = os.path.dirname(full_path)
        # Create the log file and give it group writable permissions
        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
        # operator is not compatible with impersonation (e.g. if a Celery executor is used
        # for a SubDag operator and the SubDag operator has a different owner than the
        # parent DAG)
        Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            # TODO: Investigate using 444 instead of 666.
            try:
                os.chmod(full_path, 0o666)
            except OSError:
                logging.warning("OSError while changing permissions of the log file")

        return full_path
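
# Note: in a standard Airflow deployment this handler is instantiated via the
# logging config (see airflow.config_templates.airflow_local_settings) rather
# than directly. A minimal sketch, assuming the default [logging]
# base_log_folder option:
#   handler = FileTaskHandler(base_log_folder=conf.get("logging", "BASE_LOG_FOLDER"))
#   handler.set_context(ti)  # ti: a TaskInstance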