Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/sentry.py: 22%

102 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18"""Sentry Integration.""" 

19from __future__ import annotations 

20 

21import logging 

22from functools import wraps 

23from typing import TYPE_CHECKING 

24 

25from airflow.configuration import conf 

26from airflow.executors.executor_loader import ExecutorLoader 

27from airflow.utils.session import find_session_idx, provide_session 

28from airflow.utils.state import State 

29 

30if TYPE_CHECKING: 

31 from sqlalchemy.orm import Session 

32 

33log = logging.getLogger(__name__) 

34 

35 

class DummySentry:
    """No-op Sentry facade used when the Sentry integration is switched off.

    Every hook accepts the same arguments as the configured implementation
    but performs no work, so call sites never need to check whether Sentry
    is enabled.
    """

    @classmethod
    def enrich_errors(cls, run):
        """Hand back ``run`` untouched instead of wrapping it for error reporting."""
        return run

    @classmethod
    def add_tagging(cls, task_instance):
        """Accept a task instance and deliberately record no tags."""

    @classmethod
    def add_breadcrumbs(cls, task_instance, session: Session | None = None):
        """Accept a task instance (and optional session) and record no breadcrumbs."""

    def flush(self):
        """Nothing is ever queued, so there is nothing to send."""

55 

Sentry: DummySentry = DummySentry()
if conf.getboolean("sentry", "sentry_on", fallback=False):
    import sentry_sdk

    # Verify blinker installation
    from blinker import signal  # noqa: F401
    from sentry_sdk.integrations.flask import FlaskIntegration
    from sentry_sdk.integrations.logging import ignore_logger

    class ConfiguredSentry(DummySentry):
        """Configure the Sentry SDK and attach task context to reported errors."""

        # Attributes copied from the DagRun onto the Sentry scope as tags.
        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
        # Attributes copied from the TaskInstance onto the Sentry scope as tags.
        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
        # Attributes of finished sibling task instances recorded as breadcrumb data.
        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))

        # [sentry] options that cannot be expressed as plain config values.
        # They are reported and then ignored instead of being passed to sentry_sdk.init.
        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
            (
                "integrations",
                "in_app_include",
                "in_app_exclude",
                "ignore_errors",
                "before_breadcrumb",
            )
        )

        def __init__(self):
            """Initialize the Sentry SDK from the ``[sentry]`` configuration section.

            The Flask integration is always enabled. The Celery integration is
            added when the default executor class reports ``supports_sentry``.
            The DSN comes from ``sentry_dsn`` (legacy) or ``dsn``. If neither is
            set, ``sentry_sdk.init`` is called without a DSN.
            """
            ignore_logger("airflow.task")

            sentry_flask = FlaskIntegration()

            # LoggingIntegration is set by default.
            integrations = [sentry_flask]

            executor_class, _ = ExecutorLoader.import_default_executor_cls()

            if executor_class.supports_sentry:
                from sentry_sdk.integrations.celery import CeleryIntegration

                sentry_celery = CeleryIntegration()
                integrations.append(sentry_celery)

            dsn = None
            sentry_config_opts = conf.getsection("sentry") or {}
            if sentry_config_opts:
                # ``sentry_on`` is an Airflow switch, not a sentry_sdk.init option.
                sentry_config_opts.pop("sentry_on", None)
                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
                new_way_dsn = sentry_config_opts.pop("dsn", None)
                # supported backward compatibility with old way dsn option
                dsn = old_way_dsn or new_way_dsn

                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
                if unsupported_options:
                    log.warning(
                        "There are unsupported options in [sentry] section: %s",
                        ", ".join(sorted(unsupported_options)),
                    )
                    # Drop them: forwarding e.g. ``integrations`` would clash with
                    # the explicit ``integrations=`` keyword passed to sentry_sdk.init.
                    for option in unsupported_options:
                        del sentry_config_opts[option]

                # These options name importable objects; replace the raw strings
                # with the imported callables (or None when unset).
                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)

            if dsn:
                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
            else:
                # Setting up Sentry using environment variables.
                log.debug("Defaulting to SENTRY_DSN in environment.")
                sentry_sdk.init(integrations=integrations, **sentry_config_opts)

        def add_tagging(self, task_instance):
            """Tag the current Sentry scope with task instance, DAG run and operator details."""
            dag_run = task_instance.dag_run
            task = task_instance.task

            with sentry_sdk.configure_scope() as scope:
                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                    scope.set_tag(tag_name, getattr(task_instance, tag_name))
                for tag_name in self.SCOPE_DAG_RUN_TAGS:
                    scope.set_tag(tag_name, getattr(dag_run, tag_name))
                scope.set_tag("operator", task.__class__.__name__)

        @provide_session
        def add_breadcrumbs(self, task_instance, session=None):
            """Record a breadcrumb for every SUCCESS/FAILED task instance in the same DAG run.

            Does nothing when ``session`` is None.
            """
            if session is None:
                return
            dr = task_instance.get_dagrun(session)
            task_instances = dr.get_task_instances(
                state={State.SUCCESS, State.FAILED},
                session=session,
            )

            for ti in task_instances:
                data = {crumb_tag: getattr(ti, crumb_tag) for crumb_tag in self.SCOPE_CRUMBS}
                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")

        def enrich_errors(self, func):
            """
            Decorate errors.

            Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
            to support task specific tags and breadcrumbs. Any exception raised by
            the wrapped call is sent to Sentry and then re-raised.
            """
            # find_session_idx() gives the position of ``session`` in func's full
            # signature, which for the wrapped methods includes ``self``. The
            # wrapper receives ``self`` separately as ``_self``, so shift by one
            # to index into ``args``.
            session_args_idx = find_session_idx(func) - 1

            @wraps(func)
            def wrapper(_self, *args, **kwargs):
                # Wrapping the _run_raw_task function with push_scope to contain
                # tags and breadcrumbs to a specific Task Instance

                # Prefer the keyword argument, then the positional one. args[...] is
                # deliberately not evaluated eagerly: an IndexError there must not
                # discard a session that was passed by keyword.
                if "session" in kwargs:
                    session = kwargs["session"]
                elif 0 <= session_args_idx < len(args):
                    session = args[session_args_idx]
                else:
                    session = None

                with sentry_sdk.push_scope():
                    try:
                        # Is a LocalTaskJob get the task instance
                        if hasattr(_self, "task_instance"):
                            task_instance = _self.task_instance
                        else:
                            task_instance = _self

                        self.add_tagging(task_instance)
                        self.add_breadcrumbs(task_instance, session=session)
                        return func(_self, *args, **kwargs)
                    except Exception as e:
                        sentry_sdk.capture_exception(e)
                        raise

            return wrapper

        def flush(self):
            """Block until queued Sentry events have been sent (delegates to ``sentry_sdk.flush``)."""
            sentry_sdk.flush()

    Sentry = ConfiguredSentry()