Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/metrics/datadog_logger.py: 31%

71 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import datetime 

21import logging 

22from typing import TYPE_CHECKING 

23 

24from airflow.configuration import conf 

25from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol 

26from airflow.metrics.validators import ( 

27 AllowListValidator, 

28 BlockListValidator, 

29 ListValidator, 

30 validate_stat, 

31) 

32 

33if TYPE_CHECKING: 

34 from datadog import DogStatsd 

35 

36log = logging.getLogger(__name__) 

37 

38 

39class SafeDogStatsdLogger: 

40 """DogStatsd Logger.""" 

41 

42 def __init__( 

43 self, 

44 dogstatsd_client: DogStatsd, 

45 metrics_validator: ListValidator = AllowListValidator(), 

46 metrics_tags: bool = False, 

47 metric_tags_validator: ListValidator = AllowListValidator(), 

48 ) -> None: 

49 self.dogstatsd = dogstatsd_client 

50 self.metrics_validator = metrics_validator 

51 self.metrics_tags = metrics_tags 

52 self.metric_tags_validator = metric_tags_validator 

53 

54 @validate_stat 

55 def incr( 

56 self, 

57 stat: str, 

58 count: int = 1, 

59 rate: float = 1, 

60 *, 

61 tags: dict[str, str] | None = None, 

62 ) -> None: 

63 """Increment stat.""" 

64 if self.metrics_tags and isinstance(tags, dict): 

65 tags_list = [ 

66 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) 

67 ] 

68 else: 

69 tags_list = [] 

70 if self.metrics_validator.test(stat): 

71 return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate) 

72 return None 

73 

74 @validate_stat 

75 def decr( 

76 self, 

77 stat: str, 

78 count: int = 1, 

79 rate: float = 1, 

80 *, 

81 tags: dict[str, str] | None = None, 

82 ) -> None: 

83 """Decrement stat.""" 

84 if self.metrics_tags and isinstance(tags, dict): 

85 tags_list = [ 

86 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) 

87 ] 

88 else: 

89 tags_list = [] 

90 if self.metrics_validator.test(stat): 

91 return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate) 

92 return None 

93 

94 @validate_stat 

95 def gauge( 

96 self, 

97 stat: str, 

98 value: int | float, 

99 rate: float = 1, 

100 delta: bool = False, 

101 *, 

102 tags: dict[str, str] | None = None, 

103 ) -> None: 

104 """Gauge stat.""" 

105 if self.metrics_tags and isinstance(tags, dict): 

106 tags_list = [ 

107 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) 

108 ] 

109 else: 

110 tags_list = [] 

111 if self.metrics_validator.test(stat): 

112 return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate) 

113 return None 

114 

115 @validate_stat 

116 def timing( 

117 self, 

118 stat: str, 

119 dt: DeltaType, 

120 *, 

121 tags: dict[str, str] | None = None, 

122 ) -> None: 

123 """Stats timing.""" 

124 if self.metrics_tags and isinstance(tags, dict): 

125 tags_list = [ 

126 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) 

127 ] 

128 else: 

129 tags_list = [] 

130 if self.metrics_validator.test(stat): 

131 if isinstance(dt, datetime.timedelta): 

132 dt = dt.total_seconds() 

133 return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) 

134 return None 

135 

136 @validate_stat 

137 def timer( 

138 self, 

139 stat: str | None = None, 

140 tags: dict[str, str] | None = None, 

141 **kwargs, 

142 ) -> TimerProtocol: 

143 """Timer metric that can be cancelled.""" 

144 if self.metrics_tags and isinstance(tags, dict): 

145 tags_list = [ 

146 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) 

147 ] 

148 else: 

149 tags_list = [] 

150 if stat and self.metrics_validator.test(stat): 

151 return Timer(self.dogstatsd.timed(stat, tags=tags_list, **kwargs)) 

152 return Timer() 

153 

154 

155def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger: 

156 """Get DataDog StatsD logger.""" 

157 from datadog import DogStatsd 

158 

159 metrics_validator: ListValidator 

160 

161 dogstatsd = DogStatsd( 

162 host=conf.get("metrics", "statsd_host"), 

163 port=conf.getint("metrics", "statsd_port"), 

164 namespace=conf.get("metrics", "statsd_prefix"), 

165 constant_tags=cls.get_constant_tags(), 

166 ) 

167 if conf.get("metrics", "metrics_allow_list", fallback=None): 

168 metrics_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list")) 

169 if conf.get("metrics", "metrics_block_list", fallback=None): 

170 log.warning( 

171 "Ignoring metrics_block_list as both metrics_allow_list " 

172 "and metrics_block_list have been set" 

173 ) 

174 elif conf.get("metrics", "metrics_block_list", fallback=None): 

175 metrics_validator = BlockListValidator(conf.get("metrics", "metrics_block_list")) 

176 else: 

177 metrics_validator = AllowListValidator() 

178 datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True) 

179 metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None)) 

180 return SafeDogStatsdLogger(dogstatsd, metrics_validator, datadog_metrics_tags, metric_tags_validator)