Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/metrics/statsd_logger.py: 42%

79 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import logging 

21from functools import wraps 

22from typing import TYPE_CHECKING, Callable, TypeVar, cast 

23 

24from airflow.configuration import conf 

25from airflow.exceptions import AirflowConfigException 

26from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol 

27from airflow.metrics.validators import ( 

28 AllowListValidator, 

29 BlockListValidator, 

30 ListValidator, 

31 validate_stat, 

32) 

33 

34if TYPE_CHECKING: 

35 from statsd import StatsClient 

36 

37T = TypeVar("T", bound=Callable) 

38 

39log = logging.getLogger(__name__) 

40 

41 

42def prepare_stat_with_tags(fn: T) -> T: 

43 """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True.""" 

44 

45 @wraps(fn) 

46 def wrapper( 

47 self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs 

48 ) -> Callable[[str], str]: 

49 if self.influxdb_tags_enabled: 

50 if stat is not None and tags is not None: 

51 for k, v in tags.items(): 

52 if self.metric_tags_validator.test(k): 

53 if all(c not in [",", "="] for c in v + k): 

54 stat += f",{k}={v}" 

55 else: 

56 log.error("Dropping invalid tag: %s=%s.", k, v) 

57 return fn(self, stat, *args, tags=tags, **kwargs) 

58 

59 return cast(T, wrapper) 

60 

61 

62class SafeStatsdLogger: 

63 """StatsD Logger.""" 

64 

65 def __init__( 

66 self, 

67 statsd_client: StatsClient, 

68 metrics_validator: ListValidator = AllowListValidator(), 

69 influxdb_tags_enabled: bool = False, 

70 metric_tags_validator: ListValidator = AllowListValidator(), 

71 ) -> None: 

72 self.statsd = statsd_client 

73 self.metrics_validator = metrics_validator 

74 self.influxdb_tags_enabled = influxdb_tags_enabled 

75 self.metric_tags_validator = metric_tags_validator 

76 

77 @prepare_stat_with_tags 

78 @validate_stat 

79 def incr( 

80 self, 

81 stat: str, 

82 count: int = 1, 

83 rate: float = 1, 

84 *, 

85 tags: dict[str, str] | None = None, 

86 ) -> None: 

87 """Increment stat.""" 

88 if self.metrics_validator.test(stat): 

89 return self.statsd.incr(stat, count, rate) 

90 return None 

91 

92 @prepare_stat_with_tags 

93 @validate_stat 

94 def decr( 

95 self, 

96 stat: str, 

97 count: int = 1, 

98 rate: float = 1, 

99 *, 

100 tags: dict[str, str] | None = None, 

101 ) -> None: 

102 """Decrement stat.""" 

103 if self.metrics_validator.test(stat): 

104 return self.statsd.decr(stat, count, rate) 

105 return None 

106 

107 @prepare_stat_with_tags 

108 @validate_stat 

109 def gauge( 

110 self, 

111 stat: str, 

112 value: int | float, 

113 rate: float = 1, 

114 delta: bool = False, 

115 *, 

116 tags: dict[str, str] | None = None, 

117 ) -> None: 

118 """Gauge stat.""" 

119 if self.metrics_validator.test(stat): 

120 return self.statsd.gauge(stat, value, rate, delta) 

121 return None 

122 

123 @prepare_stat_with_tags 

124 @validate_stat 

125 def timing( 

126 self, 

127 stat: str, 

128 dt: DeltaType, 

129 *, 

130 tags: dict[str, str] | None = None, 

131 ) -> None: 

132 """Stats timing.""" 

133 if self.metrics_validator.test(stat): 

134 return self.statsd.timing(stat, dt) 

135 return None 

136 

137 @prepare_stat_with_tags 

138 @validate_stat 

139 def timer( 

140 self, 

141 stat: str | None = None, 

142 *args, 

143 tags: dict[str, str] | None = None, 

144 **kwargs, 

145 ) -> TimerProtocol: 

146 """Timer metric that can be cancelled.""" 

147 if stat and self.metrics_validator.test(stat): 

148 return Timer(self.statsd.timer(stat, *args, **kwargs)) 

149 return Timer() 

150 

151 

152def get_statsd_logger(cls) -> SafeStatsdLogger: 

153 """Returns logger for StatsD.""" 

154 # no need to check for the scheduler/statsd_on -> this method is only called when it is set 

155 # and previously it would crash with None is callable if it was called without it. 

156 from statsd import StatsClient 

157 

158 stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) 

159 metrics_validator: ListValidator 

160 

161 if stats_class: 

162 if not issubclass(stats_class, StatsClient): 

163 raise AirflowConfigException( 

164 "Your custom StatsD client must extend the statsd.StatsClient in order to ensure " 

165 "backwards compatibility." 

166 ) 

167 else: 

168 log.info("Successfully loaded custom StatsD client") 

169 

170 else: 

171 stats_class = StatsClient 

172 

173 statsd = stats_class( 

174 host=conf.get("metrics", "statsd_host"), 

175 port=conf.getint("metrics", "statsd_port"), 

176 prefix=conf.get("metrics", "statsd_prefix"), 

177 ) 

178 if conf.get("metrics", "metrics_allow_list", fallback=None): 

179 metrics_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list")) 

180 if conf.get("metrics", "metrics_block_list", fallback=None): 

181 log.warning( 

182 "Ignoring metrics_block_list as both metrics_allow_list " 

183 "and metrics_block_list have been set" 

184 ) 

185 elif conf.get("metrics", "metrics_block_list", fallback=None): 

186 metrics_validator = BlockListValidator(conf.get("metrics", "metrics_block_list")) 

187 else: 

188 metrics_validator = AllowListValidator() 

189 influxdb_tags_enabled = conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False) 

190 metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None)) 

191 return SafeStatsdLogger(statsd, metrics_validator, influxdb_tags_enabled, metric_tags_validator)