Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/sentry.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

99 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

"""
Sentry Integration.

Exposes the module-level ``Sentry`` object used by Airflow for error reporting.
It is a no-op ``DummySentry`` unless ``[sentry] sentry_on`` is enabled, in which
case it is replaced by a ``ConfiguredSentry`` that initializes ``sentry_sdk``.
"""

from __future__ import annotations

import logging
from functools import wraps
from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.session import find_session_idx, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
    # Imported only for type annotations; with ``from __future__ import annotations``
    # these names are never needed at runtime.
    from sqlalchemy.orm import Session

    from airflow.models.taskinstance import TaskInstance

# Module logger used for configuration warnings and debug messages below.
log = logging.getLogger(__name__)

37 

38 

class DummySentry:
    """
    No-op stand-in for the Sentry integration.

    Provides the same hook methods as the configured integration, but every hook
    does nothing. This lets callers invoke the hooks unconditionally whether or
    not Sentry is enabled.
    """

    def add_tagging(self, task_instance):
        """Ignore *task_instance*; no tags are recorded when Sentry is disabled."""
        return None

    def add_breadcrumbs(self, task_instance, session: Session | None = None):
        """Ignore *task_instance* and *session*; no breadcrumbs are recorded."""
        return None

    def enrich_errors(self, run):
        """Hand back *run* untouched rather than wrapping it."""
        return run

    def flush(self):
        """Do nothing; there is nothing to flush when Sentry is disabled."""
        return None

54 

55 

Sentry: DummySentry = DummySentry()
if conf.getboolean("sentry", "sentry_on", fallback=False):
    import sentry_sdk
    from sentry_sdk.integrations.flask import FlaskIntegration
    from sentry_sdk.integrations.logging import ignore_logger

    class ConfiguredSentry(DummySentry):
        """Configure Sentry SDK."""

        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))

        # Options from the [sentry] section that cannot be expressed as plain
        # config strings; they are reported and dropped before sentry_sdk.init.
        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
            (
                "integrations",
                "in_app_include",
                "in_app_exclude",
                "ignore_errors",
                "before_breadcrumb",
            )
        )

        def __init__(self):
            """
            Initialize the Sentry SDK from the ``[sentry]`` config section.

            The ``airflow.task`` logger is ignored by Sentry's logging integration.
            The Flask integration is always enabled. The Celery integration is added
            when the default executor class has ``supports_sentry`` set.
            The DSN is taken from ``sentry_dsn`` (legacy) or ``dsn``. If neither is
            set, ``sentry_sdk`` falls back to its environment-based configuration.
            """
            ignore_logger("airflow.task")

            # LoggingIntegration is set by default.
            integrations = [FlaskIntegration()]

            executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)

            if executor_class.supports_sentry:
                from sentry_sdk.integrations.celery import CeleryIntegration

                integrations.append(CeleryIntegration())

            dsn = None
            sentry_config_opts = conf.getsection("sentry") or {}
            if sentry_config_opts:
                # ``sentry_on`` is Airflow's own switch, not a sentry_sdk option.
                # Default to None so a missing key cannot raise KeyError.
                sentry_config_opts.pop("sentry_on", None)
                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
                new_way_dsn = sentry_config_opts.pop("dsn", None)
                # supported backward compatibility with old way dsn option
                dsn = old_way_dsn or new_way_dsn

                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
                if unsupported_options:
                    log.warning(
                        "There are unsupported options in [sentry] section: %s",
                        ", ".join(sorted(unsupported_options)),
                    )
                    # Drop them instead of forwarding them to sentry_sdk.init:
                    # "integrations" would collide with the explicit keyword below
                    # (TypeError: multiple values for keyword argument), and the
                    # others would be passed as unparsed config strings.
                    for option in unsupported_options:
                        sentry_config_opts.pop(option)

                # These two are dotted import paths in config; resolve them to objects.
                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)

            if dsn:
                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
            else:
                # Setting up Sentry using environment variables.
                log.debug("Defaulting to SENTRY_DSN in environment.")
                sentry_sdk.init(integrations=integrations, **sentry_config_opts)

        def add_tagging(self, task_instance):
            """
            Add tagging for a task_instance.

            Sets a Sentry scope tag for each name in ``SCOPE_TASK_INSTANCE_TAGS``
            (read from *task_instance*) and ``SCOPE_DAG_RUN_TAGS`` (read from its
            ``dag_run``). It also sets an ``operator`` tag to the task's class name.
            """
            dag_run = task_instance.dag_run
            task = task_instance.task

            with sentry_sdk.configure_scope() as scope:
                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                    attribute = getattr(task_instance, tag_name)
                    scope.set_tag(tag_name, attribute)
                for tag_name in self.SCOPE_DAG_RUN_TAGS:
                    attribute = getattr(dag_run, tag_name)
                    scope.set_tag(tag_name, attribute)
                scope.set_tag("operator", task.__class__.__name__)

        @provide_session
        def add_breadcrumbs(
            self,
            task_instance: TaskInstance,
            session: Session | None = None,
        ) -> None:
            """
            Add breadcrumbs inside of a task_instance.

            Records one ``completed_tasks`` breadcrumb per task instance in the same
            DAG run whose state is SUCCESS or FAILED. Each breadcrumb carries the
            ``SCOPE_CRUMBS`` attributes. Does nothing when *session* is None.
            """
            if session is None:
                return
            dr = task_instance.get_dagrun(session)
            task_instances = dr.get_task_instances(
                state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
                session=session,
            )

            for ti in task_instances:
                data = {crumb_tag: getattr(ti, crumb_tag) for crumb_tag in self.SCOPE_CRUMBS}
                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")

        def enrich_errors(self, func):
            """
            Decorate errors.

            Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
            Any exception raised inside the wrapped call is sent to Sentry via
            ``capture_exception`` and then re-raised.
            """
            # Position of ``session`` in func's full signature, which includes ``self``
            # (see airflow.utils.session.find_session_idx).
            session_args_idx = find_session_idx(func)

            @wraps(func)
            def wrapper(_self, *args, **kwargs):
                # Wrapping the _run_raw_task function with push_scope to contain
                # tags and breadcrumbs to a specific Task Instance

                # Check kwargs first so a keyword ``session`` is never lost when the
                # positional args are short. ``args`` excludes ``_self``, so the
                # positional slot is one lower than session_args_idx.
                if "session" in kwargs:
                    session = kwargs["session"]
                else:
                    session_pos = session_args_idx - 1
                    session = args[session_pos] if 0 <= session_pos < len(args) else None

                with sentry_sdk.push_scope():
                    try:
                        # Is a LocalTaskJob get the task instance
                        if hasattr(_self, "task_instance"):
                            task_instance = _self.task_instance
                        else:
                            task_instance = _self

                        self.add_tagging(task_instance)
                        self.add_breadcrumbs(task_instance, session=session)
                        return func(_self, *args, **kwargs)
                    except Exception as e:
                        sentry_sdk.capture_exception(e)
                        raise

            return wrapper

        def flush(self):
            """Flush pending Sentry events by delegating to ``sentry_sdk.flush()``."""
            sentry_sdk.flush()

    Sentry = ConfiguredSentry()