Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/file_task_handler.py: 26%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""File logging handler for tasks."""
from __future__ import annotations

import inspect
import logging
import os
import warnings
from contextlib import suppress
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin

import pendulum

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.logging_mixin import SetContextPropagate
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import provide_session
from airflow.utils.state import State, TaskInstanceState

if TYPE_CHECKING:
    from pendulum import DateTime

    from airflow.models import DagRun
    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
    from airflow.serialization.pydantic.dag_run import DagRunPydantic
    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic

logger = logging.getLogger(__name__)


class LogType(str, Enum):
    """
    Type of service from which we retrieve logs.

    :meta private:
    """

    TRIGGER = "trigger"
    WORKER = "worker"


def _set_task_deferred_context_var():
    """
    Tell task log handler that task exited with deferral.

    This exists for the sole purpose of telling elasticsearch handler not to
    emit end_of_log mark after task deferral.

    Depending on how the task is run, we may need to set this in task command or in local task job.
    Kubernetes executor requires the local task job invocation; local executor requires the task
    command invocation.

    :meta private:
    """
    logger = logging.getLogger()
    with suppress(StopIteration):
        h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
        h.ctx_task_deferred = True


def _fetch_logs_from_service(url, log_relative_path):
    # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
    import httpx

    from airflow.utils.jwt_signer import JWTSigner

    timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
    signer = JWTSigner(
        secret_key=conf.get("webserver", "secret_key"),
        expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
        audience="task-instance-logs",
    )
    response = httpx.get(
        url,
        timeout=timeout,
        headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
    )
    response.encoding = "utf-8"
    return response
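
# Illustrative sketch (hypothetical URL and relative path, not part of the upstream
# module): the helper above is typically called with the URL produced by
# FileTaskHandler._get_log_retrieval_url, e.g.
#
#     response = _fetch_logs_from_service(
#         "http://worker-host:8793/log/dag_id/task_id/run_id/attempt=1.log",
#         "dag_id/task_id/run_id/attempt=1.log",
#     )
#     response.raise_for_status()
#
# The relative path is also embedded in the signed JWT, so the serving side can
# verify that the token matches the file being requested.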


_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None)

if not _parse_timestamp:

    def _parse_timestamp(line: str):
        timestamp_str, _ = line.split(" ", 1)
        return pendulum.parse(timestamp_str.strip("[]"))
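
# Illustrative sketch (hypothetical log line, not part of the upstream module): the
# default parser above assumes each line starts with a bracketed ISO-8601 timestamp,
# so a typical worker log line parses like
#
#     _parse_timestamp("[2023-01-01T00:00:00.000+0000] {taskinstance.py:1} INFO - starting")
#     # -> a pendulum DateTime for 2023-01-01 00:00:00 UTC
#
# Lines without a leading timestamp raise inside pendulum.parse and are skipped by the
# suppress(Exception) guard in _parse_timestamps_in_log_file below.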


def _parse_timestamps_in_log_file(lines: Iterable[str]):
    timestamp = None
    next_timestamp = None
    for idx, line in enumerate(lines):
        if line:
            with suppress(Exception):
                # next_timestamp unchanged if line can't be parsed
                next_timestamp = _parse_timestamp(line)
            if next_timestamp:
                timestamp = next_timestamp
        yield timestamp, idx, line


def _interleave_logs(*logs):
    records = []
    for log in logs:
        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
    last = None
    for _, _, v in sorted(
        records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1])
    ):
        if v != last:  # dedupe
            yield v
        last = v
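
# Illustrative sketch (hypothetical log strings, not part of the upstream module):
# given per-source logs whose lines carry leading timestamps, the generator above
# yields the lines merged in timestamp order with consecutive duplicates dropped, e.g.
#
#     worker_log = "[2023-01-01T00:00:01+0000] started\n[2023-01-01T00:00:03+0000] done"
#     trigger_log = "[2023-01-01T00:00:02+0000] trigger fired"
#     merged = "\n".join(_interleave_logs(worker_log, trigger_log))
#     # -> "started", "trigger fired", "done" lines in timestamp order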


def _ensure_ti(ti: TaskInstanceKey | TaskInstance | TaskInstancePydantic, session) -> TaskInstance:
    """Given TI | TIKey, return a TI object.

    Will raise exception if no TI is found in the database.
    """
    from airflow.models.taskinstance import TaskInstance

    if isinstance(ti, TaskInstance):
        return ti
    val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.task_id == ti.task_id,
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
        )
        .one_or_none()
    )
    if not val:
        raise AirflowException(f"Could not find TaskInstance for {ti}")
    val.try_number = ti.try_number
    return val


class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a python log handler that handles and reads task instance logs.

    It creates and delegates log handling to `logging.FileHandler` after receiving task
    instance context. It reads logs from task instance's host machine.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string
    """

    trigger_should_wrap = True
    inherits_from_empty_operator_log_message = (
        "Operator inherits from empty operator and thus does not have logs"
    )

    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        self.handler: logging.Handler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super().__init__().
                stacklevel=(2 if type(self) == FileTaskHandler else 3),
            )
        self.maintain_propagate: bool = False
        """
        If true, overrides default behavior of setting propagate=False.

        :meta private:
        """

        self.ctx_task_deferred = False
        """
        If true, task exited with deferral to trigger.

        Some handlers emit "end of log" markers, and may not wish to do so when task defers.
        """

    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None | SetContextPropagate:
        """
        Provide task_instance context to airflow task handler.

        Generally speaking returns None. But if attr `maintain_propagate` has
        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
        has the effect of overriding the default behavior to set `propagate`
        to False whenever set_context is called. At time of writing, this
        functionality is only used in unit testing.

        :param ti: task instance object
        :param identifier: if set, adds suffix to log file. For use when relaying exceptional messages
            to task logs from a context other than task or trigger run
        """
        local_loc = self._init_file(ti, identifier=identifier)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

    @cached_property
    def supports_task_context_logging(self) -> bool:
        return "identifier" in inspect.signature(self.set_context).parameters

    @staticmethod
    def add_triggerer_suffix(full_path, job_id=None):
        """
        Derive trigger log filename from task log filename.

        E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
        is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish
        the files because, rarely, the same trigger could get picked up by two different
        triggerer instances.
        """
        full_path = Path(full_path).as_posix()
        full_path += f".{LogType.TRIGGER.value}"
        if job_id:
            full_path += f".{job_id}.log"
        return full_path
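
    # Illustrative sketch (hypothetical paths, not part of the upstream module):
    #
    #     FileTaskHandler.add_triggerer_suffix("/path/to/attempt=1.log", job_id=123)
    #     # -> "/path/to/attempt=1.log.trigger.123.log"
    #     FileTaskHandler.add_triggerer_suffix("/path/to/attempt=1.log")
    #     # -> "/path/to/attempt=1.log.trigger"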

    def emit(self, record):
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        if self.handler:
            self.handler.flush()

    def close(self):
        if self.handler:
            self.handler.close()

    @staticmethod
    @internal_api_call
    @provide_session
    def _render_filename_db_access(
        *, ti, try_number: int, session=None
    ) -> tuple[DagRun | DagRunPydantic, TaskInstance | TaskInstancePydantic, str | None, str | None]:
        ti = _ensure_ti(ti, session)
        dag_run = ti.get_dagrun(session=session)
        template = dag_run.get_log_template(session=session).filename
        str_tpl, jinja_tpl = parse_template_string(template)
        filename = None
        if jinja_tpl:
            if getattr(ti, "task", None) is not None:
                context = ti.get_template_context(session=session)
            else:
                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
            context["try_number"] = try_number
            filename = render_template_to_string(jinja_tpl, context)
        return dag_run, ti, str_tpl, filename

    def _render_filename(
        self, ti: TaskInstance | TaskInstanceKey | TaskInstancePydantic, try_number: int
    ) -> str:
        """Return the worker log filename."""
        dag_run, ti, str_tpl, filename = self._render_filename_db_access(ti=ti, try_number=try_number)
        if filename:
            return filename
        if str_tpl:
            if ti.task is not None and ti.task.dag is not None:
                dag = ti.task.dag
                data_interval = dag.get_run_data_interval(dag_run)
            else:
                from airflow.timetables.base import DataInterval

                if TYPE_CHECKING:
                    assert isinstance(dag_run.data_interval_start, DateTime)
                    assert isinstance(dag_run.data_interval_end, DateTime)
                data_interval = DataInterval(dag_run.data_interval_start, dag_run.data_interval_end)
            if data_interval[0]:
                data_interval_start = data_interval[0].isoformat()
            else:
                data_interval_start = ""
            if data_interval[1]:
                data_interval_end = data_interval[1].isoformat()
            else:
                data_interval_end = ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
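
    # Illustrative sketch (hypothetical template, not the configured default): with a
    # plain str.format log_filename_template such as
    # "{dag_id}/{task_id}/{execution_date}/{try_number}.log", the str_tpl branch above
    # would render something like "my_dag/my_task/2023-01-01T00:00:00+00:00/1.log".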

    def _read_grouped_logs(self):
        return False

    @cached_property
    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
        """This cached property avoids loading executor repeatedly."""
        executor = ExecutorLoader.get_default_executor()
        return executor.get_task_log

    def _read(
        self,
        ti: TaskInstance,
        try_number: int,
        metadata: dict[str, Any] | None = None,
    ):
        """
        Template method that contains custom logic of reading logs given the try_number.

        :param ti: task instance record
        :param try_number: current try_number to read log from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
            Following attributes are used:
            log_pos: (absolute) Char position up to which the log was retrieved
                in previous calls; this part will be skipped and only the text
                that follows is returned to be added to the tail.
        :return: log message as a string and metadata.
            Following attributes are used in metadata:
            end_of_log: Boolean, True if end of log is reached or False
                if further calls might get more log text.
                This is determined by the status of the TaskInstance
            log_pos: (absolute) Char position to which the log is retrieved
        """
        # Task instance here might be different from task instance when
        # initializing the handler. Thus explicitly getting log location
        # is needed to get correct log path.
        worker_log_rel_path = self._render_filename(ti, try_number)
        messages_list: list[str] = []
        remote_logs: list[str] = []
        local_logs: list[str] = []
        executor_messages: list[str] = []
        executor_logs: list[str] = []
        served_logs: list[str] = []
        is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
        with suppress(NotImplementedError):
            remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
            messages_list.extend(remote_messages)
        if ti.state == TaskInstanceState.RUNNING:
            response = self._executor_get_task_log(ti, try_number)
            if response:
                executor_messages, executor_logs = response
            if executor_messages:
                messages_list.extend(executor_messages)
        if not (remote_logs and ti.state not in State.unfinished):
            # when finished, if we have remote logs, no need to check local
            worker_log_full_path = Path(self.local_base, worker_log_rel_path)
            local_messages, local_logs = self._read_from_local(worker_log_full_path)
            messages_list.extend(local_messages)
        if is_in_running_or_deferred and not executor_messages and not remote_logs:
            # While the task instance is still running and we have neither executor nor remote logs,
            # look for served logs. This covers cases where users have not set up remote logging or
            # a shared drive for logs.
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)
        elif ti.state not in State.unfinished and not (local_logs or remote_logs):
            # ordinarily we don't check served logs, with the assumption that users set up
            # remote logging or shared drive for logs for persistence, but that's not always true
            # so even if task is done, if no local logs or remote logs are found, we'll check the worker
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)

        logs = "\n".join(
            _interleave_logs(
                *local_logs,
                *remote_logs,
                *(executor_logs or []),
                *served_logs,
            )
        )
        log_pos = len(logs)
        messages = "".join([f"*** {x}\n" for x in messages_list])
        end_of_log = ti.try_number != try_number or not is_in_running_or_deferred
        if metadata and "log_pos" in metadata:
            previous_chars = metadata["log_pos"]
            logs = logs[previous_chars:]  # Cut off previously passed log text as new tail
        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
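
    # Illustrative sketch (assumed handler and task instance objects, not part of the
    # upstream module): callers can tail a running task's log by passing back the
    # metadata returned from the previous call, e.g.
    #
    #     text, meta = handler._read(ti, try_number=1)
    #     ...
    #     new_text, meta = handler._read(ti, try_number=1, metadata=meta)  # only chars after log_pos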

    @staticmethod
    def _get_pod_namespace(ti: TaskInstance):
        pod_override = ti.executor_config.get("pod_override")
        namespace = None
        with suppress(Exception):
            namespace = pod_override.metadata.namespace
        return namespace or conf.get("kubernetes_executor", "namespace")

    def _get_log_retrieval_url(
        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
    ) -> tuple[str, str]:
        """Given TI, generate URL with which to fetch logs from service log server."""
        if log_type == LogType.TRIGGER:
            if not ti.triggerer_job:
                raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
            config_key = "triggerer_log_server_port"
            config_default = 8794
            hostname = ti.triggerer_job.hostname
            log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
        else:
            hostname = ti.hostname
            config_key = "worker_log_server_port"
            config_default = 8793
        return (
            urljoin(
                f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
                log_relative_path,
            ),
            log_relative_path,
        )
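
    # Illustrative sketch (hypothetical hostname and relative path): for a worker-served
    # log the method above returns something like
    #
    #     ("http://worker-host:8793/log/dag_id/task_id/run_id/attempt=1.log",
    #      "dag_id/task_id/run_id/attempt=1.log")
    #
    # the real relative path comes from the configured log filename template.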

    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of given task instance from local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None
            it returns all logs separated by try_number
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        :return: a list of log entries (one per try_number), where each entry is a list of
            (host, log) tuples, plus a list of metadata dicts
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when
        # try number gets incremented in DB, i.e. logs produced the time
        # after cli run and before try_number + 1 in DB will not be displayed.
        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        logs = [""] * len(try_numbers)
        metadata_array = [{}] * len(try_numbers)

        # subclasses implement _read and may not have log_type, which was added recently
        for i, try_number_element in enumerate(try_numbers):
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler returns logs grouped by host. Wrap other handlers returning a log string
            # with a default/empty host so that the UI can render the response in the same way.
            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
            metadata_array[i] = out_metadata

        return logs, metadata_array
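
    # Illustrative sketch (assumed handler and task instance objects, not part of the
    # upstream module): the webserver-facing entry point above returns one entry per try,
    #
    #     logs, metadata = handler.read(ti)                # all recorded tries
    #     logs, metadata = handler.read(ti, try_number=2)  # a single try
    #     # for non-grouped handlers, logs[0] looks like [("worker-host", "...log text...")]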

    @staticmethod
    def _prepare_log_folder(directory: Path, new_folder_permissions: int):
        """
        Prepare the log folder and ensure its mode is as configured.

        To handle log writing when tasks are impersonated, the log files need to
        be writable by the user that runs the Airflow command and the user
        that is impersonated. This is mainly to handle corner cases with the
        SubDagOperator. When the SubDagOperator is run, all of the operators
        run under the impersonated user and create appropriate log files
        as the impersonated user. However, if the user manually runs tasks
        of the SubDagOperator through the UI, then the log files are created
        by the user that runs the Airflow command. For example, the Airflow
        run command may be run by the `airflow_sudoable` user, but the Airflow
        tasks may be run by the `airflow` user. If the log files are not
        writable by both users, then it's possible that re-running a task
        via the UI (or vice versa) results in a permission error as the task
        tries to write to a log file created by the other user.

        We leave it up to the user to manage their permissions by exposing configuration for both
        new folders and new log files. Default is to make new log folders and files group-writeable
        to handle most common impersonation use cases. The requirement in this case will be to make
        sure that the same group is set as default group for both - impersonated user and main airflow
        user.
        """
        for parent in reversed(directory.parents):
            parent.mkdir(mode=new_folder_permissions, exist_ok=True)
        directory.mkdir(mode=new_folder_permissions, exist_ok=True)
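
    # Illustrative sketch (hypothetical path and mode): preparing a nested run folder
    # creates each missing parent with the configured mode, e.g.
    #
    #     FileTaskHandler._prepare_log_folder(Path("/logs/my_dag/run_id"), 0o775)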

    def _init_file(self, ti, *, identifier: str | None = None):
        """
        Create log directory and give it permissions that are configured.

        See above _prepare_log_folder method for more detailed explanation.

        :param ti: task instance object
        :return: full path of the log file for the given task instance
        """
        new_file_permissions = int(
            conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
        )
        local_relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, local_relative_path)
        if identifier:
            full_path += f".{identifier}.log"
        elif ti.is_trigger_log_context is True:
            # if this is true, we're invoked via set_context in the context of
            # setting up individual trigger logging. return trigger log path.
            full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
        new_folder_permissions = int(
            conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
        )
        self._prepare_log_folder(Path(full_path).parent, new_folder_permissions)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            try:
                os.chmod(full_path, new_file_permissions)
            except OSError as e:
                logger.warning("OSError while changing ownership of the log file: %s", e)

        return full_path

    @staticmethod
    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
        messages = []
        paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
        if paths:
            messages.append("Found local files:")
            messages.extend(f"  * {x}" for x in paths)
        logs = [file.read_text() for file in paths]
        return messages, logs
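
    # Illustrative sketch (hypothetical path, not part of the upstream module): the glob
    # above also picks up triggerer-suffixed siblings, so trigger output is read alongside
    # the worker output, e.g.
    #
    #     messages, logs = FileTaskHandler._read_from_local(Path("/logs/my_dag/attempt=1.log"))
    #     # may read both "attempt=1.log" and "attempt=1.log.trigger.123.log"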

    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
        messages = []
        logs = []
        try:
            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
            url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
            response = _fetch_logs_from_service(url, rel_path)
            if response.status_code == 403:
                messages.append(
                    "!!!! Please make sure that all your Airflow components (e.g. "
                    "schedulers, webservers, workers and triggerer) have "
                    "the same 'secret_key' configured in 'webserver' section and "
                    "time is synchronized on all your machines (for example with ntpd)\n"
                    "See more at https://airflow.apache.org/docs/apache-airflow/"
                    "stable/configurations-ref.html#secret-key"
                )
            # Check if the resource was properly fetched
            response.raise_for_status()
            if response.text:
                messages.append(f"Found logs served from host {url}")
                logs.append(response.text)
        except Exception as e:
            from httpx import UnsupportedProtocol

            if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
                messages.append(self.inherits_from_empty_operator_log_message)
            else:
                messages.append(f"Could not read served logs: {e}")
                logger.exception("Could not read served logs")
        return messages, logs

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        """
        Implement in subclasses to read from the remote service.

        This method should return two lists, messages and logs.

        * Each element in the messages list should be a single message,
          such as, "reading from x file".
        * Each element in the logs list should be the content of one file.
        """
        raise NotImplementedError