1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
18
19import logging
20from collections.abc import Callable
21from functools import wraps
22from socket import socket
23from typing import TYPE_CHECKING
24
25from airflow.sdk._shared.observability.traces.base_tracer import EmptyTrace, Tracer
26from airflow.sdk.configuration import conf
27
28log = logging.getLogger(__name__)
29
30
31def add_debug_span(func):
32 """Decorate a function with span."""
33 func_name = func.__name__
34 qual_name = func.__qualname__
35 module_name = func.__module__
36 component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name
37
38 @wraps(func)
39 def wrapper(*args, **kwargs):
40 with DebugTrace.start_span(span_name=func_name, component=component):
41 return func(*args, **kwargs)
42
43 return wrapper
44
45
46class _TraceMeta(type):
47 factory: Callable[[], Tracer] | None = None
48 instance: Tracer | EmptyTrace | None = None
49
50 def __new__(cls, name, bases, attrs):
51 # Read the debug flag from the class body.
52 if "check_debug_traces_flag" not in attrs:
53 raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.")
54
55 return super().__new__(cls, name, bases, attrs)
56
57 def __getattr__(cls, name: str):
58 if not cls.factory:
59 # Lazy initialization of the factory
60 cls.configure_factory()
61 if not cls.instance:
62 cls._initialize_instance()
63 return getattr(cls.instance, name)
64
65 def _initialize_instance(cls):
66 """Initialize the trace instance."""
67 try:
68 cls.instance = cls.factory()
69 except (socket.gaierror, ImportError) as e:
70 log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e)
71 cls.instance = EmptyTrace()
72
73 def __call__(cls, *args, **kwargs):
74 """Ensure the class behaves as a singleton."""
75 if not cls.instance:
76 cls._initialize_instance()
77 return cls.instance
78
79 def configure_factory(cls):
80 """Configure the trace factory based on settings."""
81 otel_on = conf.getboolean("traces", "otel_on")
82
83 if cls.check_debug_traces_flag:
84 debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on")
85 else:
86 # Set to true so that it will be ignored during the evaluation for the factory instance.
87 # If this is true, then (otel_on and debug_traces_on) will always evaluate to
88 # whatever value 'otel_on' has and therefore it will be ignored.
89 debug_traces_on = True
90
91 if otel_on and debug_traces_on:
92 from airflow.sdk.observability.traces import otel_tracer
93
94 cls.factory = staticmethod(
95 lambda use_simple_processor=False: otel_tracer.get_otel_tracer(cls, use_simple_processor)
96 )
97 else:
98 # EmptyTrace is a class and not inherently callable.
99 # Using a lambda ensures it can be invoked as a callable factory.
100 # staticmethod ensures the lambda is treated as a standalone function
101 # and avoids passing `cls` as an implicit argument.
102 cls.factory = staticmethod(lambda: EmptyTrace())
103
104 def get_constant_tags(cls) -> str | None:
105 """Get constant tags to add to all traces."""
106 return conf.get("traces", "tags", fallback=None)
107
108
109if TYPE_CHECKING:
110 Trace: EmptyTrace
111 DebugTrace: EmptyTrace
112else:
113
114 class Trace(metaclass=_TraceMeta):
115 """Empty class for Trace - we use metaclass to inject the right one."""
116
117 check_debug_traces_flag = False
118
119 class DebugTrace(metaclass=_TraceMeta):
120 """Empty class for Trace and in case the debug traces flag is enabled."""
121
122 check_debug_traces_flag = True