Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/logging_config.py: 70%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import warnings 

22from importlib import import_module 

23from typing import TYPE_CHECKING, Any 

24 

25from airflow._shared.module_loading import import_string 

26from airflow.configuration import conf 

27from airflow.exceptions import AirflowConfigException 

28 

29if TYPE_CHECKING: 

30 from airflow.logging.remote import RemoteLogIO 

31 

32log = logging.getLogger(__name__) 

33 

34 

35class _ActiveLoggingConfig: 

36 """Private class to hold active logging config variables.""" 

37 

38 logging_config_loaded: bool = False 

39 remote_task_log: RemoteLogIO | None 

40 default_remote_conn_id: str | None = None 

41 

42 

43def get_remote_task_log() -> RemoteLogIO | None: 

44 if not _ActiveLoggingConfig.logging_config_loaded: 

45 load_logging_config() 

46 return _ActiveLoggingConfig.remote_task_log 

47 

48 

49def get_default_remote_conn_id() -> str | None: 

50 if not _ActiveLoggingConfig.logging_config_loaded: 

51 load_logging_config() 

52 return _ActiveLoggingConfig.default_remote_conn_id 

53 

54 

55def load_logging_config() -> tuple[dict[str, Any], str]: 

56 """Configure & Validate Airflow Logging.""" 

57 fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG" 

58 logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback) 

59 _ActiveLoggingConfig.logging_config_loaded = True 

60 

61 # Sometimes we end up with `""` as the value! 

62 logging_class_path = logging_class_path or fallback 

63 

64 user_defined = logging_class_path != fallback 

65 

66 try: 

67 logging_config = import_string(logging_class_path) 

68 

69 # Make sure that the variable is in scope 

70 if not isinstance(logging_config, dict): 

71 raise ValueError("Logging Config should be of dict type") 

72 

73 if user_defined: 

74 log.info("Successfully imported user-defined logging config from %s", logging_class_path) 

75 

76 except Exception as err: 

77 # Import default logging configurations. 

78 raise ImportError( 

79 f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due " 

80 f"to: {type(err).__name__}:{err}" 

81 ) 

82 else: 

83 modpath = logging_class_path.rsplit(".", 1)[0] 

84 try: 

85 mod = import_module(modpath) 

86 

87 # Load remote logging configuration from the custom module 

88 _ActiveLoggingConfig.remote_task_log = getattr(mod, "REMOTE_TASK_LOG") 

89 _ActiveLoggingConfig.default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None) 

90 except Exception as err: 

91 log.info("Remote task logs will not be available due to an error: %s", err) 

92 

93 return logging_config, logging_class_path 

94 

95 

96def configure_logging(): 

97 from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values 

98 

99 logging_config, logging_class_path = load_logging_config() 

100 try: 

101 level: str = getattr( 

102 logging_config, "LOG_LEVEL", conf.get("logging", "logging_level", fallback="INFO") 

103 ).upper() 

104 

105 colors = getattr( 

106 logging_config, 

107 "COLORED_LOG", 

108 conf.getboolean("logging", "colored_console_log", fallback=True), 

109 ) 

110 # Try to init logging 

111 

112 log_fmt, callsite_params = translate_config_values( 

113 log_format=getattr(logging_config, "LOG_FORMAT", conf.get("logging", "log_format", fallback="")), 

114 callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]), 

115 ) 

116 configure_logging( 

117 log_level=level, 

118 namespace_log_levels=conf.get("logging", "namespace_levels", fallback=None), 

119 stdlib_config=logging_config, 

120 log_format=log_fmt, 

121 callsite_parameters=callsite_params, 

122 colors=colors, 

123 ) 

124 except (ValueError, KeyError) as e: 

125 log.error("Unable to load the config, contains a configuration error.") 

126 # When there is an error in the config, escalate the exception 

127 # otherwise Airflow would silently fall back on the default config 

128 raise e 

129 

130 validate_logging_config() 

131 

132 new_folder_permissions = int( 

133 conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 

134 8, 

135 ) 

136 

137 base_log_folder = conf.get("logging", "base_log_folder") 

138 

139 return init_log_folder( 

140 base_log_folder, 

141 new_folder_permissions=new_folder_permissions, 

142 ) 

143 

144 

145def validate_logging_config(): 

146 """Validate the provided Logging Config.""" 

147 # Now lets validate the other logging-related settings 

148 task_log_reader = conf.get("logging", "task_log_reader") 

149 

150 logger = logging.getLogger("airflow.task") 

151 

152 def _get_handler(name): 

153 return next((h for h in logger.handlers if h.name == name), None) 

154 

155 if _get_handler(task_log_reader) is None: 

156 # Check for pre 1.10 setting that might be in deployed airflow.cfg files 

157 if task_log_reader == "file.task" and _get_handler("task"): 

158 warnings.warn( 

159 f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, " 

160 "but no handler with this name was found. Please update your config to use task. " 

161 "Running config has been adjusted to match", 

162 DeprecationWarning, 

163 stacklevel=2, 

164 ) 

165 conf.set("logging", "task_log_reader", "task") 

166 else: 

167 raise AirflowConfigException( 

168 f"Configured task_log_reader {task_log_reader!r} was not a handler of " 

169 f"the 'airflow.task' logger." 

170 )