Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/observability/trace.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20from collections.abc import Callable 

21from functools import wraps 

22from socket import socket 

23from typing import TYPE_CHECKING 

24 

25from airflow.sdk._shared.observability.traces.base_tracer import EmptyTrace, Tracer 

26from airflow.sdk.configuration import conf 

27 

28log = logging.getLogger(__name__) 

29 

30 

def add_debug_span(func):
    """
    Wrap ``func`` so that every call runs inside a ``DebugTrace`` span.

    The span is named after the function. The component is everything before
    the last dot of the function's qualified name (for a method, the owning
    class); functions without a dotted qualified name use their module name.
    """
    span_name = func.__name__
    owner, dot, _ = func.__qualname__.rpartition(".")
    module_name = func.__module__
    # ``dot`` is empty exactly when the qualified name contains no ".".
    component = owner if dot else module_name

    @wraps(func)
    def wrapper(*args, **kwargs):
        with DebugTrace.start_span(span_name=span_name, component=component):
            result = func(*args, **kwargs)
        return result

    return wrapper

44 

45 

class _TraceMeta(type):
    """
    Metaclass that makes a class act as a lazily created, shared tracer.

    Attribute lookups that are not found on the class are forwarded to a
    tracer instance that is built on first use by ``factory``. Calling the
    class returns that same instance instead of constructing a new object.
    """

    # Callable that builds the tracer; assigned per class by ``configure_factory``.
    factory: Callable[[], Tracer] | None = None
    # Tracer built by ``factory``, or the ``EmptyTrace`` fallback; assigned per class.
    instance: Tracer | EmptyTrace | None = None

    def __new__(cls, name, bases, attrs):
        """
        Create the class, requiring the debug flag in its body.

        :raises TypeError: if ``check_debug_traces_flag`` is not defined
            directly in the class body (an inherited value is not accepted).
        """
        # Read the debug flag from the class body.
        if "check_debug_traces_flag" not in attrs:
            raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.")

        return super().__new__(cls, name, bases, attrs)

    def __getattr__(cls, name: str):
        """Forward an attribute that is missing on the class to the tracer instance."""
        if not cls.factory:
            # Lazy initialization of the factory
            cls.configure_factory()
        if not cls.instance:
            cls._initialize_instance()
        return getattr(cls.instance, name)

    def _initialize_instance(cls):
        """
        Build the tracer instance, falling back to ``EmptyTrace`` on failure.

        A ``socket.gaierror`` or ``ImportError`` raised by the factory is
        logged and replaced by an ``EmptyTrace``; any other exception
        propagates to the caller.
        """
        # The module-level name ``socket`` is the socket *class*
        # (``from socket import socket``), which has no ``gaierror`` attribute.
        # Import the exception type itself so the ``except`` clause below can
        # be evaluated instead of raising AttributeError.
        from socket import gaierror

        if not cls.factory:
            # ``__call__`` can get here before any attribute access has
            # configured the factory; without this the call below is ``None()``.
            cls.configure_factory()

        try:
            cls.instance = cls.factory()
        except (gaierror, ImportError) as e:
            log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e)
            cls.instance = EmptyTrace()

    def __call__(cls, *args, **kwargs):
        """Ensure the class behaves as a singleton; arguments are ignored."""
        if not cls.instance:
            cls._initialize_instance()
        return cls.instance

    def configure_factory(cls):
        """
        Configure the trace factory based on settings.

        The OTel tracer factory is used when ``[traces] otel_on`` is true and,
        for classes whose ``check_debug_traces_flag`` is true, when
        ``[traces] otel_debug_traces_on`` is true as well. Otherwise the
        factory produces an ``EmptyTrace``.
        """
        otel_on = conf.getboolean("traces", "otel_on")

        if cls.check_debug_traces_flag:
            debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on")
        else:
            # Set to true so that it will be ignored during the evaluation for the factory instance.
            # If this is true, then (otel_on and debug_traces_on) will always evaluate to
            # whatever value 'otel_on' has and therefore it will be ignored.
            debug_traces_on = True

        if otel_on and debug_traces_on:
            # Imported only on this branch, i.e. when the OTel tracer is actually selected.
            from airflow.sdk.observability.traces import otel_tracer

            cls.factory = staticmethod(
                lambda use_simple_processor=False: otel_tracer.get_otel_tracer(cls, use_simple_processor)
            )
        else:
            # The lambda gives a zero-argument factory; staticmethod ensures it is
            # treated as a standalone function and never receives `cls` implicitly.
            cls.factory = staticmethod(lambda: EmptyTrace())

    def get_constant_tags(cls) -> str | None:
        """Get constant tags to add to all traces (``[traces] tags``, or None if unset)."""
        return conf.get("traces", "tags", fallback=None)

107 

108 

if TYPE_CHECKING:
    # For static analysis both names are declared as EmptyTrace objects.
    Trace: EmptyTrace
    DebugTrace: EmptyTrace
else:

    class Trace(metaclass=_TraceMeta):
        """Placeholder for the regular tracer - the metaclass injects the real one."""

        # The factory choice does not consult ``otel_debug_traces_on``.
        check_debug_traces_flag = False

    class DebugTrace(metaclass=_TraceMeta):
        """Placeholder for the tracer used when the debug traces flag is enabled."""

        # The factory choice additionally requires ``otel_debug_traces_on``.
        check_debug_traces_flag = True