Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/file_processor_handler.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

74 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import os 

22from datetime import datetime 

23from pathlib import Path 

24 

25from airflow import settings 

26from airflow.utils import timezone 

27from airflow.utils.helpers import parse_template_string 

28from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE 

29from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler 

30 

# Module-level logger; FileProcessorHandler uses it to warn when the "latest" symlink cannot be created.
logger: logging.Logger = logging.getLogger(__name__)

32 

33 

class FileProcessorHandler(logging.Handler):
    """
    FileProcessorHandler is a python log handler that handles dag processor logs.

    It creates and delegates log handling to `logging.FileHandler`
    after receiving dag processor context.

    Log files are written below ``<base_log_folder>/<YYYY-MM-DD>/`` (the date is taken
    from ``timezone.utcnow()``), and a ``latest`` symlink inside ``base_log_folder`` is
    kept pointing at the current day's directory.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string
    """

    def __init__(self, base_log_folder, filename_template):
        super().__init__()
        self.handler = None
        self.base_log_folder = base_log_folder
        self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER)
        self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)

        log_directory = self._get_log_directory()
        Path(log_directory).mkdir(parents=True, exist_ok=True)

        self._symlink_latest_log_directory()
        # Day directory for which the "latest" symlink was last refreshed. set_context()
        # compares it with the current day directory, so the symlink is only re-checked
        # when the (UTC) date used to name the directory actually changes.
        self._symlinked_log_directory = log_directory

    def set_context(self, filename):
        """
        Provide filename context to airflow task handler.

        Opens a new file handler for the log file belonging to ``filename`` and closes
        the handler opened by any previous call, so its file descriptor is not leaked.

        :param filename: filename in which the dag is located
        :return: ``DISABLE_PROPOGATE`` (by its name, a signal to the caller that
            propagation should be disabled — confirm against the caller).
        """
        local_loc = self._init_file(filename)
        previous_handler = self.handler
        self.handler = NonCachingFileHandler(local_loc)
        self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)
        if previous_handler is not None:
            previous_handler.close()

        log_directory = self._get_log_directory()
        # Refresh "latest" only when the day directory changed. The isdir() guard ensures we
        # never record a directory the symlink step could not point at (e.g. if midnight
        # passed right after _init_file created the previous day's directory).
        if log_directory != self._symlinked_log_directory and os.path.isdir(log_directory):
            self._symlink_latest_log_directory()
            self._symlinked_log_directory = log_directory

        return DISABLE_PROPOGATE

    def emit(self, record):
        """Forward ``record`` to the delegate file handler, if one has been set."""
        if self.handler is not None:
            self.handler.emit(record)

    def flush(self):
        """Flush the delegate file handler, if one has been set."""
        if self.handler is not None:
            self.handler.flush()

    def close(self):
        """Close the delegate file handler (if any), then run ``logging.Handler.close``."""
        if self.handler is not None:
            self.handler.close()
        # logging.Handler.close removes this handler from logging's internal registry;
        # the logging docs require overridden close() methods to call it.
        super().close()

    def _render_filename(self, filename):
        """
        Render the log file path (relative to the day directory) for a DAG file.

        :param filename: path of the DAG file
        :return: the rendered, relative log file path
        """
        # Airflow log path used to be generated by `os.path.relpath(filename, self.dag_dir)`, however DAGs
        # shipped in the airflow source code are not located in the DAG dir like other DAGs.
        # That would create a log filepath which is not under control since it could be outside
        # of the log dir. The change here is to make sure the log path for DAGs in airflow code
        # is always inside the log dir like other DAGs. To differentiate them from regular DAGs,
        # their logs will be in `log_dir/native_dags`.
        import airflow

        airflow_directory = airflow.__path__[0]
        if self._is_path_under(filename, airflow_directory):
            filename = os.path.join("native_dags", os.path.relpath(filename, airflow_directory))
        else:
            filename = os.path.relpath(filename, self.dag_dir)
        ctx = {"filename": filename}

        if self.filename_jinja_template:
            return self.filename_jinja_template.render(**ctx)

        return self.filename_template.format(**ctx)

    @staticmethod
    def _is_path_under(path: str, directory: str) -> bool:
        """
        Return True if ``path`` lies inside ``directory``.

        A plain ``path.startswith(directory)`` would also match sibling directories that
        merely share the prefix (``/opt/airflow_dags`` vs ``/opt/airflow``), so the
        comparison is made against ``directory`` with a trailing separator.
        """
        return path.startswith(os.path.join(directory, ""))

    def _get_log_directory(self):
        """Return the log directory for the current UTC date: ``<base_log_folder>/<YYYY-MM-DD>``."""
        return os.path.join(self.base_log_folder, timezone.utcnow().strftime("%Y-%m-%d"))

    def _symlink_latest_log_directory(self):
        """
        Create symbolic link to the current day's log directory.

        Allows easy access to the latest scheduler log files. Does nothing if the
        current day's directory does not exist yet; failures are logged, not raised.

        :return: None
        """
        log_directory = self._get_log_directory()
        latest_log_directory_path = os.path.join(self.base_log_folder, "latest")
        if os.path.isdir(log_directory):
            rel_link_target = Path(log_directory).relative_to(Path(latest_log_directory_path).parent)
            try:
                # if symlink exists but is stale, update it
                if os.path.islink(latest_log_directory_path):
                    # Resolve both sides: realpath() of the link is absolute and symlink-free,
                    # so comparing it with an unresolved log_directory would always differ
                    # for relative or symlinked base folders and needlessly recreate the link.
                    if os.path.realpath(latest_log_directory_path) != os.path.realpath(log_directory):
                        os.unlink(latest_log_directory_path)
                        os.symlink(rel_link_target, latest_log_directory_path)
                elif os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path):
                    logger.warning(
                        "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path
                    )
                else:
                    os.symlink(rel_link_target, latest_log_directory_path)
            except OSError:
                logger.warning("OSError while attempting to symlink the latest log directory")

    def _init_file(self, filename):
        """
        Create log file and directory if required.

        :param filename: path of the DAG file whose processing is being logged
        :return: absolute path of the log file for ``filename``
        """
        relative_log_file_path = os.path.join(self._get_log_directory(), self._render_filename(filename))
        log_file_path = os.path.abspath(relative_log_file_path)
        directory = os.path.dirname(log_file_path)

        Path(directory).mkdir(parents=True, exist_ok=True)

        if not os.path.exists(log_file_path):
            open(log_file_path, "a").close()

        return log_file_path