Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/sentry.py: 21%

100 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Sentry Integration.""" 

19from __future__ import annotations 

20 

21import logging 

22from functools import wraps 

23from typing import TYPE_CHECKING 

24 

25from airflow.configuration import conf 

26from airflow.utils.session import find_session_idx, provide_session 

27from airflow.utils.state import State 

28 

29if TYPE_CHECKING: 

30 from sqlalchemy.orm import Session 

31 

# Module-level logger for this file, named after the module (``__name__``).
log = logging.getLogger(__name__)

33 

34 

class DummySentry:
    """No-op Sentry hooks used as the default ``Sentry`` object.

    Each hook does nothing (``enrich_errors`` hands back its argument untouched),
    so call sites can use ``Sentry`` the same way whether or not it is configured.
    """

    @classmethod
    def add_tagging(cls, task_instance):
        """Do nothing; no tags are attached for ``task_instance``."""
        return None

    @classmethod
    def add_breadcrumbs(cls, task_instance, session: Session | None = None):
        """Do nothing; no breadcrumbs are recorded for ``task_instance``."""
        return None

    @classmethod
    def enrich_errors(cls, run):
        """Return ``run`` (e.g. TaskInstance._run_raw_task) as-is, without wrapping it."""
        return run

    def flush(self):
        """Do nothing; there is nothing buffered to send."""
        return None

53 

54 

Sentry: DummySentry = DummySentry()
if conf.getboolean("sentry", "sentry_on", fallback=False):
    import sentry_sdk

    # Verify blinker installation
    from blinker import signal  # noqa: F401
    from sentry_sdk.integrations.flask import FlaskIntegration
    from sentry_sdk.integrations.logging import ignore_logger

    class ConfiguredSentry(DummySentry):
        """Configure Sentry SDK."""

        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))

        # Options from the [sentry] section that are reported and then not
        # forwarded to sentry_sdk.init().
        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
            (
                "integrations",
                "in_app_include",
                "in_app_exclude",
                "ignore_errors",
                "before_breadcrumb",
                "transport",
            )
        )

        def __init__(self):
            """Initialize the Sentry SDK from the ``[sentry]`` configuration section."""
            ignore_logger("airflow.task")
            executor_name = conf.get("core", "EXECUTOR")

            # LoggingIntegration is set by default.
            integrations = [FlaskIntegration()]

            if executor_name == "CeleryExecutor":
                from sentry_sdk.integrations.celery import CeleryIntegration

                integrations.append(CeleryIntegration())

            dsn = None
            # Work on a private copy: keys are popped/replaced below.
            sentry_config_opts = dict(conf.getsection("sentry") or {})
            if sentry_config_opts:
                # ``sentry_on`` only toggles this integration; it is not an SDK option.
                sentry_config_opts.pop("sentry_on", None)
                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
                new_way_dsn = sentry_config_opts.pop("dsn", None)
                # supported backward compatibility with old way dsn option
                dsn = old_way_dsn or new_way_dsn

                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts)
                if unsupported_options:
                    log.warning(
                        "There are unsupported options in [sentry] section: %s",
                        ", ".join(sorted(unsupported_options)),
                    )
                    # Do not forward them: "integrations" would collide with the explicit
                    # ``integrations=`` keyword passed to sentry_sdk.init() below (TypeError),
                    # and the others would presumably arrive as plain config values rather
                    # than the objects sentry_sdk expects.
                    for option in unsupported_options:
                        del sentry_config_opts[option]

                # Replace the configured import path with the imported callable (or None).
                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)

            if dsn:
                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
            else:
                # Setting up Sentry using environment variables.
                log.debug("Defaulting to SENTRY_DSN in environment.")
                sentry_sdk.init(integrations=integrations, **sentry_config_opts)

        def add_tagging(self, task_instance):
            """Set Sentry scope tags from ``task_instance``, its DAG run and its operator class."""
            dag_run = task_instance.dag_run
            task = task_instance.task

            with sentry_sdk.configure_scope() as scope:
                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                    scope.set_tag(tag_name, getattr(task_instance, tag_name))
                for tag_name in self.SCOPE_DAG_RUN_TAGS:
                    scope.set_tag(tag_name, getattr(dag_run, tag_name))
                scope.set_tag("operator", task.__class__.__name__)

        @provide_session
        def add_breadcrumbs(self, task_instance, session=None):
            """
            Add a breadcrumb for each SUCCESS/FAILED task instance of ``task_instance``'s DAG run.

            Does nothing when ``session`` is explicitly ``None`` (``enrich_errors``
            passes ``None`` when it cannot find a session in the wrapped call).
            """
            if session is None:
                return
            dr = task_instance.get_dagrun(session)
            task_instances = dr.get_task_instances(
                state={State.SUCCESS, State.FAILED},
                session=session,
            )

            for ti in task_instances:
                data = {crumb_tag: getattr(ti, crumb_tag) for crumb_tag in self.SCOPE_CRUMBS}
                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")

        def enrich_errors(self, func):
            """
            Decorate errors.

            Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
            to support task specific tags and breadcrumbs. Exceptions raised inside the
            wrapper are reported with ``sentry_sdk.capture_exception`` and re-raised.
            """
            # NOTE(review): assumes find_session_idx returns the position of ``session``
            # in func's full parameter list, i.e. counting the leading instance parameter.
            session_args_idx = find_session_idx(func)

            @wraps(func)
            def wrapper(_self, *args, **kwargs):
                # Resolve ``session`` without an eagerly-evaluated default: a keyword
                # ``session`` must win even when too few positional args were passed.
                if "session" in kwargs:
                    session = kwargs["session"]
                else:
                    # func receives ``_self`` as its first positional argument, so the
                    # index is relative to (_self, *args), not to ``args`` alone.
                    positional = (_self, *args)
                    if session_args_idx < len(positional):
                        session = positional[session_args_idx]
                    else:
                        session = None

                # Wrapping the call with push_scope keeps tags and breadcrumbs
                # scoped to this specific task instance.
                with sentry_sdk.push_scope():
                    try:
                        # A LocalTaskJob exposes its task instance as an attribute.
                        if hasattr(_self, "task_instance"):
                            task_instance = _self.task_instance
                        else:
                            task_instance = _self

                        self.add_tagging(task_instance)
                        self.add_breadcrumbs(task_instance, session=session)
                        return func(_self, *args, **kwargs)
                    except Exception as e:
                        sentry_sdk.capture_exception(e)
                        raise

            return wrapper

        def flush(self):
            """Flush pending Sentry events via ``sentry_sdk.flush()``."""
            sentry_sdk.flush()

    Sentry = ConfiguredSentry()