Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/logging_config.py: 73%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import warnings 

22from typing import TYPE_CHECKING, Any 

23 

24from airflow._shared.logging.remote import discover_remote_log_handler 

25from airflow._shared.module_loading import import_string 

26from airflow.configuration import conf 

27from airflow.exceptions import AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from airflow.logging.remote import RemoteLogIO 

31 

32log = logging.getLogger(__name__) 

33 

34 

35class _ActiveLoggingConfig: 

36 """Private class to hold active logging config variables.""" 

37 

38 logging_config_loaded: bool = False 

39 remote_task_log: RemoteLogIO | None 

40 default_remote_conn_id: str | None = None 

41 

42 @classmethod 

43 def set(cls, remote_task_log: RemoteLogIO | None, default_remote_conn_id: str | None) -> None: 

44 """Set remote logging configuration atomically.""" 

45 cls.remote_task_log = remote_task_log 

46 cls.default_remote_conn_id = default_remote_conn_id 

47 cls.logging_config_loaded = True 

48 

49 

def get_remote_task_log() -> RemoteLogIO | None:
    """Return the stored remote task log handler, loading the logging config first if needed."""
    state = _ActiveLoggingConfig
    if not state.logging_config_loaded:
        # Nothing recorded yet: loading the config populates the holder class.
        load_logging_config()
    return state.remote_task_log

54 

55 

def get_default_remote_conn_id() -> str | None:
    """Return the stored default remote connection id, loading the logging config first if needed."""
    state = _ActiveLoggingConfig
    if not state.logging_config_loaded:
        # Nothing recorded yet: loading the config populates the holder class.
        load_logging_config()
    return state.default_remote_conn_id

60 

61 

def load_logging_config() -> tuple[dict[str, Any], str]:
    """
    Configure & Validate Airflow Logging.

    Imports the logging config named by ``[logging] logging_config_class``
    (or the built-in default), checks that it is a dict, and records the
    discovered remote logging settings on ``_ActiveLoggingConfig``.

    :return: the logging config dict and the dotted path it was imported from.
    :raises ImportError: if the config cannot be imported or is not a dict;
        the underlying error is chained as the cause.
    """
    fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
    logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

    # Sometimes we end up with `""` as the value!
    logging_class_path = logging_class_path or fallback

    user_defined = logging_class_path != fallback

    try:
        logging_config = import_string(logging_class_path)

        # The imported object must be a dict to be usable as a logging config.
        if not isinstance(logging_config, dict):
            raise ValueError("Logging Config should be of dict type")

        if user_defined:
            log.info("Successfully imported user-defined logging config from %s", logging_class_path)

    except Exception as err:
        # Surface every failure as ImportError, and chain the original error
        # explicitly so its traceback is reported as the direct cause.
        raise ImportError(
            f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due "
            f"to: {type(err).__name__}:{err}"
        ) from err
    else:
        # Load remote logging configuration using shared discovery logic
        remote_task_log, default_remote_conn_id = discover_remote_log_handler(
            logging_class_path, fallback, import_string
        )
        _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)

    return logging_config, logging_class_path

96 

97 

def configure_logging():
    """
    Load the logging config, apply it, validate it and initialise the log folder.

    The level, colour and format settings are read from the loaded config
    object via ``getattr``, falling back to values from ``[logging]`` in the
    Airflow configuration.

    :return: whatever ``init_log_folder`` returns for the base log folder.
    :raises ValueError, KeyError: re-raised after logging an error when the
        logging setup hits a configuration problem.
    """
    # Aliased on import so the shared helper does not shadow this function's own name.
    from airflow._shared.logging import (
        configure_logging as _apply_shared_logging,
        init_log_folder,
        translate_config_values,
    )

    logging_config, _ = load_logging_config()
    try:
        # NOTE(review): load_logging_config only returns dict objects, so these
        # getattr lookups presumably always yield the conf fallback — confirm intent.
        default_level = conf.get("logging", "logging_level", fallback="INFO")
        level: str = getattr(logging_config, "LOG_LEVEL", default_level).upper()

        default_colors = conf.getboolean("logging", "colored_console_log", fallback=True)
        colors = getattr(logging_config, "COLORED_LOG", default_colors)

        # Try to init logging
        default_format = conf.get("logging", "log_format", fallback="")
        raw_format = getattr(logging_config, "LOG_FORMAT", default_format)
        raw_callsite = conf.getlist("logging", "callsite_parameters", fallback=[])
        log_fmt, callsite_params = translate_config_values(
            log_format=raw_format,
            callsite_params=raw_callsite,
        )

        namespace_levels = conf.get("logging", "namespace_levels", fallback=None)
        _apply_shared_logging(
            log_level=level,
            namespace_log_levels=namespace_levels,
            stdlib_config=logging_config,
            log_format=log_fmt,
            callsite_parameters=callsite_params,
            colors=colors,
        )
    except (ValueError, KeyError) as e:
        log.error("Unable to load the config, contains a configuration error.")
        # Escalate rather than letting Airflow silently fall back on the
        # default config when the supplied one is broken.
        raise e

    validate_logging_config()

    # The permission setting is an octal string such as "0o775"; parse it base 8.
    permissions_text = conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775")
    new_folder_permissions = int(permissions_text, 8)

    base_log_folder = conf.get("logging", "base_log_folder")

    return init_log_folder(base_log_folder, new_folder_permissions=new_folder_permissions)

145 

146 

def validate_logging_config():
    """
    Check that the configured ``task_log_reader`` names a handler of the ``airflow.task`` logger.

    If the legacy value ``file.task`` is configured and a handler named
    ``task`` exists, a ``DeprecationWarning`` is emitted and the running
    config is switched to ``task``.

    :raises AirflowConfigException: if no matching handler is found and the
        legacy fallback does not apply.
    """
    reader_name = conf.get("logging", "task_log_reader")
    task_logger = logging.getLogger("airflow.task")

    def _find_handler(wanted):
        # First handler on the task logger with the given name, else None.
        for handler in task_logger.handlers:
            if handler.name == wanted:
                return handler
        return None

    if _find_handler(reader_name) is not None:
        # The configured reader exists; nothing to adjust.
        return

    # Only the pre 1.10 setting that might be in deployed airflow.cfg files is
    # tolerated, and only when its replacement handler is present.
    if reader_name != "file.task" or not _find_handler("task"):
        raise AirflowConfigException(
            f"Configured task_log_reader {reader_name!r} was not a handler of "
            f"the 'airflow.task' logger."
        )

    warnings.warn(
        f"task_log_reader setting in [logging] has a deprecated value of {reader_name!r}, "
        "but no handler with this name was found. Please update your config to use task. "
        "Running config has been adjusted to match",
        DeprecationWarning,
        stacklevel=2,
    )
    conf.set("logging", "task_log_reader", "task")