Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/log/file_task_handler.py: 22%
169 statements
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
18"""File logging handler for tasks."""
19from __future__ import annotations
21import logging
22import os
23import warnings
24from pathlib import Path
25from typing import TYPE_CHECKING, Any
26from urllib.parse import urljoin
28from airflow.configuration import AirflowConfigException, conf
29from airflow.exceptions import RemovedInAirflow3Warning
30from airflow.utils.context import Context
31from airflow.utils.helpers import parse_template_string, render_template_to_string
32from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
33from airflow.utils.session import create_session
34from airflow.utils.state import State
36if TYPE_CHECKING:
37 from airflow.models import TaskInstance
38 from airflow.utils.log.logging_mixin import SetContextPropagate
class FileTaskHandler(logging.Handler):
    """
    FileTaskHandler is a Python log handler that handles and reads task instance logs.
    It creates and delegates log handling to ``logging.FileHandler`` after receiving
    task instance context. It reads logs from the task instance's host machine.

    :param base_log_folder: base folder in which to place the log files
    :param filename_template: template filename string
    """
    def __init__(self, base_log_folder: str, filename_template: str | None = None):
        super().__init__()
        self.handler: logging.FileHandler | None = None
        self.local_base = base_log_folder
        if filename_template is not None:
            warnings.warn(
                "Passing filename_template to a log handler is deprecated and has no effect",
                RemovedInAirflow3Warning,
                # We want to reference the stack that actually instantiates the
                # handler, not the one that calls super().__init__().
                stacklevel=(2 if type(self) == FileTaskHandler else 3),
            )
    def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
        """
        Provide task_instance context to the Airflow task handler.

        :param ti: task instance object
        """
        local_loc = self._init_file(ti)
        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
        if self.formatter:
            self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        return None
    def emit(self, record):
        if self.handler:
            self.handler.emit(record)

    def flush(self):
        if self.handler:
            self.handler.flush()

    def close(self):
        if self.handler:
            self.handler.close()
    def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            template = dag_run.get_log_template(session=session).filename
        str_tpl, jinja_tpl = parse_template_string(template)

        if jinja_tpl:
            if hasattr(ti, "task"):
                context = ti.get_template_context()
            else:
                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
            context["try_number"] = try_number
            return render_template_to_string(jinja_tpl, context)
        elif str_tpl:
            try:
                dag = ti.task.dag
            except AttributeError:  # ti.task is not always set.
                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
            else:
                if TYPE_CHECKING:
                    assert dag is not None
                data_interval = dag.get_run_data_interval(dag_run)
            if data_interval[0]:
                data_interval_start = data_interval[0].isoformat()
            else:
                data_interval_start = ""
            if data_interval[1]:
                data_interval_end = data_interval[1].isoformat()
            else:
                data_interval_end = ""
            return str_tpl.format(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                run_id=ti.run_id,
                data_interval_start=data_interval_start,
                data_interval_end=data_interval_end,
                execution_date=ti.get_dagrun().logical_date.isoformat(),
                try_number=try_number,
            )
        else:
            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
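    # Example (illustrative values, not the authoritative default): with a Jinja
    # log_filename_template along the lines of
    # "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/attempt={{ try_number }}.log",
    # _render_filename would produce something like
    # "dag_id=example_dag/run_id=manual__2022-12-25T00:00:00+00:00/task_id=print_date/attempt=1.log".
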
    def _read_grouped_logs(self):
        return False
    @staticmethod
    def _should_check_k8s(queue):
        """
        Return True if the task is running through the Kubernetes executor.

        When logs aren't available locally, we read them from the Kubernetes pod logs in that case.
        """
        executor = conf.get("core", "executor")
        if executor == "KubernetesExecutor":
            return True
        elif executor == "LocalKubernetesExecutor":
            if queue == conf.get("local_kubernetes_executor", "kubernetes_queue"):
                return True
        elif executor == "CeleryKubernetesExecutor":
            if queue == conf.get("celery_kubernetes_executor", "kubernetes_queue"):
                return True
        return False
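    # For example (illustrative config values): with executor = CeleryKubernetesExecutor
    # and [celery_kubernetes_executor] kubernetes_queue = "kubernetes", a task instance
    # queued on "kubernetes" returns True here, while one on the default Celery queue
    # returns False and its logs are fetched over HTTP from the worker instead.
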
    def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
        """
        Template method that contains the custom logic of reading logs given the try_number.

        :param ti: task instance record
        :param try_number: current try_number to read log from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
            The following attributes are used:
            log_pos: (absolute) char position up to which the log was retrieved
                in previous calls; this part will be skipped and only the
                following text is returned to be added to the tail.

        :return: log message as a string and metadata.
            The following attributes are used in metadata:
            end_of_log: Boolean, True if the end of the log is reached, or False
                if further calls might get more log text.
                This is determined by the status of the TaskInstance.
            log_pos: (absolute) char position up to which the log is retrieved
        """
        from airflow.utils.jwt_signer import JWTSigner

        # Task instance here might be different from task instance when
        # initializing the handler. Thus explicitly getting log location
        # is needed to get correct log path.
        log_relative_path = self._render_filename(ti, try_number)
        location = os.path.join(self.local_base, log_relative_path)

        log = ""
        if os.path.exists(location):
            try:
                with open(location, encoding="utf-8", errors="surrogateescape") as file:
                    log += f"*** Reading local file: {location}\n"
                    log += "".join(file.readlines())
            except Exception as e:
                log = f"*** Failed to load local log file: {location}\n"
                log += f"*** {str(e)}\n"
                return log, {"end_of_log": True}
        elif self._should_check_k8s(ti.queue):
            pod_override = ti.executor_config.get("pod_override")
            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
                namespace = pod_override.metadata.namespace
            else:
                namespace = conf.get("kubernetes_executor", "namespace")
            try:
                from airflow.kubernetes.kube_client import get_kube_client

                kube_client = get_kube_client()

                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
                res = kube_client.read_namespaced_pod_log(
                    name=ti.hostname,
                    namespace=namespace,
                    container="base",
                    follow=False,
                    tail_lines=100,
                    _preload_content=False,
                )

                for line in res:
                    log += line.decode()

            except Exception as f:
                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
                return log, {"end_of_log": True}
        else:
            import httpx

            url = self._get_log_retrieval_url(ti, log_relative_path)
            log += f"*** Log file does not exist: {location}\n"
            log += f"*** Fetching from: {url}\n"
            try:
                timeout = None  # No timeout
                try:
                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
                except (AirflowConfigException, ValueError):
                    pass

                signer = JWTSigner(
                    secret_key=conf.get("webserver", "secret_key"),
                    expiration_time_in_seconds=conf.getint(
                        "webserver", "log_request_clock_grace", fallback=30
                    ),
                    audience="task-instance-logs",
                )
                response = httpx.get(
                    url,
                    timeout=timeout,
                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
                )
                response.encoding = "utf-8"

                if response.status_code == 403:
                    log += (
                        "*** !!!! Please make sure that all your Airflow components (e.g. "
                        "schedulers, webservers and workers) have "
                        "the same 'secret_key' configured in 'webserver' section and "
                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
                    )
                    log += (
                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
                        "stable/configurations-ref.html#secret-key\n***"
                    )
                # Check if the resource was properly fetched
                response.raise_for_status()

                log += "\n" + response.text
            except Exception as e:
                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
                return log, {"end_of_log": True}

        # Process tailing if the log is not at its end
        end_of_log = ti.try_number != try_number or ti.state not in State.running
        log_pos = len(log)
        if metadata and "log_pos" in metadata:
            previous_chars = metadata["log_pos"]
            log = log[previous_chars:]  # Cut off previously passed log text as the new tail

        return log, {"end_of_log": end_of_log, "log_pos": log_pos}
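    # Tailing sketch (hypothetical caller, not part of this class): repeated calls
    # can pass the returned metadata back in so that only new text is returned.
    #
    #     chunk, meta = handler._read(ti, try_number=1)                 # full log so far
    #     more, meta = handler._read(ti, try_number=1, metadata=meta)   # only text after meta["log_pos"]
    #     # ...repeat until meta["end_of_log"] is True
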
    @staticmethod
    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
        url = urljoin(
            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
            log_relative_path,
        )
        return url
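    # Example (illustrative hostname, port and path): for worker host "worker-1",
    # WORKER_LOG_SERVER_PORT = 8793 and relative path
    # "dag_id=example/run_id=manual__2022-12-25T00:00:00+00:00/task_id=t1/attempt=1.log",
    # this returns
    # "http://worker-1:8793/log/dag_id=example/run_id=manual__2022-12-25T00:00:00+00:00/task_id=t1/attempt=1.log".
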
    def read(self, task_instance, try_number=None, metadata=None):
        """
        Read logs of given task instance from the local machine.

        :param task_instance: task instance object
        :param try_number: task instance try_number to read logs from. If None,
            it returns all logs separated by try_number
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
        :return: a list of lists of (hostname, log) tuples, one list per try_number,
            plus a matching list of metadata dicts
        """
        # Task instance increments its try number when it starts to run.
        # So the log for a particular task try will only show up when the
        # try number gets incremented in the DB, i.e. logs produced after the CLI
        # run starts but before try_number + 1 is written to the DB will not be displayed.

        if try_number is None:
            next_try = task_instance.next_try_number
            try_numbers = list(range(1, next_try))
        elif try_number < 1:
            logs = [
                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
            ]
            return logs, [{"end_of_log": True}]
        else:
            try_numbers = [try_number]

        logs = [""] * len(try_numbers)
        metadata_array = [{}] * len(try_numbers)
        for i, try_number_element in enumerate(try_numbers):
            log, out_metadata = self._read(task_instance, try_number_element, metadata)
            # es_task_handler returns logs grouped by host; wrap other handlers' log strings
            # with a default/empty host so that the UI can render the response in the same way.
            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
            metadata_array[i] = out_metadata

        return logs, metadata_array
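    # Return-shape sketch (illustrative values): for try_numbers [1, 2] a
    # non-grouping handler returns something like
    #
    #     logs = [[("worker-1", "<log text for try 1>")], [("worker-1", "<log text for try 2>")]]
    #     metadata_array = [{"end_of_log": True, "log_pos": 1234}, {"end_of_log": False, "log_pos": 56}]
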
    def _init_file(self, ti):
        """
        Create log directory and give it correct permissions.

        :param ti: task instance object
        :return: full path of the log file for the given task instance
        """
        # To handle log writing when tasks are impersonated, the log files need to
        # be writable by the user that runs the Airflow command and the user
        # that is impersonated. This is mainly to handle corner cases with the
        # SubDagOperator. When the SubDagOperator is run, all of the operators
        # run under the impersonated user and create appropriate log files
        # as the impersonated user. However, if the user manually runs tasks
        # of the SubDagOperator through the UI, then the log files are created
        # by the user that runs the Airflow command. For example, the Airflow
        # run command may be run by the `airflow_sudoable` user, but the Airflow
        # tasks may be run by the `airflow` user. If the log files are not
        # writable by both users, then it's possible that re-running a task
        # via the UI (or vice versa) results in a permission error as the task
        # tries to write to a log file created by the other user.
        relative_path = self._render_filename(ti, ti.try_number)
        full_path = os.path.join(self.local_base, relative_path)
        directory = os.path.dirname(full_path)
        # Create the log file and give it group writable permissions
        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
        # operator is not compatible with impersonation (e.g. if a Celery executor is used
        # for a SubDag operator and the SubDag operator has a different owner than the
        # parent DAG)
        Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)

        if not os.path.exists(full_path):
            open(full_path, "a").close()
            # TODO: Investigate using 444 instead of 666.
            try:
                os.chmod(full_path, 0o666)
            except OSError:
                logging.warning("OSError while changing permissions of the log file")

        return full_path
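

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): in a real deployment this handler is
    # wired to the "airflow.task" logger via the logging config and receives its
    # TaskInstance through set_context(); the base folder and format string below
    # are assumed example values.
    demo_handler = FileTaskHandler(base_log_folder="/tmp/airflow/logs")
    demo_handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s"))
    logging.getLogger("airflow.task").addHandler(demo_handler)
    # set_context(ti) would be called next with a live TaskInstance, after which
    # emitted records are written to the per-try log file and read() can fetch
    # them back per try_number.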