Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/file_task_handler.py: 26%

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""File logging handler for tasks."""

from __future__ import annotations

import inspect
import logging
import os
import warnings
from contextlib import suppress
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin

import pendulum

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import provide_session
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
    from pendulum import DateTime

    from airflow.models import DagRun
    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
    from airflow.serialization.pydantic.dag_run import DagRunPydantic
    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic

logger = logging.getLogger(__name__)


class LogType(str, Enum):
    """
    Type of service from which we retrieve logs.

    :meta private:
    """

    TRIGGER = "trigger"
    WORKER = "worker"


def _set_task_deferred_context_var():
    """
    Tell the task log handler that the task exited with a deferral.

    This exists for the sole purpose of telling the elasticsearch handler not to
    emit an end_of_log mark after task deferral.

    Depending on how the task is run, we may need to set this in the task command or in the local task job.
    The Kubernetes executor requires the local task job invocation; the local executor requires the task
    command invocation.

    :meta private:
    """
    logger = logging.getLogger()
    with suppress(StopIteration):
        h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
        h.ctx_task_deferred = True


def _fetch_logs_from_service(url, log_relative_path):
    # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
    import httpx

    from airflow.utils.jwt_signer import JWTSigner

    timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
    signer = JWTSigner(
        secret_key=conf.get("webserver", "secret_key"),
        expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
        audience="task-instance-logs",
    )
    response = httpx.get(
        url,
        timeout=timeout,
        headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
    )
    response.encoding = "utf-8"
    return response


_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None)

if not _parse_timestamp:

    def _parse_timestamp(line: str):
        timestamp_str, _ = line.split(" ", 1)
        return pendulum.parse(timestamp_str.strip("[]"))

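# Editorial note (not in the original module): the fallback parser above assumes each log line
# starts with a bracketed ISO-8601 timestamp, e.g. a line shaped like
# "[2023-01-01T00:00:00+00:00] {taskinstance.py:1} INFO - ..." (hypothetical example).
# It takes the first whitespace-separated token, strips the brackets, and hands the result to
# pendulum.parse(). Lines that do not match simply fail to parse and the failure is suppressed
# by the caller below, which keeps the previously seen timestamp instead.
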

def _parse_timestamps_in_log_file(lines: Iterable[str]):
    timestamp = None
    next_timestamp = None
    for idx, line in enumerate(lines):
        if line:
            with suppress(Exception):
                # next_timestamp unchanged if line can't be parsed
                next_timestamp = _parse_timestamp(line)
            if next_timestamp:
                timestamp = next_timestamp
        yield timestamp, idx, line


def _interleave_logs(*logs):
    records = []
    for log in logs:
        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
    last = None
    for _, _, v in sorted(
        records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1])
    ):
        if v != last:  # dedupe
            yield v
        last = v

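# Illustrative sketch (not part of the original module): how _interleave_logs merges log chunks
# from different sources (local files, remote store, executor, served logs) into a single
# timestamp-ordered, de-duplicated stream. The sample lines and the helper name
# _example_interleave_usage are hypothetical.
def _example_interleave_usage() -> str:
    worker_chunk = (
        "[2023-01-01T00:00:01+00:00] task started\n[2023-01-01T00:00:03+00:00] task finished"
    )
    trigger_chunk = (
        "[2023-01-01T00:00:02+00:00] deferred work resumed\n[2023-01-01T00:00:03+00:00] task finished"
    )
    # Lines come back sorted by their parsed timestamps; the duplicated "task finished" line is
    # emitted only once because consecutive identical lines are dropped.
    return "\n".join(_interleave_logs(worker_chunk, trigger_chunk))
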

def _ensure_ti(ti: TaskInstanceKey | TaskInstance | TaskInstancePydantic, session) -> TaskInstance:
    """Given TI | TIKey, return a TI object.

    Will raise exception if no TI is found in the database.
    """
    from airflow.models.taskinstance import TaskInstance

    if isinstance(ti, TaskInstance):
        return ti
    val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.task_id == ti.task_id,
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
        )
        .one_or_none()
    )
    if not val:
        raise AirflowException(f"Could not find TaskInstance for {ti}")
    val.try_number = ti.try_number
    return val


class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a Python log handler that handles and reads task instance logs.

    It creates and delegates log handling to `logging.FileHandler` after receiving task
    instance context. It reads logs from the task instance's host machine.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string
    """

    trigger_should_wrap = True
    inherits_from_empty_operator_log_message = (
        "Operator inherits from empty operator and thus does not have logs"
    )

    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        self.handler: logging.Handler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super().__init__().
                stacklevel=(2 if type(self) == FileTaskHandler else 3),
            )
        self.maintain_propagate: bool = False
        """
        If true, overrides default behavior of setting propagate=False.

        :meta private:
        """

        self.ctx_task_deferred = False
        """
        If true, task exited with deferral to trigger.

        Some handlers emit "end of log" markers, and may not wish to do so when task defers.
        """

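    # Editorial note (not in the original module): in a typical Airflow deployment this handler
    # is not instantiated directly; it is wired up as the "task" handler of the logging config
    # (see airflow.config_templates.airflow_local_settings), with base_log_folder taken from the
    # [logging] base_log_folder setting. The exact config keys mentioned here are an assumption
    # and may differ between Airflow versions.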

    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None | SetContextPropagate:
        """
        Provide task_instance context to airflow task handler.

        Generally speaking returns None. But if attr `maintain_propagate` has
        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
        has the effect of overriding the default behavior to set `propagate`
        to False whenever set_context is called. At time of writing, this
        functionality is only used in unit testing.

        :param ti: task instance object
        :param identifier: if set, adds suffix to log file. For use when relaying exceptional messages
            to task logs from a context other than task or trigger run
        """
        local_loc = self._init_file(ti, identifier=identifier)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

    @cached_property
    def supports_task_context_logging(self) -> bool:
        return "identifier" in inspect.signature(self.set_context).parameters

    @staticmethod
    def add_triggerer_suffix(full_path, job_id=None):
        """
        Derive trigger log filename from task log filename.

        E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
        is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish
        the files because, rarely, the same trigger could get picked up by two different
        triggerer instances.
        """
        full_path = Path(full_path).as_posix()
        full_path += f".{LogType.TRIGGER.value}"
        if job_id:
            full_path += f".{job_id}.log"
        return full_path


    def emit(self, record):
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        if self.handler:
            self.handler.flush()

    def close(self):
        if self.handler:
            self.handler.close()

    @staticmethod
    @internal_api_call
    @provide_session
    def _render_filename_db_access(
        *, ti, try_number: int, session=None
    ) -> tuple[DagRun | DagRunPydantic, TaskInstance | TaskInstancePydantic, str | None, str | None]:
        ti = _ensure_ti(ti, session)
        dag_run = ti.get_dagrun(session=session)
        template = dag_run.get_log_template(session=session).filename
        str_tpl, jinja_tpl = parse_template_string(template)
        filename = None
        if jinja_tpl:
            if getattr(ti, "task", None) is not None:
                context = ti.get_template_context(session=session)
            else:
                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
            context["try_number"] = try_number
            filename = render_template_to_string(jinja_tpl, context)
        return dag_run, ti, str_tpl, filename

    def _render_filename(
        self, ti: TaskInstance | TaskInstanceKey | TaskInstancePydantic, try_number: int
    ) -> str:
        """Return the worker log filename."""
        dag_run, ti, str_tpl, filename = self._render_filename_db_access(ti=ti, try_number=try_number)
        if filename:
            return filename
        if str_tpl:
            if ti.task is not None and ti.task.dag is not None:
                dag = ti.task.dag
                data_interval = dag.get_run_data_interval(dag_run)
            else:
                from airflow.timetables.base import DataInterval

                if TYPE_CHECKING:
                    assert isinstance(dag_run.data_interval_start, DateTime)
                    assert isinstance(dag_run.data_interval_end, DateTime)
                data_interval = DataInterval(dag_run.data_interval_start, dag_run.data_interval_end)
            if data_interval[0]:
                data_interval_start = data_interval[0].isoformat()
            else:
                data_interval_start = ""
            if data_interval[1]:
                data_interval_end = data_interval[1].isoformat()
            else:
                data_interval_end = ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")

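    # Editorial note (not in the original module): the format() call above handles plain
    # (non-Jinja) templates. With a hypothetical [logging] log_filename_template value such as
    # "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log", the rendered
    # path would look like
    # "dag_id=example_dag/run_id=manual__2023-01-01T00:00:00+00:00/task_id=hello/attempt=1.log".
    # The Jinja path (see _render_filename_db_access) is taken instead when the configured
    # template is detected as a Jinja template.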

    def _read_grouped_logs(self):
        return False

    @cached_property
    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
        """This cached property avoids loading executor repeatedly."""
        executor = ExecutorLoader.get_default_executor()
        return executor.get_task_log

    def _read(
        self,
        ti: TaskInstance,
        try_number: int,
        metadata: dict[str, Any] | None = None,
    ):
        """
        Template method that contains custom logic of reading logs given the try_number.

        :param ti: task instance record
        :param try_number: current try_number to read log from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
            Following attributes are used:
            log_pos: (absolute) char position up to which the log was retrieved in previous
                calls; this part will be skipped and only the text that follows it is
                returned, to be added to the tail.
        :return: log message as a string and metadata.
            Following attributes are used in metadata:
            end_of_log: Boolean, True if the end of the log is reached or False
                if further calls might get more log text.
                This is determined by the status of the TaskInstance.
            log_pos: (absolute) char position up to which the log is retrieved.
        """
        # Task instance here might be different from task instance when
        # initializing the handler. Thus explicitly getting log location
        # is needed to get correct log path.
        worker_log_rel_path = self._render_filename(ti, try_number)
        messages_list: list[str] = []
        remote_logs: list[str] = []
        local_logs: list[str] = []
        executor_messages: list[str] = []
        executor_logs: list[str] = []
        served_logs: list[str] = []
        is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
        with suppress(NotImplementedError):
            remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
            messages_list.extend(remote_messages)
        if ti.state == TaskInstanceState.RUNNING:
            response = self._executor_get_task_log(ti, try_number)
            if response:
                executor_messages, executor_logs = response
            if executor_messages:
                messages_list.extend(executor_messages)
        if not (remote_logs and ti.state not in State.unfinished):
            # when finished, if we have remote logs, no need to check local
            worker_log_full_path = Path(self.local_base, worker_log_rel_path)
            local_messages, local_logs = self._read_from_local(worker_log_full_path)
            messages_list.extend(local_messages)
        if is_in_running_or_deferred and not executor_messages and not remote_logs:
            # While the task instance is still running and we have neither executor nor remote
            # logs, look for served logs. This is for cases when users have not set up remote
            # logging nor a shared drive for logs.
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)
        elif ti.state not in State.unfinished and not (local_logs or remote_logs):
            # Ordinarily we don't check served logs, on the assumption that users set up remote
            # logging or a shared drive for log persistence, but that's not always true. So even
            # if the task is done, if no local or remote logs are found, we'll check the worker.
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)

        logs = "\n".join(
            _interleave_logs(
                *local_logs,
                *remote_logs,
                *(executor_logs or []),
                *served_logs,
            )
        )
        log_pos = len(logs)
        messages = "".join([f"*** {x}\n" for x in messages_list])
        end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
        if metadata and "log_pos" in metadata:
            previous_chars = metadata["log_pos"]
            logs = logs[previous_chars:]  # Cut off previously passed log text as new tail
        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}

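    # Editorial note (not in the original module): callers can tail a running task's log by
    # passing back the metadata returned from the previous call, e.g. (hypothetical usage):
    #
    #     text, meta = handler._read(ti, try_number, metadata=None)
    #     ...later...
    #     more, meta = handler._read(ti, try_number, metadata=meta)  # only new characters
    #
    # because "log_pos" records how many characters were already returned.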

    @staticmethod
    def _get_pod_namespace(ti: TaskInstance):
        pod_override = ti.executor_config.get("pod_override")
        namespace = None
        with suppress(Exception):
            namespace = pod_override.metadata.namespace
        return namespace or conf.get("kubernetes_executor", "namespace")

    def _get_log_retrieval_url(
        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
    ) -> tuple[str, str]:
        """Given TI, generate URL with which to fetch logs from service log server."""
        if log_type == LogType.TRIGGER:
            if not ti.triggerer_job:
                raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
            config_key = "triggerer_log_server_port"
            config_default = 8794
            hostname = ti.triggerer_job.hostname
            log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
        else:
            hostname = ti.hostname
            config_key = "worker_log_server_port"
            config_default = 8793
        return (
            urljoin(
                f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
                log_relative_path,
            ),
            log_relative_path,
        )

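    # Editorial note (not in the original module): for worker logs the generated URL has the
    # shape "http://<hostname>:<worker_log_server_port>/log/<relative path>", with the port
    # defaulting to 8793 (8794 for the triggerer log server), as encoded above.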

    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of given task instance from local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None
            it returns all logs separated by try_number
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        :return: a list of logs (one entry per try number, each a list of (host, log) tuples)
            and a list of metadata dicts, one per try number
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when
        # try number gets incremented in DB, i.e. logs produced in the time
        # after the cli run and before try_number + 1 is recorded in DB will not be displayed.
        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        logs = [""] * len(try_numbers)
        metadata_array = [{}] * len(try_numbers)

        # subclasses implement _read and may not have log_type, which was added recently
        for i, try_number_element in enumerate(try_numbers):
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler returns logs grouped by host; wrap other handlers returning a log
            # string with a default/empty host so that the UI can render the response the same way
            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
            metadata_array[i] = out_metadata

        return logs, metadata_array

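    # Editorial note (not in the original module): with try_number=None and two completed
    # attempts, the returned structure would look roughly like
    #     [[("worker-host", "<attempt 1 log>")], [("worker-host", "<attempt 2 log>")]],
    #     [{"end_of_log": True, ...}, {"end_of_log": True, ...}]
    # i.e. one (host, log) grouping and one metadata dict per try number.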

    @staticmethod
    def _prepare_log_folder(directory: Path, new_folder_permissions: int):
        """
        Prepare the log folder and ensure its mode is as configured.

        To handle log writing when tasks are impersonated, the log files need to
        be writable by the user that runs the Airflow command and the user
        that is impersonated. This is mainly to handle corner cases with the
        SubDagOperator. When the SubDagOperator is run, all of the operators
        run under the impersonated user and create appropriate log files
        as the impersonated user. However, if the user manually runs tasks
        of the SubDagOperator through the UI, then the log files are created
        by the user that runs the Airflow command. For example, the Airflow
        run command may be run by the `airflow_sudoable` user, but the Airflow
        tasks may be run by the `airflow` user. If the log files are not
        writable by both users, then it's possible that re-running a task
        via the UI (or vice versa) results in a permission error as the task
        tries to write to a log file created by the other user.

        We leave it up to the user to manage their permissions by exposing configuration for both
        new folders and new log files. Default is to make new log folders and files group-writeable
        to handle most common impersonation use cases. The requirement in this case will be to make
        sure that the same group is set as the default group for both the impersonated user and the
        main airflow user.
        """
        for parent in reversed(directory.parents):
            parent.mkdir(mode=new_folder_permissions, exist_ok=True)
        directory.mkdir(mode=new_folder_permissions, exist_ok=True)

    def _init_file(self, ti, *, identifier: str | None = None):
        """
        Create log directory and give it permissions that are configured.

        See above _prepare_log_folder method for more detailed explanation.

        :param ti: task instance object
        :return: full path of the log file for the given task instance
        """
        new_file_permissions = int(
            conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
        )
        local_relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, local_relative_path)
        if identifier:
            full_path += f".{identifier}.log"
        elif ti.is_trigger_log_context is True:
            # if this is true, we're invoked via set_context in the context of
            # setting up individual trigger logging. return trigger log path.
            full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
        new_folder_permissions = int(
            conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
        )
        self._prepare_log_folder(Path(full_path).parent, new_folder_permissions)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            try:
                os.chmod(full_path, new_file_permissions)
            except OSError as e:
                logger.warning("OSError while changing ownership of the log file: %s", e)

        return full_path

    @staticmethod
    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
        messages = []
        paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
        if paths:
            messages.append("Found local files:")
            messages.extend(f" * {x}" for x in paths)
        logs = [file.read_text() for file in paths]
        return messages, logs

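    # Editorial note (not in the original module): the glob on "<name>*" above is what lets a
    # single read pick up not only the task attempt's own file but also sibling files that share
    # its prefix, such as the trigger log files produced by add_triggerer_suffix
    # (e.g. "<name>.trigger.<job id>.log").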

    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
        messages = []
        logs = []
        try:
            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
            url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
            response = _fetch_logs_from_service(url, rel_path)
            if response.status_code == 403:
                messages.append(
                    "!!!! Please make sure that all your Airflow components (e.g. "
                    "schedulers, webservers, workers and triggerer) have "
                    "the same 'secret_key' configured in 'webserver' section and "
                    "time is synchronized on all your machines (for example with ntpd)\n"
                    "See more at https://airflow.apache.org/docs/apache-airflow/"
                    "stable/configurations-ref.html#secret-key"
                )
            # Check if the resource was properly fetched
            response.raise_for_status()
            if response.text:
                messages.append(f"Found logs served from host {url}")
                logs.append(response.text)
        except Exception as e:
            from httpx import UnsupportedProtocol

            if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
                messages.append(self.inherits_from_empty_operator_log_message)
            else:
                messages.append(f"Could not read served logs: {e}")
                logger.exception("Could not read served logs")
        return messages, logs

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        """
        Implement in subclasses to read from the remote service.

        This method should return two lists, messages and logs.

        * Each element in the messages list should be a single message,
          such as, "reading from x file".
        * Each element in the logs list should be the content of one file.
        """
        raise NotImplementedError
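

# Illustrative sketch (not part of the original module): a minimal subclass showing the
# _read_remote_logs contract described above. The class name and the filesystem-as-remote-store
# layout are hypothetical; real remote handlers (e.g. the S3/GCS/WASB task handlers shipped in
# provider packages) also handle uploads, credentials, and error cases.
class _ExampleRemoteTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder: str, remote_base: str):
        super().__init__(base_log_folder)
        self.remote_base = Path(remote_base)

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        messages: list[str] = []
        logs: list[str] = []
        # Reuse the same relative layout that the worker used when writing locally.
        relative_path = self._render_filename(ti, try_number)
        remote_path = self.remote_base / relative_path
        if remote_path.exists():
            messages.append(f"Found remote log at {remote_path}")
            logs.append(remote_path.read_text())
        else:
            messages.append(f"No remote log found at {remote_path}")
        return messages, logs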