Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/sentry.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
18"""Sentry Integration."""
20from __future__ import annotations
22import logging
23from functools import wraps
24from typing import TYPE_CHECKING
26from airflow.configuration import conf
27from airflow.executors.executor_loader import ExecutorLoader
28from airflow.utils.session import find_session_idx, provide_session
29from airflow.utils.state import TaskInstanceState
31if TYPE_CHECKING:
32 from sqlalchemy.orm import Session
34 from airflow.models.taskinstance import TaskInstance
36log = logging.getLogger(__name__)
class DummySentry:
    """
    No-op stand-in for the Sentry integration.

    An instance of this class is bound to the module-level ``Sentry`` name by
    default, so callers can invoke the same methods whether or not Sentry
    reporting is switched on.
    """

    def add_tagging(self, task_instance) -> None:
        """
        Do nothing; placeholder for tagging.

        :param task_instance: ignored.
        """

    def add_breadcrumbs(self, task_instance, session: Session | None = None) -> None:
        """
        Do nothing; placeholder for breadcrumbs.

        :param task_instance: ignored.
        :param session: ignored.
        """

    def enrich_errors(self, run):
        """
        Return the given callable untouched.

        :param run: the callable that would otherwise be decorated.
        :return: ``run`` itself, with no wrapping applied.
        """
        return run

    def flush(self) -> None:
        """Do nothing; placeholder for flushing errors."""
# Default to the no-op implementation; it is replaced below only when the
# [sentry] sentry_on option is enabled.
Sentry: DummySentry = DummySentry()
if conf.getboolean("sentry", "sentry_on", fallback=False):
    import sentry_sdk
    from sentry_sdk.integrations.flask import FlaskIntegration
    from sentry_sdk.integrations.logging import ignore_logger

    class ConfiguredSentry(DummySentry):
        """Configure Sentry SDK."""

        # Attributes copied from the DagRun onto the Sentry scope as tags.
        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
        # Attributes copied from the TaskInstance onto the Sentry scope as tags.
        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
        # TaskInstance attributes recorded in each "completed_tasks" breadcrumb.
        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))

        # [sentry] options that are reported with a warning and not forwarded
        # to ``sentry_sdk.init``.
        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
            (
                "integrations",
                "in_app_include",
                "in_app_exclude",
                "ignore_errors",
                "before_breadcrumb",
            )
        )

        def __init__(self):
            """
            Initialize the Sentry SDK.

            Reads the ``[sentry]`` configuration section, resolves the DSN
            (``sentry_dsn`` takes precedence over ``dsn``), and calls
            ``sentry_sdk.init`` with the Flask integration, plus the Celery
            integration when the default executor class reports
            ``supports_sentry``.
            """
            ignore_logger("airflow.task")

            # LoggingIntegration is set by default.
            integrations = [FlaskIntegration()]

            executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)

            if executor_class.supports_sentry:
                from sentry_sdk.integrations.celery import CeleryIntegration

                integrations.append(CeleryIntegration())

            dsn = None
            sentry_config_opts = conf.getsection("sentry") or {}
            if sentry_config_opts:
                # Tolerate a section that lacks the key rather than raising KeyError.
                sentry_config_opts.pop("sentry_on", None)
                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
                new_way_dsn = sentry_config_opts.pop("dsn", None)
                # supported backward compatibility with old way dsn option
                dsn = old_way_dsn or new_way_dsn

                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
                if unsupported_options:
                    log.warning(
                        "There are unsupported options in [sentry] section: %s",
                        ", ".join(sorted(unsupported_options)),
                    )
                    # Drop them so they are not forwarded to sentry_sdk.init();
                    # "integrations" in particular would collide with the
                    # explicit keyword argument below and raise TypeError.
                    for option in unsupported_options:
                        sentry_config_opts.pop(option, None)

                # These two options name importable objects, so replace the raw
                # config strings with the imported objects (None when unset).
                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)

            if dsn:
                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
            else:
                # Setting up Sentry using environment variables.
                log.debug("Defaulting to SENTRY_DSN in environment.")
                sentry_sdk.init(integrations=integrations, **sentry_config_opts)

        def add_tagging(self, task_instance):
            """
            Add tagging for a task_instance.

            Sets one scope tag per name in ``SCOPE_TASK_INSTANCE_TAGS`` (read
            from the task instance) and ``SCOPE_DAG_RUN_TAGS`` (read from its
            ``dag_run``), plus an ``operator`` tag holding the class name of
            ``task_instance.task``.

            :param task_instance: object exposing ``dag_run``, ``task`` and the
                tagged attributes.
            """
            dag_run = task_instance.dag_run
            task = task_instance.task

            with sentry_sdk.configure_scope() as scope:
                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                    scope.set_tag(tag_name, getattr(task_instance, tag_name))
                for tag_name in self.SCOPE_DAG_RUN_TAGS:
                    scope.set_tag(tag_name, getattr(dag_run, tag_name))
                scope.set_tag("operator", task.__class__.__name__)

        @provide_session
        def add_breadcrumbs(
            self,
            task_instance: TaskInstance,
            session: Session | None = None,
        ) -> None:
            """
            Add breadcrumbs inside of a task_instance.

            Emits one ``completed_tasks`` breadcrumb for every task instance of
            the same DAG run that is in the SUCCESS or FAILED state.

            :param task_instance: task instance whose DAG run is inspected.
            :param session: database session used for the lookups; when it is
                ``None`` no breadcrumbs are added.
            """
            if session is None:
                return
            dr = task_instance.get_dagrun(session)
            task_instances = dr.get_task_instances(
                state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
                session=session,
            )

            for ti in task_instances:
                data = {crumb_tag: getattr(ti, crumb_tag) for crumb_tag in self.SCOPE_CRUMBS}
                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")

        def enrich_errors(self, func):
            """
            Decorate errors.

            Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.

            :param func: callable to wrap; it must declare a ``session`` parameter.
            :return: wrapper that tags the scope, adds breadcrumbs, calls
                ``func``, and reports any ``Exception`` to Sentry before
                re-raising it.
            """
            session_args_idx = find_session_idx(func)

            @wraps(func)
            def wrapper(_self, *args, **kwargs):
                # Wrapping the _run_raw_task function with push_scope to contain
                # tags and breadcrumbs to a specific Task Instance

                # Check the keyword first: evaluating the positional lookup
                # eagerly (as a ``dict.get`` default) would raise IndexError and
                # discard a session that was passed by keyword.
                if "session" in kwargs:
                    session = kwargs["session"]
                else:
                    # NOTE(review): find_session_idx presumably returns the
                    # position within func's full signature, i.e. counting the
                    # leading ``self`` -- confirm. ``args`` here excludes
                    # ``_self``, so it is put back before indexing.
                    positional = (_self, *args)
                    session = positional[session_args_idx] if session_args_idx < len(positional) else None

                with sentry_sdk.push_scope():
                    try:
                        # Is a LocalTaskJob get the task instance
                        if hasattr(_self, "task_instance"):
                            task_instance = _self.task_instance
                        else:
                            task_instance = _self

                        self.add_tagging(task_instance)
                        self.add_breadcrumbs(task_instance, session=session)
                        return func(_self, *args, **kwargs)
                    except Exception as e:
                        sentry_sdk.capture_exception(e)
                        raise

            return wrapper

        def flush(self):
            """Flush pending events by delegating to ``sentry_sdk.flush``."""
            sentry_sdk.flush()

    Sentry = ConfiguredSentry()