1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""File logging handler for tasks."""
19
20from __future__ import annotations
21
22import heapq
23import io
24import logging
25import os
26from collections.abc import Callable, Generator, Iterator
27from contextlib import suppress
28from datetime import datetime
29from enum import Enum
30from itertools import chain, islice
31from pathlib import Path
32from types import GeneratorType
33from typing import IO, TYPE_CHECKING, TypedDict, cast
34from urllib.parse import urljoin
35
36import pendulum
37from pydantic import BaseModel, ConfigDict, ValidationError
38from typing_extensions import NotRequired
39
40from airflow.configuration import conf
41from airflow.executors.executor_loader import ExecutorLoader
42from airflow.utils.helpers import parse_template_string, render_template
43from airflow.utils.log.log_stream_accumulator import LogStreamAccumulator
44from airflow.utils.log.logging_mixin import SetContextPropagate
45from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
46from airflow.utils.session import NEW_SESSION, provide_session
47from airflow.utils.state import State, TaskInstanceState
48
49if TYPE_CHECKING:
50 from typing import TypeAlias
51
52 from requests import Response
53
54 from airflow.executors.base_executor import BaseExecutor
55 from airflow.models.taskinstance import TaskInstance
56 from airflow.models.taskinstancehistory import TaskInstanceHistory
57
58CHUNK_SIZE = 1024 * 1024 * 5 # 5MB
59DEFAULT_SORT_DATETIME = pendulum.datetime(2000, 1, 1)
60DEFAULT_SORT_TIMESTAMP = int(DEFAULT_SORT_DATETIME.timestamp() * 1000)
61SORT_KEY_OFFSET = 10000000
62"""An offset used by the _create_sort_key utility.
63
64Assuming 50 characters per line, an offset of 10,000,000 can represent approximately 500 MB of file data, which is sufficient for use as a constant.
65"""
66HEAP_DUMP_SIZE = 5000
67HALF_HEAP_DUMP_SIZE = HEAP_DUMP_SIZE // 2
68
69# These types are similar, but have distinct names to make processing them less error prone
70LogMessages: TypeAlias = list[str]
71"""The legacy format of log messages before 3.0.4"""
72LogSourceInfo: TypeAlias = list[str]
73"""Information _about_ the log fetching process for display to a user"""
74RawLogStream: TypeAlias = Generator[str, None, None]
75"""Raw log stream, containing unparsed log lines."""
76LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
77"""Legacy log response, containing source information and log messages."""
78StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
79"""Streaming log response, containing source information, stream of log lines."""
80StructuredLogStream: TypeAlias = Generator["StructuredLogMessage", None, None]
81"""Structured log stream, containing structured log messages."""
82LogHandlerOutputStream: TypeAlias = (
83 StructuredLogStream | Iterator["StructuredLogMessage"] | chain["StructuredLogMessage"]
84)
85"""Output stream, containing structured log messages or a chain of them."""
86ParsedLog: TypeAlias = tuple[datetime | None, int, "StructuredLogMessage"]
87"""Parsed log record, containing timestamp, line_num and the structured log message."""
88ParsedLogStream: TypeAlias = Generator[ParsedLog, None, None]
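"""Generator of parsed log records, each a (timestamp, line_num, StructuredLogMessage) tuple."""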
89LegacyProvidersLogType: TypeAlias = list["StructuredLogMessage"] | str | list[str]
90"""Return type used by legacy `_read` methods for Alibaba Cloud, Elasticsearch, OpenSearch, and Redis log handlers.
91
- For Elasticsearch and OpenSearch: returns a list of structured log messages.
93- For Alibaba Cloud: returns a string.
94- For Redis: returns a list of strings.
95"""
96
97
98logger = logging.getLogger(__name__)
99
100
101class LogMetadata(TypedDict):
102 """Metadata about the log fetching process, including `end_of_log` and `log_pos`."""
103
104 end_of_log: bool
105 log_pos: NotRequired[int]
106 # the following attributes are used for Elasticsearch and OpenSearch log handlers
107 offset: NotRequired[str | int]
108 # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
109 # on the client. Sending as a string prevents this issue.
110 # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
111 last_log_timestamp: NotRequired[str]
112 max_offset: NotRequired[str]
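
    # Illustrative value (hypothetical numbers): a follow-up read of a still-running task might pass back
    # {"end_of_log": False, "log_pos": 128} so that the first 128 already-returned lines are skipped.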
113
114
115class StructuredLogMessage(BaseModel):
116 """An individual log message."""
117
118 timestamp: datetime | None = None
119 event: str
120
    # Collisions of sort_key may occur due to duplicated messages. If this happens, the heap falls back to
    # comparing the second tuple element, the StructuredLogMessage itself, so we need to define a comparator for it.
123 def __lt__(self, other: StructuredLogMessage) -> bool:
124 return self.sort_key < other.sort_key
125
126 @property
127 def sort_key(self) -> datetime:
128 return self.timestamp or DEFAULT_SORT_DATETIME
129
    # We don't need to cache strings when parsing into this, as almost every line will have different
    # values; `extra="allow"` means we'll create extra properties as needed. Only timestamp and event are
    # required; everything else is up to whatever is producing the logs.
133 model_config = ConfigDict(cache_strings=False, extra="allow")
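
    # Illustrative parse (hypothetical payload): with extra="allow", a JSON line such as
    # '{"timestamp": "2024-01-01T00:00:00Z", "event": "Task started", "level": "info"}'
    # validates via StructuredLogMessage.model_validate_json() and keeps the unknown "level"
    # field as an extra attribute.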
134
135
136class LogType(str, Enum):
137 """
138 Type of service from which we retrieve logs.
139
140 :meta private:
141 """
142
143 TRIGGER = "trigger"
144 WORKER = "worker"
145
146
147def _set_task_deferred_context_var():
148 """
149 Tell task log handler that task exited with deferral.
150
151 This exists for the sole purpose of telling elasticsearch handler not to
152 emit end_of_log mark after task deferral.
153
154 Depending on how the task is run, we may need to set this in task command or in local task job.
155 Kubernetes executor requires the local task job invocation; local executor requires the task
156 command invocation.
157
158 :meta private:
159 """
160 logger = logging.getLogger()
161 with suppress(StopIteration):
162 h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
163 h.ctx_task_deferred = True
164
165
166def _fetch_logs_from_service(url: str, log_relative_path: str) -> Response:
167 # Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
168 import requests
169
170 from airflow.api_fastapi.auth.tokens import JWTGenerator, get_signing_key
171
172 timeout = conf.getint("api", "log_fetch_timeout_sec", fallback=None)
173 generator = JWTGenerator(
174 secret_key=get_signing_key("api", "secret_key"),
175 # Since we are using a secret key, we need to be explicit about the algorithm here too
176 algorithm="HS512",
177 # We must set an empty private key here as otherwise it can be automatically loaded by JWTGenerator
178 # and secret_key and private_key cannot be set together
179 private_key=None, # type: ignore[arg-type]
180 issuer=None,
181 valid_for=conf.getint("webserver", "log_request_clock_grace", fallback=30),
182 audience="task-instance-logs",
183 )
184 response = requests.get(
185 url,
186 timeout=timeout,
187 headers={"Authorization": generator.generate({"filename": log_relative_path})},
188 stream=True,
189 )
190 response.encoding = "utf-8"
191 return response
192
193
194_parse_timestamp = conf.getimport("logging", "interleave_timestamp_parser", fallback=None)
195
196if not _parse_timestamp:
197
198 def _parse_timestamp(line: str):
199 timestamp_str, _ = line.split(" ", 1)
200 return pendulum.parse(timestamp_str.strip("[]"))
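
    # Illustrative example (assumed log line format): for a plain-text line such as
    #   "[2024-05-01T12:00:00.000+0000] {taskinstance.py:123} INFO - starting task"
    # this fallback takes the first whitespace-separated token, strips the surrounding
    # brackets and parses it with pendulum.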
201
202
203def _stream_lines_by_chunk(
204 log_io: IO[str],
205) -> RawLogStream:
206 """
207 Stream lines from a file-like IO object.
208
209 :param log_io: A file-like IO object to read from.
    :return: A generator that yields individual log lines from the stream.
211 """
212 # Skip processing if file is already closed
213 if log_io.closed:
214 return
215
216 # Seek to beginning if possible
217 if log_io.seekable():
218 try:
219 log_io.seek(0)
220 except Exception as e:
221 logger.error("Error seeking in log stream: %s", e)
222 return
223
224 buffer = ""
225 while True:
226 # Check if file is already closed
227 if log_io.closed:
228 break
229
230 try:
231 chunk = log_io.read(CHUNK_SIZE)
232 except Exception as e:
233 logger.error("Error reading log stream: %s", e)
234 break
235
236 if not chunk:
237 break
238
239 buffer += chunk
240 *lines, buffer = buffer.split("\n")
241 yield from lines
242
243 if buffer:
244 yield from buffer.split("\n")
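
# Illustrative usage (hypothetical input): _stream_lines_by_chunk(io.StringIO("a\nb\nc")) yields
# "a", "b" and "c", reading at most CHUNK_SIZE characters from the underlying file object per read().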
245
246
247def _log_stream_to_parsed_log_stream(
248 log_stream: RawLogStream,
249) -> ParsedLogStream:
250 """
251 Turn a str log stream into a generator of parsed log lines.
252
253 :param log_stream: The stream to parse.
254 :return: A generator of parsed log lines.
255 """
256 from airflow._shared.timezones.timezone import coerce_datetime
257
258 timestamp = None
259 next_timestamp = None
260 idx = 0
261 for line in log_stream:
262 if line:
263 try:
264 log = StructuredLogMessage.model_validate_json(line)
265 except ValidationError:
266 with suppress(Exception):
267 # If we can't parse the timestamp, don't attach one to the row
268 if isinstance(line, str):
269 next_timestamp = _parse_timestamp(line)
270 log = StructuredLogMessage(event=str(line), timestamp=next_timestamp)
271 if log.timestamp:
272 log.timestamp = coerce_datetime(log.timestamp)
273 timestamp = log.timestamp
274 yield timestamp, idx, log
275 idx += 1
276
277
278def _create_sort_key(timestamp: datetime | None, line_num: int) -> int:
279 """
280 Create a sort key for log record, to be used in K-way merge.
281
282 :param timestamp: timestamp of the log line
283 :param line_num: line number of the log line
    :return: an integer sort key, which avoids the memory overhead of comparing full records
285 """
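    # Worked example (illustrative): a line logged at 2024-01-01T00:00:00+00:00 (1704067200000 ms)
    # with line_num 3 gets the key 1704067200000 * SORT_KEY_OFFSET + 3, so records sort by
    # millisecond timestamp first and by line number within the same millisecond.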
286 return int((timestamp or DEFAULT_SORT_DATETIME).timestamp() * 1000) * SORT_KEY_OFFSET + line_num
287
288
289def _is_sort_key_with_default_timestamp(sort_key: int) -> bool:
290 """
291 Check if the sort key was generated with the DEFAULT_SORT_TIMESTAMP.
292
    This is used to identify log records that don't have a timestamp.
294
295 :param sort_key: The sort key to check
296 :return: True if the sort key was generated with DEFAULT_SORT_TIMESTAMP, False otherwise
297 """
298 # Extract the timestamp part from the sort key (remove the line number part)
299 timestamp_part = sort_key // SORT_KEY_OFFSET
300 return timestamp_part == DEFAULT_SORT_TIMESTAMP
301
302
303def _add_log_from_parsed_log_streams_to_heap(
304 heap: list[tuple[int, StructuredLogMessage]],
305 parsed_log_streams: dict[int, ParsedLogStream],
306) -> None:
307 """
    Add one log record from each parsed log stream to the heap, and remove exhausted log streams from the dict afterwards.
309
310 :param heap: heap to store log records
311 :param parsed_log_streams: dict of parsed log streams
312 """
313 # We intend to initialize the list lazily, as in most cases we don't need to remove any log streams.
314 # This reduces memory overhead, since this function is called repeatedly until all log streams are empty.
315 log_stream_to_remove: list[int] | None = None
316 for idx, log_stream in parsed_log_streams.items():
317 record: ParsedLog | None = next(log_stream, None)
318 if record is None:
319 if log_stream_to_remove is None:
320 log_stream_to_remove = []
321 log_stream_to_remove.append(idx)
322 continue
323 timestamp, line_num, line = record
324 # take int as sort key to avoid overhead of memory usage
325 heapq.heappush(heap, (_create_sort_key(timestamp, line_num), line))
326 # remove empty log stream from the dict
327 if log_stream_to_remove is not None:
328 for idx in log_stream_to_remove:
329 del parsed_log_streams[idx]
330
331
332def _flush_logs_out_of_heap(
333 heap: list[tuple[int, StructuredLogMessage]],
334 flush_size: int,
335 last_log_container: list[StructuredLogMessage | None],
336) -> Generator[StructuredLogMessage, None, None]:
337 """
338 Flush logs out of the heap, deduplicating them based on the last log.
339
340 :param heap: heap to flush logs from
341 :param flush_size: number of logs to flush
342 :param last_log_container: a container to store the last log, to avoid duplicate logs
343 :return: a generator that yields deduplicated logs
344 """
345 last_log = last_log_container[0]
346 for _ in range(flush_size):
347 sort_key, line = heapq.heappop(heap)
348 if line != last_log or _is_sort_key_with_default_timestamp(sort_key): # dedupe
349 yield line
350 last_log = line
351 # update the last log container with the last log
352 last_log_container[0] = last_log
353
354
355def _interleave_logs(*log_streams: RawLogStream) -> StructuredLogStream:
356 """
357 Merge parsed log streams using K-way merge.
358
    Since there are multiple log streams, we can't guarantee that the records arrive in global order.
    By buffering records in a heap and yielding HALF_HEAP_DUMP_SIZE of them whenever the heap size
    exceeds HEAP_DUMP_SIZE, we reduce the chance of messing up the global order.
361
362 e.g.
363
364 log_stream1: ----------
365 log_stream2: ----
366 log_stream3: --------
367
    The first record of log_stream3 is later than the fourth record of log_stream1!

    :param log_streams: raw log streams to merge
370 :return: interleaved log stream
371 """
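    # Illustrative behaviour (hypothetical streams): two raw streams whose lines carry
    # "[2024-01-01T00:00:01 ...]" and "[2024-01-01T00:00:02 ...]" timestamps are yielded in
    # timestamp order regardless of which stream they came from; consecutive duplicate
    # timestamped messages are emitted only once.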
372 # don't need to push whole tuple into heap, which increases too much overhead
373 # push only sort_key and line into heap
374 heap: list[tuple[int, StructuredLogMessage]] = []
375 # to allow removing empty streams while iterating, also turn the str stream into parsed log stream
376 parsed_log_streams: dict[int, ParsedLogStream] = {
377 idx: _log_stream_to_parsed_log_stream(log_stream) for idx, log_stream in enumerate(log_streams)
378 }
379
380 # keep adding records from logs until all logs are empty
381 last_log_container: list[StructuredLogMessage | None] = [None]
382 while parsed_log_streams:
383 _add_log_from_parsed_log_streams_to_heap(heap, parsed_log_streams)
384
385 # yield HALF_HEAP_DUMP_SIZE records when heap size exceeds HEAP_DUMP_SIZE
386 if len(heap) >= HEAP_DUMP_SIZE:
387 yield from _flush_logs_out_of_heap(heap, HALF_HEAP_DUMP_SIZE, last_log_container)
388
389 # yield remaining records
390 yield from _flush_logs_out_of_heap(heap, len(heap), last_log_container)
391 # free memory
392 del heap
393 del parsed_log_streams
394
395
396def _is_logs_stream_like(log) -> bool:
397 """Check if the logs are stream-like."""
398 return isinstance(log, (chain, GeneratorType))
399
400
401def _get_compatible_log_stream(
402 log_messages: LogMessages,
403) -> RawLogStream:
404 """
405 Convert legacy log message blobs into a generator that yields log lines.
406
407 :param log_messages: List of legacy log message strings.
408 :return: A generator that yields interleaved log lines.
409 """
410 yield from chain.from_iterable(
411 _stream_lines_by_chunk(io.StringIO(log_message)) for log_message in log_messages
412 )
413
414
415class FileTaskHandler(logging.Handler):
416 """
    FileTaskHandler is a Python log handler that handles and reads task instance logs.
418
419 It creates and delegates log handling to `logging.FileHandler` after receiving task
    instance context. It reads logs from the task instance's host machine.
421
422 :param base_log_folder: Base log folder to place logs.
423 :param max_bytes: max bytes size for the log file
424 :param backup_count: backup file count for the log file
    :param delay: if True, defer opening the log file until the first emit() call (default False)
426 """
427
428 trigger_should_wrap = True
429 inherits_from_empty_operator_log_message = (
430 "Operator inherits from empty operator and thus does not have logs"
431 )
432 executor_instances: dict[str, BaseExecutor] = {}
433 DEFAULT_EXECUTOR_KEY = "_default_executor"
434
435 def __init__(
436 self,
437 base_log_folder: str,
438 max_bytes: int = 0,
439 backup_count: int = 0,
440 delay: bool = False,
441 ):
442 super().__init__()
443 self.handler: logging.Handler | None = None
444 self.local_base = base_log_folder
        self.maintain_propagate: bool = False
        """
        If true, overrides default behavior of setting propagate=False

        :meta private:
        """
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        self.delay = delay
454
455 self.ctx_task_deferred = False
456 """
457 If true, task exited with deferral to trigger.
458
459 Some handlers emit "end of log" markers, and may not wish to do so when task defers.
460 """
461
462 def set_context(
463 self, ti: TaskInstance | TaskInstanceHistory, *, identifier: str | None = None
464 ) -> None | SetContextPropagate:
465 """
466 Provide task_instance context to airflow task handler.
467
468 Generally speaking returns None. But if attr `maintain_propagate` has
469 been set to propagate, then returns sentinel MAINTAIN_PROPAGATE. This
470 has the effect of overriding the default behavior to set `propagate`
471 to False whenever set_context is called. At time of writing, this
472 functionality is only used in unit testing.
473
474 :param ti: task instance object
475 :param identifier: if set, adds suffix to log file. For use when relaying exceptional messages
476 to task logs from a context other than task or trigger run
477 """
478 local_loc = self._init_file(ti, identifier=identifier)
479 self.handler = NonCachingRotatingFileHandler(
480 local_loc,
481 encoding="utf-8",
482 maxBytes=self.max_bytes,
483 backupCount=self.backup_count,
484 delay=self.delay,
485 )
486 if self.formatter:
487 self.handler.setFormatter(self.formatter)
488 self.handler.setLevel(self.level)
489 return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None
490
491 @staticmethod
492 def add_triggerer_suffix(full_path, job_id=None):
493 """
494 Derive trigger log filename from task log filename.
495
496 E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
497 is the triggerer id. We use the triggerer ID instead of trigger ID to distinguish
498 the files because, rarely, the same trigger could get picked up by two different
499 triggerer instances.
500 """
501 full_path = Path(full_path).as_posix()
502 full_path += f".{LogType.TRIGGER.value}"
503 if job_id:
504 full_path += f".{job_id}.log"
505 return full_path
506
507 def emit(self, record):
508 if self.handler:
509 self.handler.emit(record)
510
511 def flush(self):
512 if self.handler:
513 self.handler.flush()
514
515 def close(self):
516 if self.handler:
517 self.handler.close()
518
519 @provide_session
520 def _render_filename(
521 self, ti: TaskInstance | TaskInstanceHistory, try_number: int, session=NEW_SESSION
522 ) -> str:
523 """Return the worker log filename."""
524 dag_run = ti.get_dagrun(session=session)
525
526 date = dag_run.logical_date or dag_run.run_after
527 formatted_date = date.isoformat()
528
529 template = dag_run.get_log_template(session=session).filename
530 str_tpl, jinja_tpl = parse_template_string(template)
531 if jinja_tpl:
532 return render_template(
533 jinja_tpl, {"ti": ti, "ts": formatted_date, "try_number": try_number}, native=False
534 )
535
536 if str_tpl:
537 data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
538 if data_interval[0]:
539 data_interval_start = data_interval[0].isoformat()
540 else:
541 data_interval_start = ""
542 if data_interval[1]:
543 data_interval_end = data_interval[1].isoformat()
544 else:
545 data_interval_end = ""
546 return str_tpl.format(
547 dag_id=ti.dag_id,
548 task_id=ti.task_id,
549 run_id=ti.run_id,
550 data_interval_start=data_interval_start,
551 data_interval_end=data_interval_end,
552 logical_date=formatted_date,
553 try_number=try_number,
554 )
555 raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
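
    # Illustrative rendering (hypothetical template): with a str-format template such as
    # "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log", a task "hello"
    # of dag "example" on run "manual__2024-01-01" and try 1 renders to
    # "dag_id=example/run_id=manual__2024-01-01/task_id=hello/attempt=1.log".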
556
557 def _get_executor_get_task_log(
558 self, ti: TaskInstance | TaskInstanceHistory
559 ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]]:
560 """
561 Get the get_task_log method from executor of current task instance.
562
    Since there might be multiple executors, we need to get the executor of the current task instance instead of the default executor.
564
565 :param ti: task instance object
566 :return: get_task_log method of the executor
567 """
568 executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY
569 executor = self.executor_instances.get(executor_name)
570 if executor is not None:
571 return executor.get_task_log
572
573 if executor_name == self.DEFAULT_EXECUTOR_KEY:
574 self.executor_instances[executor_name] = ExecutorLoader.get_default_executor()
575 else:
576 self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name)
577 return self.executor_instances[executor_name].get_task_log
578
579 def _read(
580 self,
581 ti: TaskInstance | TaskInstanceHistory,
582 try_number: int,
583 metadata: LogMetadata | None = None,
584 ) -> tuple[LogHandlerOutputStream | LegacyProvidersLogType, LogMetadata]:
585 """
586 Template method that contains custom logic of reading logs given the try_number.
587
588 :param ti: task instance record
589 :param try_number: current try_number to read log from
        :param metadata: log metadata,
            can be used for streaming log reading and auto-tailing.
            Following attributes are used:
            log_pos: (absolute) position up to which the log was retrieved
                in previous calls; this part will be skipped and only the
                following text is returned to be added to the tail.
        :return: log stream and metadata.
598 Following attributes are used in metadata:
599 end_of_log: Boolean, True if end of log is reached or False
600 if further calls might get more log text.
601 This is determined by the status of the TaskInstance
602 log_pos: (absolute) Char position to which the log is retrieved
603 """
604 # Task instance here might be different from task instance when
605 # initializing the handler. Thus explicitly getting log location
606 # is needed to get correct log path.
607 worker_log_rel_path = self._render_filename(ti, try_number)
608 sources: LogSourceInfo = []
609 source_list: list[str] = []
610 remote_logs: list[RawLogStream] = []
611 local_logs: list[RawLogStream] = []
612 executor_logs: list[RawLogStream] = []
613 served_logs: list[RawLogStream] = []
614 with suppress(NotImplementedError):
615 sources, logs = self._read_remote_logs(ti, try_number, metadata)
616 if not logs:
617 remote_logs = []
618 elif isinstance(logs, list) and isinstance(logs[0], str):
619 # If the logs are in legacy format, convert them to a generator of log lines
620 remote_logs = [
621 # We don't need to use the log_pos here, as we are using the metadata to track the position
622 _get_compatible_log_stream(cast("list[str]", logs))
623 ]
624 elif isinstance(logs, list) and _is_logs_stream_like(logs[0]):
625 # If the logs are already in a stream-like format, we can use them directly
626 remote_logs = cast("list[RawLogStream]", logs)
627 else:
628 # If the logs are in a different format, raise an error
629 raise TypeError("Logs should be either a list of strings or a generator of log lines.")
630 # Extend LogSourceInfo
631 source_list.extend(sources)
632 has_k8s_exec_pod = False
633 if ti.state == TaskInstanceState.RUNNING:
634 executor_get_task_log = self._get_executor_get_task_log(ti)
635 response = executor_get_task_log(ti, try_number)
636 if response:
637 sources, logs = response
638 # make the logs stream-like compatible
639 executor_logs = [_get_compatible_log_stream(logs)]
640 if sources:
641 source_list.extend(sources)
642 has_k8s_exec_pod = True
643 if not (remote_logs and ti.state not in State.unfinished):
644 # when finished, if we have remote logs, no need to check local
645 worker_log_full_path = Path(self.local_base, worker_log_rel_path)
646 sources, local_logs = self._read_from_local(worker_log_full_path)
647 source_list.extend(sources)
648 if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod:
649 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
650 source_list.extend(sources)
651 elif ti.state not in State.unfinished and not (local_logs or remote_logs):
652 # ordinarily we don't check served logs, with the assumption that users set up
653 # remote logging or shared drive for logs for persistence, but that's not always true
654 # so even if task is done, if no local logs or remote logs are found, we'll check the worker
655 sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
656 source_list.extend(sources)
657
658 out_stream: LogHandlerOutputStream = _interleave_logs(
659 *local_logs,
660 *remote_logs,
661 *executor_logs,
662 *served_logs,
663 )
664
665 # Log message source details are grouped: they are not relevant for most users and can
666 # distract them from finding the root cause of their errors
667 header = [
668 StructuredLogMessage(event="::group::Log message source details", sources=source_list), # type: ignore[call-arg]
669 StructuredLogMessage(event="::endgroup::"),
670 ]
671 end_of_log = ti.try_number != try_number or ti.state not in (
672 TaskInstanceState.RUNNING,
673 TaskInstanceState.DEFERRED,
674 )
675
676 with LogStreamAccumulator(out_stream, HEAP_DUMP_SIZE) as stream_accumulator:
677 log_pos = stream_accumulator.total_lines
678 out_stream = stream_accumulator.stream
679
680 # skip log stream until the last position
681 if metadata and "log_pos" in metadata:
                out_stream = islice(out_stream, metadata["log_pos"], None)
683 else:
684 # first time reading log, add messages before interleaved log stream
685 out_stream = chain(header, out_stream)
686
687 return out_stream, {
688 "end_of_log": end_of_log,
689 "log_pos": log_pos,
690 }
691
    @staticmethod
694 def _get_pod_namespace(ti: TaskInstance | TaskInstanceHistory):
695 pod_override = getattr(ti.executor_config, "pod_override", None)
696 metadata = getattr(pod_override, "metadata", None)
697 namespace = None
698 with suppress(Exception):
699 namespace = getattr(metadata, "namespace", None)
700 return namespace or conf.get("kubernetes_executor", "namespace")
701
702 def _get_log_retrieval_url(
703 self,
704 ti: TaskInstance | TaskInstanceHistory,
705 log_relative_path: str,
706 log_type: LogType | None = None,
707 ) -> tuple[str, str]:
708 """Given TI, generate URL with which to fetch logs from service log server."""
709 if log_type == LogType.TRIGGER:
710 if not ti.triggerer_job:
711 raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
712 config_key = "triggerer_log_server_port"
713 config_default = 8794
714 hostname = ti.triggerer_job.hostname
715 log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
716 else:
717 hostname = ti.hostname
718 config_key = "worker_log_server_port"
719 config_default = 8793
720 return (
721 urljoin(
722 f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
723 log_relative_path,
724 ),
725 log_relative_path,
726 )
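
    # Illustrative result (hypothetical host and path): for worker host "worker-1", the default
    # worker_log_server_port 8793 and relative path "dag_id=example/run_id=test/task_id=hello/attempt=1.log",
    # the URL becomes "http://worker-1:8793/log/dag_id=example/run_id=test/task_id=hello/attempt=1.log".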
727
728 def read(
729 self,
730 task_instance: TaskInstance | TaskInstanceHistory,
731 try_number: int | None = None,
732 metadata: LogMetadata | None = None,
733 ) -> tuple[LogHandlerOutputStream, LogMetadata]:
734 """
735 Read logs of given task instance from local machine.
736
737 :param task_instance: task instance object
738 :param try_number: task instance try_number to read logs from. If None
739 it returns the log of task_instance.try_number
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        :return: a stream of structured log messages and the log metadata
742 """
743 if try_number is None:
744 try_number = task_instance.try_number
745
746 if try_number == 0 and task_instance.state in (
747 TaskInstanceState.SKIPPED,
748 TaskInstanceState.UPSTREAM_FAILED,
749 ):
750 logs = [StructuredLogMessage(event="Task was skipped, no logs available.")]
751 return chain(logs), {"end_of_log": True}
752
753 if try_number is None or try_number < 1:
754 logs = [
755 StructuredLogMessage( # type: ignore[call-arg]
756 level="error", event=f"Error fetching the logs. Try number {try_number} is invalid."
757 )
758 ]
759 return chain(logs), {"end_of_log": True}
760
761 # compatibility for es_task_handler and os_task_handler
762 read_result = self._read(task_instance, try_number, metadata)
763 out_stream, metadata = read_result
764 # If the out_stream is None or empty, return the read result
765 if not out_stream:
766 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream)
767 return out_stream, metadata
768
769 if _is_logs_stream_like(out_stream):
770 out_stream = cast("Generator[StructuredLogMessage, None, None]", out_stream)
771 return out_stream, metadata
772 if isinstance(out_stream, list) and isinstance(out_stream[0], StructuredLogMessage):
773 out_stream = cast("list[StructuredLogMessage]", out_stream)
774 return (log for log in out_stream), metadata
775 if isinstance(out_stream, list) and isinstance(out_stream[0], str):
776 # If the out_stream is a list of strings, convert it to a generator
777 out_stream = cast("list[str]", out_stream)
778 raw_stream = _stream_lines_by_chunk(io.StringIO("".join(out_stream)))
779 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream))
780 return out_stream, metadata
781 if isinstance(out_stream, str):
782 # If the out_stream is a string, convert it to a generator
783 raw_stream = _stream_lines_by_chunk(io.StringIO(out_stream))
784 out_stream = (log for _, _, log in _log_stream_to_parsed_log_stream(raw_stream))
785 return out_stream, metadata
786 raise TypeError(
787 "Invalid log stream type. Expected a generator of StructuredLogMessage, list of StructuredLogMessage, list of str or str."
788 f" Got {type(out_stream).__name__} instead."
789 f" Content type: {type(out_stream[0]).__name__ if isinstance(out_stream, (list, tuple)) and out_stream else 'empty'}"
790 )
791
792 @staticmethod
793 def _prepare_log_folder(directory: Path, new_folder_permissions: int):
794 """
795 Prepare the log folder and ensure its mode is as configured.
796
797 To handle log writing when tasks are impersonated, the log files need to
798 be writable by the user that runs the Airflow command and the user
799 that is impersonated. This is mainly to handle corner cases with the
800 SubDagOperator. When the SubDagOperator is run, all of the operators
801 run under the impersonated user and create appropriate log files
802 as the impersonated user. However, if the user manually runs tasks
803 of the SubDagOperator through the UI, then the log files are created
804 by the user that runs the Airflow command. For example, the Airflow
805 run command may be run by the `airflow_sudoable` user, but the Airflow
806 tasks may be run by the `airflow` user. If the log files are not
807 writable by both users, then it's possible that re-running a task
808 via the UI (or vice versa) results in a permission error as the task
809 tries to write to a log file created by the other user.
810
811 We leave it up to the user to manage their permissions by exposing configuration for both
812 new folders and new log files. Default is to make new log folders and files group-writeable
813 to handle most common impersonation use cases. The requirement in this case will be to make
814 sure that the same group is set as default group for both - impersonated user and main airflow
815 user.
816 """
817 for parent in reversed(directory.parents):
818 parent.mkdir(mode=new_folder_permissions, exist_ok=True)
819 directory.mkdir(mode=new_folder_permissions, exist_ok=True)
820
821 def _init_file(self, ti, *, identifier: str | None = None):
822 """
823 Create log directory and give it permissions that are configured.
824
825 See above _prepare_log_folder method for more detailed explanation.
826
827 :param ti: task instance object
828 :return: relative log path of the given task instance
829 """
830 new_file_permissions = int(
831 conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
832 )
833 local_relative_path = self._render_filename(ti, ti.try_number)
834 full_path = os.path.join(self.local_base, local_relative_path)
835 if identifier:
836 full_path += f".{identifier}.log"
837 elif ti.is_trigger_log_context is True:
838 # if this is true, we're invoked via set_context in the context of
839 # setting up individual trigger logging. return trigger log path.
840 full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
841 new_folder_permissions = int(
842 conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
843 )
844 self._prepare_log_folder(Path(full_path).parent, new_folder_permissions)
845
846 if not os.path.exists(full_path):
847 open(full_path, "a").close()
848 try:
849 os.chmod(full_path, new_file_permissions)
850 except OSError as e:
            logger.warning("OSError while changing permissions of the log file: %s", e)
852
853 return full_path
854
855 @staticmethod
856 def _read_from_local(
857 worker_log_path: Path,
858 ) -> StreamingLogResponse:
859 sources: LogSourceInfo = []
860 log_streams: list[RawLogStream] = []
861 paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
862 if not paths:
863 return sources, log_streams
864
865 for path in paths:
866 sources.append(os.fspath(path))
867 # Read the log file and yield lines
868 log_streams.append(_stream_lines_by_chunk(open(path, encoding="utf-8")))
869 return sources, log_streams
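
    # Illustrative glob (hypothetical file names): for "attempt=1.log" the pattern "attempt=1.log*"
    # also picks up trigger logs such as "attempt=1.log.trigger.123.log" written next to the task log.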
870
871 def _read_from_logs_server(
872 self,
873 ti: TaskInstance | TaskInstanceHistory,
874 worker_log_rel_path: str,
875 ) -> StreamingLogResponse:
876 sources: LogSourceInfo = []
877 log_streams: list[RawLogStream] = []
878 try:
879 log_type = LogType.TRIGGER if getattr(ti, "triggerer_job", False) else LogType.WORKER
880 url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
881 response = _fetch_logs_from_service(url, rel_path)
882 if response.status_code == 403:
883 sources.append(
884 "!!!! Please make sure that all your Airflow components (e.g. "
885 "schedulers, api-servers, dag-processors, workers and triggerer) have "
886 "the same 'secret_key' configured in '[api]' section and "
887 "time is synchronized on all your machines (for example with ntpd)\n"
888 "See more at https://airflow.apache.org/docs/apache-airflow/"
889 "stable/configurations-ref.html#secret-key"
890 )
891 else:
892 # Check if the resource was properly fetched
893 response.raise_for_status()
894 if int(response.headers.get("Content-Length", 0)) > 0:
895 sources.append(url)
896 log_streams.append(
897 _stream_lines_by_chunk(io.TextIOWrapper(cast("IO[bytes]", response.raw)))
898 )
899 except Exception as e:
900 from requests.exceptions import InvalidURL
901
902 if (
903 isinstance(e, InvalidURL)
904 and ti.task is not None
905 and ti.task.inherits_from_empty_operator is True
906 ):
907 sources.append(self.inherits_from_empty_operator_log_message)
908 else:
909 sources.append(f"Could not read served logs: {e}")
910 logger.exception("Could not read served logs")
911 return sources, log_streams
912
913 def _read_remote_logs(self, ti, try_number, metadata=None) -> LogResponse | StreamingLogResponse:
914 """
915 Implement in subclasses to read from the remote service.
916
917 This method should return two lists, messages and logs.
918
919 * Each element in the messages list should be a single message,
920 such as, "reading from x file".
921 * Each element in the logs list should be the content of one file.
922 """
923 remote_io = None
924 try:
925 from airflow.logging_config import REMOTE_TASK_LOG
926
927 remote_io = REMOTE_TASK_LOG
928 except Exception:
929 pass
930
931 if remote_io is None:
932 # Import not found, or explicitly set to None
933 raise NotImplementedError
934
935 # This living here is not really a good plan, but it just about works for now.
936 # Ideally we move all the read+combine logic in to TaskLogReader and out of the task handler.
937 path = self._render_filename(ti, try_number)
938 if stream_method := getattr(remote_io, "stream", None):
939 # Use .stream interface if provider's RemoteIO supports it
940 sources, logs = stream_method(path, ti)
941 return sources, logs or []
942 # Fallback to .read interface
943 sources, logs = remote_io.read(path, ti)
944 return sources, logs or []