Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/logging/remote.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

46 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import os 

21from collections.abc import Callable, Generator 

22from importlib import import_module 

23from typing import TYPE_CHECKING, Any, Protocol, TypeAlias, runtime_checkable 

24 

25import structlog 

26 

27LogMessages: TypeAlias = list[str] 

28"""The legacy format of log messages before 3.0.4""" 

29LogSourceInfo: TypeAlias = list[str] 

30"""Information about the log fetching process for display to a user""" 

31RawLogStream: TypeAlias = Generator[str, None, None] 

32"""Raw log stream, containing unparsed log lines""" 

33LogResponse: TypeAlias = tuple[LogSourceInfo, LogMessages | None] 

34"""Legacy log response, containing source information and log messages""" 

35StreamingLogResponse: TypeAlias = tuple[LogSourceInfo, list[RawLogStream]] 

36"""Streaming log response, containing source information, stream of log lines""" 

37 

38if TYPE_CHECKING: 

39 # RuntimeTI is only needed for type checking to avoid circular imports 

40 from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI 

41 

42log = structlog.getLogger(__name__) 

43 

44 

45class RemoteLogIO(Protocol): 

46 """Interface for remote task loggers.""" 

47 

48 @property 

49 def processors(self) -> tuple[structlog.typing.Processor, ...]: 

50 """ 

51 List of structlog processors to install in the task write path. 

52 

53 This is useful if a remote logging provider wants to either transform 

54 the structured log messages as they are being written to a file, or if 

55 you want to upload messages as they are generated. 

56 """ 

57 ... 

58 

59 def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: 

60 """Upload the given log path to the remote storage.""" 

61 ... 

62 

63 def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: 

64 """Read logs from the given remote log path.""" 

65 ... 

66 

67 

68@runtime_checkable 

69class RemoteLogStreamIO(RemoteLogIO, Protocol): 

70 """Interface for remote task loggers with stream-based read support.""" 

71 

72 def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse: 

73 """Stream-based read interface for reading logs from the given remote log path.""" 

74 ... 

75 

76 

77def discover_remote_log_handler( 

78 logging_class_path: str, 

79 fallback_path: str, 

80 import_string: Callable[[str], Any], 

81) -> tuple[RemoteLogIO | None, str | None]: 

82 """Discover and load the remote log handler from a logging config module.""" 

83 # Sometimes we end up with `""` as the value! 

84 logging_class_path = logging_class_path or fallback_path 

85 

86 try: 

87 logging_config = import_string(logging_class_path) 

88 

89 # Make sure that the variable is in scope 

90 if not isinstance(logging_config, dict): 

91 return None, None 

92 

93 modpath = logging_class_path.rsplit(".", 1)[0] 

94 mod = import_module(modpath) 

95 

96 # Load remote logging configuration from the custom module 

97 remote_task_log = getattr(mod, "REMOTE_TASK_LOG", None) 

98 default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) 

99 

100 return remote_task_log, default_remote_conn_id 

101 

102 except Exception as err: 

103 log.info("Remote task logs will not be available due to an error: %s", err) 

104 return None, None