1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18from __future__ import annotations
19
20import os
21from collections.abc import Callable, Generator
22from importlib import import_module
23from typing import TYPE_CHECKING, Any, Protocol, TypeAlias, runtime_checkable
24
25import structlog
26
27LogMessages: TypeAlias = list[str]
28"""The legacy format of log messages before 3.0.4"""
29LogSourceInfo: TypeAlias = list[str]
30"""Information about the log fetching process for display to a user"""
31RawLogStream: TypeAlias = Generator[str, None, None]
32"""Raw log stream, containing unparsed log lines"""
33LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None]
34"""Legacy log response, containing source information and log messages"""
35StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]]
36"""Streaming log response, containing source information, stream of log lines"""
37
if TYPE_CHECKING:
    # Needed only for annotations; importing it at runtime would cause a circular import.
    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI

# Structured logger for this module, named after the module itself.
log = structlog.getLogger(__name__)
43
44
class RemoteLogIO(Protocol):
    """
    Structural interface that remote task-log backends implement.

    Static type checkers accept any object exposing these members as a
    ``RemoteLogIO``; subclassing is not required.
    """

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]:
        """
        Tuple of structlog processors to install on the task's log-write path.

        A remote logging provider can use these to transform structured log
        messages while they are written to a file, or to upload messages as
        soon as they are produced.
        """

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
        """Send the log file found at ``path`` to remote storage."""

    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
        """Fetch the logs stored remotely under ``relative_path``."""
66
67
@runtime_checkable
class RemoteLogStreamIO(RemoteLogIO, Protocol):
    """
    Remote task-log interface that can additionally stream logs.

    Marked ``runtime_checkable`` so ``isinstance`` can check whether an object
    provides every member of this protocol, including ``stream``.
    """

    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse:
        """Read the logs stored remotely under ``relative_path`` as raw line streams."""
75
76
def discover_remote_log_handler(
    logging_class_path: str,
    fallback_path: str,
    import_string: Callable[[str], Any],
) -> tuple[RemoteLogIO | None, str | None]:
    """
    Discover and load the remote log handler from a logging config module.

    The logging config is resolved from ``logging_class_path`` (or from
    ``fallback_path`` when the former is empty). The module that defines the
    config is then inspected for the optional ``REMOTE_TASK_LOG`` and
    ``DEFAULT_REMOTE_CONN_ID`` attributes.

    This is best-effort: failures are logged and result in ``(None, None)``
    instead of an exception.

    :param logging_class_path: Dotted path to the logging config dict
        (e.g. ``"some.module.LOGGING_CONFIG"``). May be empty.
    :param fallback_path: Dotted path used when ``logging_class_path`` is empty.
    :param import_string: Callable that resolves a dotted path to the object it names.
    :return: ``(remote_task_log, default_remote_conn_id)``. Each element is
        ``None`` when the module does not define it; both are ``None`` when the
        config cannot be loaded or is not a dict.
    """
    # Sometimes we end up with `""` as the value!
    logging_class_path = logging_class_path or fallback_path

    try:
        logging_config = import_string(logging_class_path)

        # Only a dict is a valid logging config. Anything else means the path
        # points at the wrong object, so report it instead of failing silently.
        if not isinstance(logging_config, dict):
            log.info(
                "Remote task logs will not be available: logging config %s is a %s, not a dict",
                logging_class_path,
                type(logging_config).__name__,
            )
            return None, None

        # The remote-logging settings live in the same module as the config dict.
        modpath = logging_class_path.rsplit(".", 1)[0]
        mod = import_module(modpath)

        # Both settings are optional; a missing attribute yields None.
        remote_task_log = getattr(mod, "REMOTE_TASK_LOG", None)
        default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None)

        return remote_task_log, default_remote_conn_id

    except Exception as err:
        # Deliberately best-effort: a broken logging config must not crash the caller.
        log.info("Remote task logs will not be available due to an error: %s", err)
        return None, None