Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/sentry.py: 22%
102 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Sentry Integration."""
19from __future__ import annotations
21import logging
22from functools import wraps
23from typing import TYPE_CHECKING
25from airflow.configuration import conf
26from airflow.executors.executor_loader import ExecutorLoader
27from airflow.utils.session import find_session_idx, provide_session
28from airflow.utils.state import State
30if TYPE_CHECKING:
31 from sqlalchemy.orm import Session
# Module-level logger used by the Sentry helpers in this module.
log: logging.Logger = logging.getLogger(__name__)
class DummySentry:
    """No-op Sentry hooks used when the Sentry integration is switched off.

    Exposes the same hook names as the configured implementation, but none of
    them performs any work.
    """

    @classmethod
    def add_tagging(cls, task_instance):
        """Ignore *task_instance*; no tags are recorded."""
        return None

    @classmethod
    def add_breadcrumbs(cls, task_instance, session: Session | None = None):
        """Ignore *task_instance* and *session*; no breadcrumbs are recorded."""
        return None

    @classmethod
    def enrich_errors(cls, run):
        """Hand back *run* itself, without any error-reporting wrapper."""
        return run

    def flush(self):
        """Do nothing; there are no pending events to send."""
        return None
# Module-level hook object. It starts as the no-op implementation and is
# rebound to ConfiguredSentry when [sentry] sentry_on is enabled.
Sentry: DummySentry
Sentry = DummySentry()
if conf.getboolean("sentry", "sentry_on", fallback=False):
    import sentry_sdk

    # Verify blinker installation
    from blinker import signal  # noqa: F401
    from sentry_sdk.integrations.flask import FlaskIntegration
    from sentry_sdk.integrations.logging import ignore_logger

    class ConfiguredSentry(DummySentry):
        """Configure the Sentry SDK and attach task context to reported errors."""

        # DagRun attributes copied onto the Sentry scope as tags.
        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
        # TaskInstance attributes copied onto the Sentry scope as tags.
        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
        # Attributes of each finished TaskInstance recorded in a breadcrumb.
        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))

        # [sentry] keys that are not forwarded to ``sentry_sdk.init``: they are
        # reported with a warning and removed from the options.
        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
            (
                "integrations",
                "in_app_include",
                "in_app_exclude",
                "ignore_errors",
                "before_breadcrumb",
            )
        )

        def __init__(self):
            """
            Initialize the Sentry SDK from the ``[sentry]`` configuration section.

            ``sentry_dsn`` (legacy name) is preferred over ``dsn``. When neither is
            set, the SDK is initialized without an explicit DSN so it can pick one
            up from the environment. Options listed in ``UNSUPPORTED_SENTRY_OPTIONS``
            are logged and dropped instead of being passed to ``sentry_sdk.init``.
            """
            ignore_logger("airflow.task")

            sentry_flask = FlaskIntegration()

            # LoggingIntegration is set by default.
            integrations = [sentry_flask]

            executor_class, _ = ExecutorLoader.import_default_executor_cls()

            if executor_class.supports_sentry:
                from sentry_sdk.integrations.celery import CeleryIntegration

                sentry_celery = CeleryIntegration()
                integrations.append(sentry_celery)

            dsn = None
            sentry_config_opts = conf.getsection("sentry") or {}
            if sentry_config_opts:
                # The on/off switch is Airflow-only and must not reach the SDK.
                sentry_config_opts.pop("sentry_on", None)
                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
                new_way_dsn = sentry_config_opts.pop("dsn", None)
                # supported backward compatibility with old way dsn option
                dsn = old_way_dsn or new_way_dsn

                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
                if unsupported_options:
                    log.warning(
                        "There are unsupported options in [sentry] section: %s",
                        ", ".join(sorted(unsupported_options)),
                    )
                    # Forwarding these would clash with the explicit
                    # ``integrations=`` keyword below (TypeError: multiple values),
                    # and the rest would hand the SDK raw config values it
                    # presumably cannot use, so drop them after warning.
                    for option in unsupported_options:
                        del sentry_config_opts[option]

                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)

            if dsn:
                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
            else:
                # Setting up Sentry using environment variables.
                log.debug("Defaulting to SENTRY_DSN in environment.")
                sentry_sdk.init(integrations=integrations, **sentry_config_opts)

        def add_tagging(self, task_instance):
            """Tag the current Sentry scope with task instance, DAG run and operator details."""
            dag_run = task_instance.dag_run
            task = task_instance.task

            with sentry_sdk.configure_scope() as scope:
                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                    attribute = getattr(task_instance, tag_name)
                    scope.set_tag(tag_name, attribute)
                for tag_name in self.SCOPE_DAG_RUN_TAGS:
                    attribute = getattr(dag_run, tag_name)
                    scope.set_tag(tag_name, attribute)
                scope.set_tag("operator", task.__class__.__name__)

        @provide_session
        def add_breadcrumbs(self, task_instance, session: Session | None = None) -> None:
            """
            Record a breadcrumb for every SUCCESS/FAILED task instance in the same DAG run.

            Returns without recording anything when ``session`` is ``None``.
            """
            if session is None:
                return
            dr = task_instance.get_dagrun(session)
            task_instances = dr.get_task_instances(
                state={State.SUCCESS, State.FAILED},
                session=session,
            )

            for ti in task_instances:
                data = {crumb_tag: getattr(ti, crumb_tag) for crumb_tag in self.SCOPE_CRUMBS}
                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")

        def enrich_errors(self, func):
            """
            Decorate errors.

            Wrap TaskInstance._run_raw_task and LocalTaskJob._run_mini_scheduler_on_child_tasks
            to support task specific tags and breadcrumbs. Any exception raised while
            tagging or running ``func`` is captured by Sentry and then re-raised.
            """
            session_args_idx = find_session_idx(func)
            # ``session_args_idx`` is the position of ``session`` in ``func``'s full
            # signature, which for these methods includes ``self``. ``wrapper``
            # receives that first argument separately as ``_self``, so the matching
            # index into ``args`` is one lower.
            session_pos_in_args = session_args_idx - 1

            @wraps(func)
            def wrapper(_self, *args, **kwargs):
                # Wrapping the _run_raw_task function with push_scope to contain
                # tags and breadcrumbs to a specific Task Instance

                # Prefer the keyword argument; only read the positional slot when
                # it is actually present, so a keyword session is never discarded.
                if "session" in kwargs:
                    session = kwargs["session"]
                elif 0 <= session_pos_in_args < len(args):
                    session = args[session_pos_in_args]
                else:
                    session = None

                with sentry_sdk.push_scope():
                    try:
                        # A LocalTaskJob exposes its TaskInstance as ``task_instance``;
                        # otherwise ``_self`` is the TaskInstance itself.
                        if hasattr(_self, "task_instance"):
                            task_instance = _self.task_instance
                        else:
                            task_instance = _self

                        self.add_tagging(task_instance)
                        self.add_breadcrumbs(task_instance, session=session)
                        return func(_self, *args, **kwargs)
                    except Exception as e:
                        sentry_sdk.capture_exception(e)
                        raise

            return wrapper

        def flush(self):
            """Flush events queued in the Sentry SDK."""
            sentry_sdk.flush()

    Sentry = ConfiguredSentry()