Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/file_task_handler.py: 25%
255 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""File logging handler for tasks."""
19from __future__ import annotations
21import logging
22import os
23import warnings
24from contextlib import suppress
25from enum import Enum
26from functools import cached_property
27from pathlib import Path
28from typing import TYPE_CHECKING, Any, Callable, Iterable
29from urllib.parse import urljoin
31import pendulum
33from airflow.configuration import conf
34from airflow.exceptions import RemovedInAirflow3Warning
35from airflow.executors.executor_loader import ExecutorLoader
36from airflow.utils.context import Context
37from airflow.utils.helpers import parse_template_string, render_template_to_string
38from airflow.utils.log.logging_mixin import SetContextPropagate
39from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
40from airflow.utils.session import create_session
41from airflow.utils.state import State, TaskInstanceState
43if TYPE_CHECKING:
44 from airflow.models import TaskInstance
46logger = logging.getLogger(__name__)
class LogType(str, Enum):
    """
    Kind of service a task log is fetched from.

    Members are plain strings as well as enum members, so they compare equal
    to their literal values.

    :meta private:
    """

    # Logs written by a triggerer while the task is deferred.
    TRIGGER = "trigger"
    # Logs written by the worker that runs the task itself.
    WORKER = "worker"
60def _set_task_deferred_context_var():
61 """
62 Tell task log handler that task exited with deferral.
64 This exists for the sole purpose of telling elasticsearch handler not to
65 emit end_of_log mark after task deferral.
67 Depending on how the task is run, we may need to set this in task command or in local task job.
68 Kubernetes executor requires the local task job invocation; local executor requires the task
69 command invocation.
71 :meta private:
72 """
73 logger = logging.getLogger()
74 with suppress(StopIteration):
75 h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
76 h.ctx_task_deferred = True
def _fetch_logs_from_service(url, log_relative_path):
    """
    Issue an authenticated HTTP GET for a log file served by another component.

    The request carries an ``Authorization`` header holding a token signed with the
    webserver ``secret_key``; the token payload names the requested file.

    :param url: URL to fetch the log from
    :param log_relative_path: relative log path placed in the token's ``filename`` field
    :return: the ``httpx`` response, with its encoding forced to utf-8
    """
    # Imported lazily, exactly as the original does.
    import httpx

    from airflow.utils.jwt_signer import JWTSigner

    request_timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
    token_signer = JWTSigner(
        secret_key=conf.get("webserver", "secret_key"),
        expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
        audience="task-instance-logs",
    )
    auth_token = token_signer.generate_signed_token({"filename": log_relative_path})
    response = httpx.get(url, timeout=request_timeout, headers={"Authorization": auth_token})
    response.encoding = "utf-8"
    return response
# A custom line-timestamp parser can be supplied through the
# [logging] interleave_timestamp_parser option; when none is configured the
# default below is used.
_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None)

if not _parse_timestamp:

    def _parse_timestamp(line: str):
        """Parse the first space-delimited token of ``line``, minus surrounding brackets."""
        # Unpacking raises ValueError when the line contains no space at all.
        leading_token, _rest = line.split(" ", 1)
        bare_token = leading_token.strip("[]")
        return pendulum.parse(bare_token)
def _parse_timestamps_in_log_file(lines: Iterable[str]):
    """
    Pair every non-empty line with a timestamp and its position in ``lines``.

    A line whose timestamp cannot be parsed is yielded with the most recent
    timestamp that was parsed successfully (``None`` if there has been none yet).

    :param lines: lines of one log file
    :return: generator of ``(timestamp, index, line)`` tuples; empty lines are skipped
        but still count towards ``index``
    """
    current = None
    candidate = None
    for position, text in enumerate(lines):
        if not text:
            continue
        try:
            candidate = _parse_timestamp(text)
        except Exception:
            # candidate keeps its previous value if the line can't be parsed
            pass
        if candidate:
            current = candidate
        yield current, position, text
def _interleave_logs(*logs):
    """
    Merge the lines of several log texts into one timestamp-ordered stream.

    Lines are ordered by ``(timestamp, index within their own log)``. Lines that have
    no timestamp are ordered as if stamped 2000-01-01. A line equal to the one
    yielded immediately before it is dropped.

    :param logs: full text of each log to merge
    :return: generator of log lines
    """
    parsed_records = [
        record for log in logs for record in _parse_timestamps_in_log_file(log.splitlines())
    ]

    def _sort_key(record):
        timestamp, position, _line = record
        if timestamp:
            return (timestamp, position)
        return (pendulum.datetime(2000, 1, 1), position)

    previous = None
    for _timestamp, _position, line in sorted(parsed_records, key=_sort_key):
        if line == previous:  # dedupe
            continue
        yield line
        previous = line
class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a python log handler that handles and reads
    task instance logs. It creates and delegates log handling
    to `logging.FileHandler` after receiving task instance context.
    It reads logs from task instance's host machine.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string (deprecated, has no effect)
    """

    trigger_should_wrap = True

    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        # The delegate file handler only exists once set_context() has been called.
        self.handler: logging.FileHandler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super()__init__.
                stacklevel=(2 if type(self) is FileTaskHandler else 3),
            )
        self.maintain_propagate: bool = False
        """
        If true, overrides default behavior of setting propagate=False

        :meta private:
        """

        self.ctx_task_deferred = False
        """
        If true, task exited with deferral to trigger.

        Some handlers emit "end of log" markers, and may not wish to do so when task defers.
        """

    def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
        """
        Provide task_instance context to airflow task handler.

        Generally speaking returns None.  But if attr `maintain_propagate` has
        been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
        has the effect of overriding the default behavior to set `propagate`
        to False whenever set_context is called.  At time of writing, this
        functionality is only used in unit testing.

        :param ti: task instance object
        """
        local_loc = self._init_file(ti)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None

    @staticmethod
    def add_triggerer_suffix(full_path, job_id=None):
        """
        Helper for deriving trigger log filename from task log filename.

        E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
        is the triggerer id.  We use the triggerer ID instead of trigger ID to distinguish
        the files because, rarely, the same trigger could get picked up by two different
        triggerer instances.

        :param full_path: task log path
        :param job_id: triggerer job id; when falsy only the ``.trigger`` suffix is appended
        :return: the derived path as a posix-style string
        """
        full_path = Path(full_path).as_posix()
        full_path += f".{LogType.TRIGGER.value}"
        if job_id:
            full_path += f".{job_id}.log"
        return full_path

    def emit(self, record):
        """Forward the record to the delegate file handler; a no-op before set_context."""
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        """Flush the delegate file handler, if one has been created."""
        if self.handler:
            self.handler.flush()

    def close(self):
        """Close the delegate file handler, if one has been created."""
        if self.handler:
            self.handler.close()

    def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
        """
        Returns the worker log filename.

        The filename template is taken from the log template attached to the task
        instance's dag run. It is rendered with jinja when it parses as a jinja
        template, otherwise with ``str.format``.

        :param ti: task instance the log belongs to
        :param try_number: try number to render into the filename
        :raises RuntimeError: if the template parses as neither kind
        """
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            template = dag_run.get_log_template(session=session).filename
            str_tpl, jinja_tpl = parse_template_string(template)

            if jinja_tpl:
                if hasattr(ti, "task"):
                    context = ti.get_template_context(session=session)
                else:
                    context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
                context["try_number"] = try_number
                return render_template_to_string(jinja_tpl, context)

        if str_tpl:
            try:
                dag = ti.task.dag
            except AttributeError:  # ti.task is not always set.
                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
            else:
                if TYPE_CHECKING:
                    assert dag is not None
                data_interval = dag.get_run_data_interval(dag_run)
            # Either end of the interval may be falsy; render it as an empty string then.
            data_interval_start = data_interval[0].isoformat() if data_interval[0] else ""
            data_interval_end = data_interval[1].isoformat() if data_interval[1] else ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")

    def _read_grouped_logs(self):
        """
        Tell ``read`` whether ``_read`` already returns logs grouped by host.

        This handler returns a plain string, so ``read`` wraps it itself.
        """
        return False

    @cached_property
    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
        """This cached property avoids loading executor repeatedly."""
        executor = ExecutorLoader.get_default_executor()
        return executor.get_task_log

    def _read(
        self,
        ti: TaskInstance,
        try_number: int,
        metadata: dict[str, Any] | None = None,
    ):
        """
        Template method that contains custom logic of reading
        logs given the try_number.

        :param ti: task instance record
        :param try_number: current try_number to read log from
        :param metadata: log metadata,
                         can be used for streaming log reading and auto-tailing.
                         Following attributes are used:
                         log_pos: (absolute) Char position to which the log
                                  which was retrieved in previous calls, this
                                  part will be skipped and only following text
                                  returned to be added to tail.
        :return: log message as a string and metadata.
                 Following attributes are used in metadata:
                 end_of_log: Boolean, True if end of log is reached or False
                             if further calls might get more log text.
                             This is determined by the status of the TaskInstance
                 log_pos: (absolute) Char position to which the log is retrieved
        """
        # Task instance here might be different from task instance when
        # initializing the handler. Thus explicitly getting log location
        # is needed to get correct log path.
        worker_log_rel_path = self._render_filename(ti, try_number)
        messages_list: list[str] = []
        remote_logs: list[str] = []
        local_logs: list[str] = []
        executor_messages: list[str] = []
        executor_logs: list[str] = []
        served_logs: list[str] = []
        # The base class does not implement remote reading; subclasses may.
        with suppress(NotImplementedError):
            remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
            messages_list.extend(remote_messages)
        if ti.state == TaskInstanceState.RUNNING:
            response = self._executor_get_task_log(ti, try_number)
            if response:
                executor_messages, executor_logs = response
            if executor_messages:
                messages_list.extend(executor_messages)
        if not (remote_logs and ti.state not in State.unfinished):
            # when finished, if we have remote logs, no need to check local
            worker_log_full_path = Path(self.local_base, worker_log_rel_path)
            local_messages, local_logs = self._read_from_local(worker_log_full_path)
            messages_list.extend(local_messages)
        if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not executor_messages:
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)
        elif ti.state not in State.unfinished and not (local_logs or remote_logs):
            # ordinarily we don't check served logs, with the assumption that users set up
            # remote logging or shared drive for logs for persistence, but that's not always true
            # so even if task is done, if no local logs or remote logs are found, we'll check the worker
            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
            messages_list.extend(served_messages)

        logs = "\n".join(
            _interleave_logs(
                *local_logs,
                *remote_logs,
                *(executor_logs or []),
                *served_logs,
            )
        )
        # log_pos is measured on the full interleaved text, before any tail is cut off.
        log_pos = len(logs)
        messages = "".join([f"*** {x}\n" for x in messages_list])
        end_of_log = ti.try_number != try_number or ti.state not in [State.RUNNING, State.DEFERRED]
        if metadata and "log_pos" in metadata:
            previous_chars = metadata["log_pos"]
            logs = logs[previous_chars:]  # Cut off previously passed log text as new tail
        # The "***" source messages are only prepended on a first read (no log_pos yet).
        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}

    @staticmethod
    def _get_pod_namespace(ti: TaskInstance):
        """
        Return the namespace from the task's ``pod_override`` executor config.

        Falls back to the ``[kubernetes_executor] namespace`` option (default ``"default"``)
        when there is no override or it carries no namespace.
        """
        pod_override = ti.executor_config.get("pod_override")
        namespace = None
        # pod_override may be None or lack metadata; any failure means "no override".
        with suppress(Exception):
            namespace = pod_override.metadata.namespace
        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")

    def _get_log_retrieval_url(
        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
    ) -> tuple[str, str]:
        """
        Given TI, generate URL with which to fetch logs from service log server.

        :param ti: task instance whose logs are wanted
        :param log_relative_path: worker log path relative to the base log folder
        :param log_type: ``LogType.TRIGGER`` to target the triggerer; anything else
            targets the worker
        :return: tuple of (url, relative path actually requested)
        :raises RuntimeError: if trigger logs are requested but the TI has no triggerer job
        """
        if log_type == LogType.TRIGGER:
            if not ti.triggerer_job:
                raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
            config_key = "triggerer_log_server_port"
            config_default = 8794
            hostname = ti.triggerer_job.hostname
            log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
        else:
            hostname = ti.hostname
            config_key = "worker_log_server_port"
            config_default = 8793
        return (
            urljoin(
                f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
                log_relative_path,
            ),
            log_relative_path,
        )

    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of given task instance from local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None
                           it returns all logs separated by try_number
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        :return: tuple of (logs, metadata_array) with one entry per try number read;
            unless the handler groups logs itself, each logs entry is a one-element
            list of ``(hostname, log)``
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when
        # try number gets incremented in DB, i.e logs produced the time
        # after cli run and before try_number + 1 in DB will not be displayed.
        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        # Built by appending rather than pre-filling with ``[{}] * n``, which would
        # make every slot refer to the same dict object.
        logs = []
        metadata_array = []

        # subclasses implement _read and may not have log_type, which was added recently
        for try_number_element in try_numbers:
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler return logs grouped by host. wrap other handler returning log string
            # with default/ empty host so that UI can render the response in the same way
            logs.append(log if self._read_grouped_logs() else [(task_instance.hostname, log)])
            metadata_array.append(out_metadata)

        return logs, metadata_array

    def _prepare_log_folder(self, directory: Path):
        """
        Prepare the log folder and ensure its mode is as configured.

        To handle log writing when tasks are impersonated, the log files need to
        be writable by the user that runs the Airflow command and the user
        that is impersonated. This is mainly to handle corner cases with the
        SubDagOperator. When the SubDagOperator is run, all of the operators
        run under the impersonated user and create appropriate log files
        as the impersonated user. However, if the user manually runs tasks
        of the SubDagOperator through the UI, then the log files are created
        by the user that runs the Airflow command. For example, the Airflow
        run command may be run by the `airflow_sudoable` user, but the Airflow
        tasks may be run by the `airflow` user. If the log files are not
        writable by both users, then it's possible that re-running a task
        via the UI (or vice versa) results in a permission error as the task
        tries to write to a log file created by the other user.

        We leave it up to the user to manage their permissions by exposing configuration for both
        new folders and new log files. Default is to make new log folders and files group-writeable
        to handle most common impersonation use cases. The requirement in this case will be to make
        sure that the same group is set as default group for both - impersonated user and main airflow
        user.

        :param directory: folder to create (with parents) if it does not exist yet
        """
        # The option is an octal string such as "0o775".
        new_folder_permissions = int(
            conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
        )
        directory.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True)
        # Compare only the low nine permission bits (st_mode also carries the file type).
        if directory.stat().st_mode % 0o1000 != new_folder_permissions % 0o1000:
            print(f"Changing {directory} permission to {new_folder_permissions}")
            try:
                directory.chmod(new_folder_permissions)
            except PermissionError as e:
                # In some circumstances (depends on user and filesystem) we might not be able to
                # change the permission for the folder (when the folder was created by another user
                # before or when the filesystem does not allow to change permission). We should not
                # fail in this case but rather ignore it.
                print(f"Failed to change {directory} permission to {new_folder_permissions}: {e}")

    def _init_file(self, ti):
        """
        Create log directory and give it permissions that are configured. See above _prepare_log_folder
        method for more detailed explanation.

        The log file itself is created when it does not exist yet and is then given the
        configured file permissions; a failure to set them is logged, not raised.

        :param ti: task instance object
        :return: full path of the log file for the given task instance
        """
        new_file_permissions = int(
            conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
        )
        local_relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, local_relative_path)
        if ti.is_trigger_log_context is True:
            # if this is true, we're invoked via set_context in the context of
            # setting up individual trigger logging. return trigger log path.
            full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
        self._prepare_log_folder(Path(full_path).parent)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            try:
                os.chmod(full_path, new_file_permissions)
            except OSError as e:
                # The "%s" placeholder is required: passing ``e`` as an argument without
                # one makes the logging module fail to format the record.
                logging.warning("OSError while changing ownership of the log file. %s", e)

        return full_path

    @staticmethod
    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
        """
        Read every local file whose name starts with the worker log's file name.

        :param worker_log_path: full path of the worker log file
        :return: tuple of (messages, logs); logs holds one file's content per element,
            in sorted path order
        """
        files = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
        messages = ["Found local files:", *[f" * {x}" for x in files]] if files else []
        logs = [Path(file).read_text() for file in files]
        return messages, logs

    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
        """
        Fetch logs over HTTP from the triggerer (if the TI has a triggerer job) or the worker.

        Any failure is turned into a message and logged rather than raised.

        :param ti: task instance whose logs are wanted
        :param worker_log_rel_path: worker log path relative to the base log folder
        :return: tuple of (messages, logs)
        """
        messages = []
        logs = []
        try:
            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
            url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
            response = _fetch_logs_from_service(url, rel_path)
            if response.status_code == 403:
                messages.append(
                    "!!!! Please make sure that all your Airflow components (e.g. "
                    "schedulers, webservers, workers and triggerer) have "
                    "the same 'secret_key' configured in 'webserver' section and "
                    "time is synchronized on all your machines (for example with ntpd)\n"
                    "See more at https://airflow.apache.org/docs/apache-airflow/"
                    "stable/configurations-ref.html#secret-key"
                )
            # Check if the resource was properly fetched
            response.raise_for_status()
            if response.text:
                messages.append(f"Found logs served from host {url}")
                logs.append(response.text)
        except Exception as e:
            messages.append(f"Could not read served logs: {str(e)}")
            logger.exception("Could not read served logs")
        return messages, logs

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        """
        Implement in subclasses to read from the remote service.

        This method should return two lists, messages and logs.

        * Each element in the messages list should be a single message,
          such as, "reading from x file".
        * Each element in the logs list should be the content of one file.
        """
        raise NotImplementedError