Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/file_task_handler.py: 25%

255 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""File logging handler for tasks."""
from __future__ import annotations

import logging
import os
import warnings
from contextlib import suppress
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin

import pendulum

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
    from airflow.models import TaskInstance

logger = logging.getLogger(__name__)


class LogType(str, Enum):
    """
    Type of service from which we retrieve logs.

    :meta private:
    """

    TRIGGER = "trigger"
    WORKER = "worker"


def _set_task_deferred_context_var():
    """
    Tell task log handler that task exited with deferral.

    This exists for the sole purpose of telling elasticsearch handler not to
    emit end_of_log mark after task deferral.

    Depending on how the task is run, we may need to set this in task command or in local task job.
    Kubernetes executor requires the local task job invocation; local executor requires the task
    command invocation.

    :meta private:
    """
    logger = logging.getLogger()
    with suppress(StopIteration):
        h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
        h.ctx_task_deferred = True


def _fetch_logs_from_service(url, log_relative_path):
    import httpx

    from airflow.utils.jwt_signer import JWTSigner

    timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
    signer = JWTSigner(
        secret_key=conf.get("webserver", "secret_key"),
        expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
        audience="task-instance-logs",
    )
    response = httpx.get(
        url,
        timeout=timeout,
        headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
    )
    response.encoding = "utf-8"
    return response
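
# Note: the signed token above is what the log-serving endpoint on workers and
# triggerers validates before returning the file named in the "filename" claim,
# so the "webserver" ``secret_key`` must match across all components and their
# clocks must be roughly in sync (see the 403 hint emitted in
# _read_from_logs_server below).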


_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None)

if not _parse_timestamp:

    def _parse_timestamp(line: str):
        timestamp_str, _ = line.split(" ", 1)
        return pendulum.parse(timestamp_str.strip("[]"))
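
# The fallback parser above assumes each line starts with a bracketed ISO-8601
# timestamp, e.g. (with Airflow's default log format) something like:
#   [2023-06-07T06:35:00.123+0000] {taskinstance.py:1103} INFO - ...
# Deployments with a different line format can point the
# ``[logging] interleave_timestamp_parser`` option at their own callable.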


def _parse_timestamps_in_log_file(lines: Iterable[str]):
    timestamp = None
    next_timestamp = None
    for idx, line in enumerate(lines):
        if not line:
            continue
        with suppress(Exception):
            # next_timestamp unchanged if line can't be parsed
            next_timestamp = _parse_timestamp(line)
        if next_timestamp:
            timestamp = next_timestamp
        yield timestamp, idx, line


def _interleave_logs(*logs):
    records = []
    for log in logs:
        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
    last = None
    for _, _, v in sorted(
        records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1])
    ):
        if v != last:  # dedupe
            yield v
        last = v
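
# _interleave_logs merges the content of several log files (local, remote,
# executor, served) into one stream: lines are ordered by parsed timestamp
# (lines with no parseable timestamp inherit the previous line's timestamp, or
# sort to the front via the fixed 2000-01-01 fallback), and consecutive
# duplicate lines are dropped so a record pulled from two sources appears once.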


class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a Python log handler that handles and reads
    task instance logs. It creates and delegates log handling
    to `logging.FileHandler` after receiving task instance context.
    It reads logs from the task instance's host machine.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string
    """

    trigger_should_wrap = True

    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        self.handler: logging.FileHandler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super().__init__.
                stacklevel=(2 if type(self) == FileTaskHandler else 3),
            )
        self.maintain_propagate: bool = False
        """
        If true, overrides default behavior of setting propagate=False.

        :meta private:
        """

        self.ctx_task_deferred = False
        """
        If true, task exited with deferral to trigger.

        Some handlers emit "end of log" markers, and may not wish to do so when task defers.
        """

    def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
        """
        Provide task_instance context to airflow task handler.

        Generally speaking returns None. But if attr `maintain_propagate` has
        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
        has the effect of overriding the default behavior to set `propagate`
        to False whenever set_context is called. At time of writing, this
        functionality is only used in unit testing.

        :param ti: task instance object
        """
        local_loc = self._init_file(ti)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

    @staticmethod
    def add_triggerer_suffix(full_path, job_id=None):
        """
        Helper for deriving trigger log filename from task log filename.

        E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
        is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish
        the files because, rarely, the same trigger could get picked up by two different
        triggerer instances.
        """
        full_path = Path(full_path).as_posix()
        full_path += f".{LogType.TRIGGER.value}"
        if job_id:
            full_path += f".{job_id}.log"
        return full_path

    def emit(self, record):
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        if self.handler:
            self.handler.flush()

    def close(self):
        if self.handler:
            self.handler.close()

    def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
        """Returns the worker log filename."""
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            template = dag_run.get_log_template(session=session).filename
            str_tpl, jinja_tpl = parse_template_string(template)

            if jinja_tpl:
                if hasattr(ti, "task"):
                    context = ti.get_template_context(session=session)
                else:
                    context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
                context["try_number"] = try_number
                return render_template_to_string(jinja_tpl, context)

        if str_tpl:
            try:
                dag = ti.task.dag
            except AttributeError:  # ti.task is not always set.
                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
            else:
                if TYPE_CHECKING:
                    assert dag is not None
                data_interval = dag.get_run_data_interval(dag_run)
            if data_interval[0]:
                data_interval_start = data_interval[0].isoformat()
            else:
                data_interval_start = ""
            if data_interval[1]:
                data_interval_end = data_interval[1].isoformat()
            else:
                data_interval_end = ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
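
    # With Airflow's default ``[logging] log_filename_template`` (at the time of
    # this snapshot), the rendered path looks roughly like
    # ``dag_id=<dag>/run_id=<run>/task_id=<task>/attempt=<try_number>.log``,
    # relative to ``base_log_folder``; a custom template stored on the DagRun's
    # log template record is used instead when one is configured.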

    def _read_grouped_logs(self):
        return False

    @cached_property
    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
        """This cached property avoids loading executor repeatedly."""
        executor = ExecutorLoader.get_default_executor()
        return executor.get_task_log

274 def _read( 

275 self, 

276 ti: TaskInstance, 

277 try_number: int, 

278 metadata: dict[str, Any] | None = None, 

279 ): 

280 """ 

281 Template method that contains custom logic of reading 

282 logs given the try_number. 

283 

284 :param ti: task instance record 

285 :param try_number: current try_number to read log from 

286 :param metadata: log metadata, 

287 can be used for steaming log reading and auto-tailing. 

288 Following attributes are used: 

289 log_pos: (absolute) Char position to which the log 

290 which was retrieved in previous calls, this 

291 part will be skipped and only following test 

292 returned to be added to tail. 

293 :return: log message as a string and metadata. 

294 Following attributes are used in metadata: 

295 end_of_log: Boolean, True if end of log is reached or False 

296 if further calls might get more log text. 

297 This is determined by the status of the TaskInstance 

298 log_pos: (absolute) Char position to which the log is retrieved 

299 """ 

300 # Task instance here might be different from task instance when 

301 # initializing the handler. Thus explicitly getting log location 

302 # is needed to get correct log path. 

303 worker_log_rel_path = self._render_filename(ti, try_number) 

304 messages_list: list[str] = [] 

305 remote_logs: list[str] = [] 

306 local_logs: list[str] = [] 

307 executor_messages: list[str] = [] 

308 executor_logs: list[str] = [] 

309 served_logs: list[str] = [] 

310 with suppress(NotImplementedError): 

311 remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata) 

312 messages_list.extend(remote_messages) 

313 if ti.state == TaskInstanceState.RUNNING: 

314 response = self._executor_get_task_log(ti, try_number) 

315 if response: 

316 executor_messages, executor_logs = response 

317 if executor_messages: 

318 messages_list.extend(executor_messages) 

319 if not (remote_logs and ti.state not in State.unfinished): 

320 # when finished, if we have remote logs, no need to check local 

321 worker_log_full_path = Path(self.local_base, worker_log_rel_path) 

322 local_messages, local_logs = self._read_from_local(worker_log_full_path) 

323 messages_list.extend(local_messages) 

324 if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not executor_messages: 

325 served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

326 messages_list.extend(served_messages) 

327 elif ti.state not in State.unfinished and not (local_logs or remote_logs): 

328 # ordinarily we don't check served logs, with the assumption that users set up 

329 # remote logging or shared drive for logs for persistence, but that's not always true 

330 # so even if task is done, if no local logs or remote logs are found, we'll check the worker 

331 served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) 

332 messages_list.extend(served_messages) 

333 

334 logs = "\n".join( 

335 _interleave_logs( 

336 *local_logs, 

337 *remote_logs, 

338 *(executor_logs or []), 

339 *served_logs, 

340 ) 

341 ) 

342 log_pos = len(logs) 

343 messages = "".join([f"*** {x}\n" for x in messages_list]) 

344 end_of_log = ti.try_number != try_number or ti.state not in [State.RUNNING, State.DEFERRED] 

345 if metadata and "log_pos" in metadata: 

346 previous_chars = metadata["log_pos"] 

347 logs = logs[previous_chars:] # Cut off previously passed log test as new tail 

348 out_message = logs if "log_pos" in (metadata or {}) else messages + logs 

349 return out_message, {"end_of_log": end_of_log, "log_pos": log_pos} 
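
    # Reading order above: remote logs (when a subclass implements
    # _read_remote_logs), executor-provided logs for running tasks, local files
    # under ``base_log_folder``, and finally the worker/triggerer log server as
    # a fallback. Callers that tail a running task pass the returned ``log_pos``
    # back in ``metadata`` so subsequent calls return only the new text.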

    @staticmethod
    def _get_pod_namespace(ti: TaskInstance):
        pod_override = ti.executor_config.get("pod_override")
        namespace = None
        with suppress(Exception):
            namespace = pod_override.metadata.namespace
        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")

    def _get_log_retrieval_url(
        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
    ) -> tuple[str, str]:
        """Given TI, generate URL with which to fetch logs from service log server."""
        if log_type == LogType.TRIGGER:
            if not ti.triggerer_job:
                raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
            config_key = "triggerer_log_server_port"
            config_default = 8794
            hostname = ti.triggerer_job.hostname
            log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
        else:
            hostname = ti.hostname
            config_key = "worker_log_server_port"
            config_default = 8793
        return (
            urljoin(
                f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
                log_relative_path,
            ),
            log_relative_path,
        )
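
    # Example shape of the returned URL (the port comes from the ``[logging]``
    # ``worker_log_server_port`` / ``triggerer_log_server_port`` options,
    # defaulting to 8793 / 8794):
    #   http://<hostname>:8793/log/<rendered relative path, e.g. .../attempt=1.log>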

    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of given task instance from local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None
                           it returns all logs separated by try_number
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        :return: a list of logs (one entry per try number, each a list of
                 ``(host, log)`` tuples) and a matching list of metadata dicts
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when
        # try number gets incremented in DB, i.e. logs produced the time
        # after cli run and before try_number + 1 in DB will not be displayed.
        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        logs = [""] * len(try_numbers)
        metadata_array = [{}] * len(try_numbers)

        # subclasses implement _read and may not have log_type, which was added recently
        for i, try_number_element in enumerate(try_numbers):
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler returns logs grouped by host; wrap other handlers' log strings
            # with a default/empty host so that the UI can render the response in the same way
            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
            metadata_array[i] = out_metadata

        return logs, metadata_array

    def _prepare_log_folder(self, directory: Path):
        """
        Prepare the log folder and ensure its mode is as configured.

        To handle log writing when tasks are impersonated, the log files need to
        be writable by the user that runs the Airflow command and the user
        that is impersonated. This is mainly to handle corner cases with the
        SubDagOperator. When the SubDagOperator is run, all of the operators
        run under the impersonated user and create appropriate log files
        as the impersonated user. However, if the user manually runs tasks
        of the SubDagOperator through the UI, then the log files are created
        by the user that runs the Airflow command. For example, the Airflow
        run command may be run by the `airflow_sudoable` user, but the Airflow
        tasks may be run by the `airflow` user. If the log files are not
        writable by both users, then it's possible that re-running a task
        via the UI (or vice versa) results in a permission error as the task
        tries to write to a log file created by the other user.

        We leave it up to the user to manage their permissions by exposing configuration for both
        new folders and new log files. Default is to make new log folders and files group-writeable
        to handle most common impersonation use cases. The requirement in this case will be to make
        sure that the same group is set as default group for both - impersonated user and main airflow
        user.
        """
        new_folder_permissions = int(
            conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
        )
        directory.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True)
        if directory.stat().st_mode % 0o1000 != new_folder_permissions % 0o1000:
            print(f"Changing {directory} permission to {new_folder_permissions}")
            try:
                directory.chmod(new_folder_permissions)
            except PermissionError as e:
                # In some circumstances (depends on user and filesystem) we might not be able to
                # change the permission for the folder (when the folder was created by another user
                # before or when the filesystem does not allow to change permission). We should not
                # fail in this case but rather ignore it.
                print(f"Failed to change {directory} permission to {new_folder_permissions}: {e}")
                pass
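
    # The folder/file modes above come from ``[logging]`` options, e.g. in airflow.cfg:
    #
    #   [logging]
    #   file_task_handler_new_folder_permissions = 0o775
    #   file_task_handler_new_file_permissions = 0o664
    #
    # (those are the fallback values used in this module); group-writable defaults
    # cover the common impersonation setups described in the docstring.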

    def _init_file(self, ti):
        """
        Create log directory and give it permissions that are configured. See above _prepare_log_folder
        method for more detailed explanation.

        :param ti: task instance object
        :return: absolute log path of the given task instance
        """
        new_file_permissions = int(
            conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
        )
        local_relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, local_relative_path)
        if ti.is_trigger_log_context is True:
            # if this is true, we're invoked via set_context in the context of
            # setting up individual trigger logging. return trigger log path.
            full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
        self._prepare_log_folder(Path(full_path).parent)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            try:
                os.chmod(full_path, new_file_permissions)
            except OSError as e:
                logging.warning("OSError while changing permissions of the log file: %s", e)

        return full_path

    @staticmethod
    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
        messages = []
        logs = []
        files = list(worker_log_path.parent.glob(worker_log_path.name + "*"))
        if files:
            messages.extend(["Found local files:", *[f" * {x}" for x in sorted(files)]])
        for file in sorted(files):
            logs.append(Path(file).read_text())
        return messages, logs
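
    # The ``<name>*`` glob above picks up both the task's own log file and any
    # ``<name>.trigger.<job_id>.log`` siblings written via add_triggerer_suffix,
    # so trigger logs stored next to the task log are read in the same pass.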

    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
        messages = []
        logs = []
        try:
            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
            url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
            response = _fetch_logs_from_service(url, rel_path)
            if response.status_code == 403:
                messages.append(
                    "!!!! Please make sure that all your Airflow components (e.g. "
                    "schedulers, webservers, workers and triggerer) have "
                    "the same 'secret_key' configured in 'webserver' section and "
                    "time is synchronized on all your machines (for example with ntpd)\n"
                    "See more at https://airflow.apache.org/docs/apache-airflow/"
                    "stable/configurations-ref.html#secret-key"
                )
            # Check if the resource was properly fetched
            response.raise_for_status()
            if response.text:
                messages.append(f"Found logs served from host {url}")
                logs.append(response.text)
        except Exception as e:
            messages.append(f"Could not read served logs: {str(e)}")
            logger.exception("Could not read served logs")
        return messages, logs

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        """
        Implement in subclasses to read from the remote service.

        This method should return two lists, messages and logs.

        * Each element in the messages list should be a single message,
          such as, "reading from x file".
        * Each element in the logs list should be the content of one file.
        """
        raise NotImplementedError
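

# A minimal sketch (illustrative only, not part of this module) of what a
# remote-logging subclass might provide, assuming a hypothetical ``storage``
# client object with ``list()`` and ``download()`` methods:
#
#     class ExampleRemoteTaskHandler(FileTaskHandler):
#         def _read_remote_logs(self, ti, try_number, metadata=None):
#             relative_path = self._render_filename(ti, try_number)
#             keys = storage.list(prefix=relative_path)
#             messages = [f"Found remote logs: {keys}"] if keys else []
#             logs = [storage.download(key) for key in sorted(keys)]
#             return messages, logs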