Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/log/file_processor_handler.py: 50%

72 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import os 

22from datetime import datetime 

23from pathlib import Path 

24 

25from airflow import settings 

26from airflow.utils.helpers import parse_template_string 

27from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE 

28from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler 

29 

30 

class FileProcessorHandler(logging.Handler):
    """
    FileProcessorHandler is a python log handler that handles
    dag processor logs. It creates and delegates log handling
    to `logging.FileHandler` after receiving dag processor context.

    :param base_log_folder: Base log folder to place logs.
    :param filename_template: template filename string
    """

    def __init__(self, base_log_folder, filename_template):
        super().__init__()
        # Delegate FileHandler; created lazily in set_context().
        self.handler = None
        self.base_log_folder = base_log_folder
        self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER)
        self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)

        # Track the (UTC) day the dated log directory was created for, so the
        # "latest" symlink is refreshed only when the day actually rolls over.
        # A date (not a datetime) keeps the comparison in set_context() from
        # triggering on every call, and UTC keeps it consistent with the
        # UTC-named directory produced by _get_log_directory().
        self._cur_date = datetime.utcnow().date()
        Path(self._get_log_directory()).mkdir(parents=True, exist_ok=True)

        self._symlink_latest_log_directory()

    def set_context(self, filename):
        """
        Provide filename context to airflow task handler.

        :param filename: filename in which the dag is located
        """
        local_loc = self._init_file(filename)
        self.handler = NonCachingFileHandler(local_loc)
        self.handler.setFormatter(self.formatter)
        self.handler.setLevel(self.level)

        # Refresh the "latest" symlink at most once per UTC day. The previous
        # implementation compared full datetimes, which was true on virtually
        # every call and re-created the symlink each time.
        today = datetime.utcnow().date()
        if self._cur_date < today:
            self._symlink_latest_log_directory()
            self._cur_date = today

        return DISABLE_PROPOGATE

    def emit(self, record):
        # Forward to the delegate handler; a no-op until set_context() ran.
        if self.handler is not None:
            self.handler.emit(record)

    def flush(self):
        # Flush the delegate handler's stream, if one exists.
        if self.handler is not None:
            self.handler.flush()

    def close(self):
        # Close the delegate handler's file, if one exists.
        if self.handler is not None:
            self.handler.close()

    def _render_filename(self, filename):
        """Render the log file name for ``filename`` via the configured template."""
        # Airflow log path used to be generated by `os.path.relpath(filename, self.dag_dir)`, however all DAGs
        # in airflow source code are not located in the DAG dir as other DAGs.
        # That will create a log filepath which is not under control since it could be outside
        # of the log dir. The change here is to make sure the log path for DAGs in airflow code
        # is always inside the log dir as other DAGs. To be differentiate with regular DAGs,
        # their logs will be in the `log_dir/native_dags`.
        import airflow

        airflow_directory = airflow.__path__[0]
        if filename.startswith(airflow_directory):
            filename = os.path.join("native_dags", os.path.relpath(filename, airflow_directory))
        else:
            filename = os.path.relpath(filename, self.dag_dir)
        ctx = {"filename": filename}

        if self.filename_jinja_template:
            return self.filename_jinja_template.render(**ctx)

        return self.filename_template.format(filename=ctx["filename"])

    def _get_log_directory(self):
        """Return the date-stamped (UTC) log directory for today."""
        now = datetime.utcnow()

        return os.path.join(self.base_log_folder, now.strftime("%Y-%m-%d"))

    def _symlink_latest_log_directory(self):
        """
        Create symbolic link to the current day's log directory to
        allow easy access to the latest scheduler log files.

        :return: None
        """
        log_directory = self._get_log_directory()
        latest_log_directory_path = os.path.join(self.base_log_folder, "latest")
        if os.path.isdir(log_directory):
            try:
                # if symlink exists but is stale, update it
                if os.path.islink(latest_log_directory_path):
                    if os.readlink(latest_log_directory_path) != log_directory:
                        os.unlink(latest_log_directory_path)
                        os.symlink(log_directory, latest_log_directory_path)
                elif os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path):
                    # A real dir/file is squatting on the "latest" name;
                    # don't clobber it, just warn.
                    logging.warning(
                        "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path
                    )
                else:
                    os.symlink(log_directory, latest_log_directory_path)
            except OSError:
                # Best-effort convenience link; never fail logging over it.
                logging.warning("OSError while attempting to symlink the latest log directory")

    def _init_file(self, filename):
        """
        Create log file and directory if required.

        :param filename: task instance object
        :return: relative log path of the given task instance
        """
        relative_log_file_path = os.path.join(self._get_log_directory(), self._render_filename(filename))
        log_file_path = os.path.abspath(relative_log_file_path)
        directory = os.path.dirname(log_file_path)

        Path(directory).mkdir(parents=True, exist_ok=True)

        # Touch the file so the FileHandler can open it in append mode.
        if not os.path.exists(log_file_path):
            open(log_file_path, "a").close()

        return log_file_path