Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/_shared/observability/metrics/stats.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

67 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import logging 

21import os 

22import re 

23import socket 

24from collections.abc import Callable 

25from typing import TYPE_CHECKING 

26 

27from .base_stats_logger import NoStatsLogger 

28 

29if TYPE_CHECKING: 

30 from .base_stats_logger import StatsLogger 

31 

log = logging.getLogger(__name__)

# Matches a name that consists solely of characters allowed in stat names.
_VALID_STAT_NAME_CHARS_RE = re.compile(r"^[a-zA-Z0-9_.-]+$")
# Matches any single character that is not allowed in stat names.
_INVALID_STAT_NAME_CHARS_RE = re.compile(r"[^a-zA-Z0-9_.-]")


def normalize_name_for_stats(name: str, log_warning: bool = True) -> str:
    """
    Normalize a name for stats reporting by replacing invalid characters.

    Stats names must only contain ASCII letters, digits, underscores, dots, and dashes.
    Every other character (including whitespace and newlines) is replaced with an underscore.

    :param name: The name to normalize
    :param log_warning: Whether to log a warning when normalization occurs
    :return: ``name`` unchanged when it is already valid, otherwise the normalized name
    """
    # ``fullmatch`` instead of ``match``: the pattern's ``$`` also matches right
    # before a trailing newline, so ``match`` would wrongly accept such a name
    # and return it with the newline still in place.
    if _VALID_STAT_NAME_CHARS_RE.fullmatch(name):
        return name

    normalized = _INVALID_STAT_NAME_CHARS_RE.sub("_", name)

    if log_warning:
        log.warning(
            "Name '%s' contains invalid characters for stats reporting. "
            "Reporting stats with normalized name '%s'.",
            name,
            normalized,
        )

    return normalized

63 

64 

class _Stats(type):
    """
    Metaclass that lazily creates a stats logger and proxies attribute access to it.

    Any attribute that is not found on the class itself is looked up on the
    logger instance, which is built on first use by calling ``factory``.
    """

    factory: Callable[[], StatsLogger | NoStatsLogger] | None = None
    instance: StatsLogger | NoStatsLogger | None = None
    # PID of the process that created ``instance``. Declared with a default so
    # that reading it never falls through to ``__getattr__`` below, which would
    # recurse without bound.
    _instance_pid: int | None = None

    def __getattr__(cls, name: str) -> str:
        """
        Return attribute ``name`` of the stats logger, creating the logger if needed.

        :param name: Attribute requested on the class that normal lookup did not find
        :return: The attribute of the same name on the logger instance
        """
        factory = type.__getattribute__(cls, "factory")
        instance = type.__getattribute__(cls, "instance")
        instance_pid = type.__getattribute__(cls, "_instance_pid")

        # When using OpenTelemetry, some subprocesses are short-lived and
        # often exit before flushing any metrics.
        #
        # The solution is to register a hook that performs a force flush at exit.
        # The atexit hook is registered when initializing the instance.
        #
        # The instance gets initialized once per process. If a process is forked,
        # the new subprocess inherits the already initialized instance of the parent.
        #
        # Store the instance pid so that it can be compared with the current pid
        # to decide whether to initialize the instance again or not.
        #
        # So far, all forks are resetting their state to remove anything inherited
        # from the parent. But in the future that might not always be true.
        current_pid = os.getpid()
        # An instance whose pid was never recorded is left alone.
        if instance is not None and instance_pid not in (None, current_pid):
            log.info(
                "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.",
                instance_pid,
                current_pid,
            )
            # Clear the local variable as well as the class attributes: the check
            # below looks at the local, so leaving it set would hand out the
            # inherited instance for this call and only rebuild on the next one.
            instance = None
            type.__setattr__(cls, "instance", None)
            type.__setattr__(cls, "_instance_pid", None)

        if instance is None:
            if factory is None:
                factory = NoStatsLogger
                type.__setattr__(cls, "factory", factory)

            try:
                instance = factory()
            except (socket.gaierror, ImportError) as e:
                log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e)
                instance = NoStatsLogger()

            type.__setattr__(cls, "instance", instance)
            type.__setattr__(cls, "_instance_pid", current_pid)

        return getattr(instance, name)

    def initialize(cls, *, is_statsd_datadog_enabled: bool, is_statsd_on: bool, is_otel_on: bool) -> None:
        """
        Select the logger factory and discard any previously created instance.

        The flags are checked in order (Datadog, then StatsD, then OpenTelemetry);
        when none is set, ``NoStatsLogger`` is used. The logger itself is only
        created on the next attribute access.

        :param is_statsd_datadog_enabled: Use the DogStatsD logger
        :param is_statsd_on: Use the StatsD logger
        :param is_otel_on: Use the OpenTelemetry logger
        """
        type.__setattr__(cls, "factory", None)
        type.__setattr__(cls, "instance", None)
        # Keep the recorded pid in step with the instance it describes.
        type.__setattr__(cls, "_instance_pid", None)
        factory: Callable

        if is_statsd_datadog_enabled:
            from airflow.observability.metrics import datadog_logger

            # Datadog needs the cls param, so wrap it into a 0-arg factory.
            factory = lambda: datadog_logger.get_dogstatsd_logger(cls)
        elif is_statsd_on:
            from airflow.observability.metrics import statsd_logger

            factory = statsd_logger.get_statsd_logger
        elif is_otel_on:
            from airflow.observability.metrics import otel_logger

            factory = otel_logger.get_otel_logger
        else:
            factory = NoStatsLogger

        type.__setattr__(cls, "factory", factory)

    @classmethod
    def get_constant_tags(cls, *, tags_in_string: str | None) -> list[str]:
        """
        Get constant DataDog tags to add to all stats.

        :param tags_in_string: Comma-separated tags, or ``None``/empty for no tags
        :return: The tags split on commas, or an empty list
        """
        if not tags_in_string:
            return []
        return tags_in_string.split(",")

144 

145 

if not TYPE_CHECKING:
    # At runtime ``Stats`` is an empty class; attribute access on it is served
    # by the ``_Stats`` metaclass.

    class Stats(metaclass=_Stats):
        """Empty class for Stats - we use metaclass to inject the right one."""

else:
    # Type checkers are told that ``Stats`` is a ``StatsLogger``.
    Stats: StatsLogger