Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/metrics/otel_logger.py: 34%

109 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import logging 

20import random 

21import warnings 

22from typing import Callable 

23 

24from opentelemetry import metrics 

25from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter 

26from opentelemetry.sdk.metrics import MeterProvider 

27from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader 

28from opentelemetry.sdk.resources import SERVICE_NAME, Resource 

29from opentelemetry.util.types import Attributes 

30 

31from airflow.configuration import conf 

32from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol 

33from airflow.metrics.validators import ( 

34 OTEL_NAME_MAX_LENGTH, 

35 AllowListValidator, 

36 stat_name_otel_handler, 

37 validate_stat, 

38) 

39 

# Module-level logger for this file.
log = logging.getLogger(__name__)


# "airflow.dag_processing.processes" is currently the only UpDownCounter (UDC) used in Airflow.
# If more are added, we should add a better system for this.
#
# Generally in OTel a Counter is monotonic (can only go up) and there is an UpDownCounter which,
# as you can guess, is non-monotonic; it can go up or down. The choice here is to either drop
# this one metric and implement the rest as monotonic Counters, implement all counters as
# UpDownCounters, or add a bit of logic to do it intelligently. The catch is that the Collector
# which transmits these metrics to the upstream dashboard tools (Prometheus, Grafana, etc.) assigns
# the type of Gauge to any UDC instead of Counter. Adding this logic feels like the best compromise
# where normal Counters still get typed correctly, and we don't lose an existing metric.
# Entries are full metric names, including the "airflow." prefix.
# See:
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#counter-creation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#updowncounter
UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"}

# Default metric name prefix, including the trailing dot.
# NOTE(review): not referenced elsewhere in this module as shown; presumably used by callers — confirm.
METRIC_NAME_PREFIX = "airflow."

59 

60 

def _is_up_down_counter(name):
    """
    Tell whether ``name`` must be backed by a non-monotonic OTel UpDownCounter.

    :param name: Full metric name (prefix included) to look up in ``UP_DOWN_COUNTERS``.
    :return: True when the name is registered as an UpDownCounter, else False.
    """
    needs_up_down = name in UP_DOWN_COUNTERS
    return needs_up_down

63 

64 

65def _generate_key_name(name: str, attributes: Attributes = None): 

66 if attributes: 

67 key = name 

68 for item in attributes.items(): 

69 key += f"_{item[0]}_{item[1]}" 

70 else: 

71 key = name 

72 

73 return key 

74 

75 

def name_is_otel_safe(prefix: str, name: str) -> bool:
    """
    Check whether ``prefix`` and ``name`` combine into a legal OpenTelemetry instrument name.

    The check is delegated to ``stat_name_otel_handler`` with ``max_length=OTEL_NAME_MAX_LENGTH``.
    This function returns the truthiness of that handler's result.
    NOTE(review): whether an illegal name makes the handler raise or return a falsy value
    is defined in airflow.metrics.validators — confirm before relying on a False return.

    Legal names are defined here:
    https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax
    """
    handled_name = stat_name_otel_handler(prefix, name, max_length=OTEL_NAME_MAX_LENGTH)
    return bool(handled_name)

84 

85 

class SafeOtelLogger:
    """
    Otel Logger.

    Exposes the Airflow stats interface (``incr``, ``decr``, ``gauge``, ``timing``, ``timer``)
    on top of an OpenTelemetry meter provider. Counter instruments are created lazily and
    cached in a :class:`MetricsMap`. ``incr``/``decr`` only emit when both the allow-list
    validator and ``name_is_otel_safe`` accept the stat. Gauges and timers are not
    implemented yet and only issue a warning.
    """

    def __init__(
        self,
        otel_provider,
        prefix: str = "airflow",
        allow_list_validator: AllowListValidator | None = None,
    ):
        """
        :param otel_provider: Meter provider; ``get_meter(__name__)`` is called on it once.
        :param prefix: Prefix joined with "." in front of every stat name.
        :param allow_list_validator: Decides which stats are emitted. When omitted, a fresh
            ``AllowListValidator()`` is created for this instance. The default is built here,
            not in the signature, so one instance created at import time is not shared by
            every logger.
        """
        self.otel: Callable = otel_provider
        self.prefix: str = prefix
        if allow_list_validator is None:
            allow_list_validator = AllowListValidator()
        self.metrics_validator = allow_list_validator
        self.meter = otel_provider.get_meter(__name__)
        self.metrics_map = MetricsMap(self.meter)

    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        tags: Attributes = None,
    ):
        """
        Increment stat by count.

        :param stat: The name of the stat to increment.
        :param count: A non-negative integer to add to the current value of stat.
        :param rate: value between 0 and 1 that represents the sampled rate at
            which the metric is going to be emitted.
        :param tags: Tags to append to the stat.
        :raises ValueError: if ``count`` or ``rate`` is negative.
        :return: The counter that was incremented, or None when the call was sampled out
            or the stat was not accepted.
        """
        if (count < 0) or (rate < 0):
            raise ValueError("count and rate must both be positive values.")
        # Client-side sampling: only a ``rate`` fraction of calls is emitted.
        if rate < 1 and random.random() > rate:
            return

        if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
            counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}", attributes=tags)
            counter.add(count, attributes=tags)
            return counter

    def decr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        tags: Attributes = None,
    ):
        """
        Decrement stat by count.

        Only names listed in ``UP_DOWN_COUNTERS`` are backed by an UpDownCounter. Per the OTel
        spec, the monotonic Counter used for every other name does not accept negative amounts.

        :param stat: The name of the stat to decrement.
        :param count: A non-negative integer to subtract from current value of stat.
        :param rate: value between 0 and 1 that represents the sampled rate at
            which the metric is going to be emitted.
        :param tags: Tags to append to the stat.
        :raises ValueError: if ``count`` or ``rate`` is negative.
        :return: The counter that was decremented, or None when the call was sampled out
            or the stat was not accepted.
        """
        if (count < 0) or (rate < 0):
            raise ValueError("count and rate must both be positive values.")
        # Client-side sampling: only a ``rate`` fraction of calls is emitted.
        if rate < 1 and random.random() > rate:
            return

        if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
            # Key the cached instrument by its tags exactly as ``incr`` does. Then an incr/decr
            # pair on the same stat and tags shares one cache entry.
            counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}", attributes=tags)
            counter.add(-count, attributes=tags)
            return counter

    @validate_stat
    def gauge(
        self,
        stat: str,
        value: int | float,
        rate: float = 1,
        delta: bool = False,
        *,
        tags: Attributes = None,
    ) -> None:
        """Not implemented for OpenTelemetry yet: warns and records nothing."""
        warnings.warn("OpenTelemetry Gauges are not yet implemented.")
        return None

    @validate_stat
    def timing(
        self,
        stat: str,
        dt: DeltaType,
        *,
        tags: Attributes = None,
    ) -> None:
        """Not implemented for OpenTelemetry yet: warns and records nothing."""
        warnings.warn("OpenTelemetry Timers are not yet implemented.")
        return None

    @validate_stat
    def timer(
        self,
        stat: str | None = None,
        *args,
        tags: Attributes = None,
        **kwargs,
    ) -> TimerProtocol:
        """Not implemented for OpenTelemetry yet: warns and returns a plain ``Timer()``."""
        warnings.warn("OpenTelemetry Timers are not yet implemented.")
        return Timer()

182 

183 

class MetricsMap:
    """
    Stores Otel Instruments.

    Counters are cached under a key built from the metric name and its attributes by
    ``_generate_key_name``. Each distinct name/attribute combination therefore gets its
    own cache entry.
    """

    def __init__(self, meter):
        """:param meter: OpenTelemetry meter used to create new instruments."""
        self.meter = meter
        # cache key -> instrument returned by the meter
        self.map = {}

    def clear(self) -> None:
        """Drop every cached instrument."""
        self.map.clear()

    def _create_counter(self, name):
        """
        Create a new counter or up_down_counter for the provided name.

        Names longer than ``OTEL_NAME_MAX_LENGTH`` are truncated, with a warning, before the
        instrument is created. Names in ``UP_DOWN_COUNTERS`` get an UpDownCounter. Every
        other name gets a monotonic Counter.

        :param name: Full metric name, prefix included.
        :return: The newly created instrument.
        """
        otel_safe_name = name[:OTEL_NAME_MAX_LENGTH]
        if name != otel_safe_name:
            warnings.warn(
                f"Metric name `{name}` exceeds OpenTelemetry's name length limit of "
                f"{OTEL_NAME_MAX_LENGTH} characters and will be truncated to `{otel_safe_name}`."
            )

        # NOTE(review): the lookup uses the full prefixed name, and UP_DOWN_COUNTERS hard-codes
        # the "airflow." prefix. With a custom otel_prefix that stat would get a monotonic
        # Counter — confirm whether that is intended.
        if _is_up_down_counter(name):
            counter = self.meter.create_up_down_counter(name=otel_safe_name)
        else:
            counter = self.meter.create_counter(name=otel_safe_name)

        # Use the module logger: the module-level ``logging.debug`` targets the root logger
        # and may call ``basicConfig()`` as a side effect.
        log.debug("Created %s as type: %s", otel_safe_name, type(counter).__name__)
        return counter

    def get_counter(self, name: str, attributes: Attributes = None):
        """
        Return the counter, creating and caching a new one if it did not exist.

        :param name: The name of the counter to fetch or create.
        :param attributes: Counter attributes, used to generate a unique key to store the counter.
        """
        key = _generate_key_name(name, attributes)
        if key not in self.map:
            self.map[key] = self._create_counter(name)
        return self.map[key]

    def del_counter(self, name: str, attributes: Attributes = None) -> None:
        """
        Delete a counter; does nothing if it is not cached.

        :param name: The name of the counter to delete.
        :param attributes: Counter attributes which were used to generate a unique key to store the counter.
        """
        self.map.pop(_generate_key_name(name, attributes), None)

237 

238 

def get_otel_logger(cls) -> SafeOtelLogger:
    """
    Build a SafeOtelLogger that exports metrics to an OpenTelemetry Collector over OTLP/HTTP.

    Reads ``otel_host``, ``otel_port``, ``otel_prefix``, ``otel_interval_milliseconds``,
    ``otel_debugging_on`` and ``metrics_allow_list`` from the ``[metrics]`` config section.
    It then installs a global ``MeterProvider`` via ``metrics.set_meter_provider`` and
    returns a logger bound to the provider returned by ``metrics.get_meter_provider()``.

    :param cls: Not used in this function. NOTE(review): presumably kept for the stats
        factory calling convention — confirm.
    """
    host = conf.get("metrics", "otel_host")  # ex: "breeze-otel-collector"
    port = conf.getint("metrics", "otel_port")  # ex: 4318
    prefix = conf.get("metrics", "otel_prefix")  # ex: "airflow"
    interval = conf.getint("metrics", "otel_interval_milliseconds")  # ex: 30000
    debug = conf.getboolean("metrics", "otel_debugging_on")

    allow_list = conf.get("metrics", "metrics_allow_list", fallback=None)
    allow_list_validator = AllowListValidator(allow_list)

    resource = Resource(attributes={SERVICE_NAME: "Airflow"})
    # TODO: figure out https instead of http ??
    endpoint = f"http://{host}:{port}/v1/metrics"

    # Use the module logger rather than the root-level ``logging.info``.
    log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint)
    readers = [
        PeriodicExportingMetricReader(
            OTLPMetricExporter(
                endpoint=endpoint,
                headers={"Content-Type": "application/json"},
            ),
            export_interval_millis=interval,
        )
    ]

    if debug:
        # Also print metrics to the console. No export_interval_millis is passed, so this
        # reader uses its own default interval rather than ``interval``.
        export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter())
        readers.append(export_to_console)

    metrics.set_meter_provider(
        MeterProvider(
            resource=resource,
            metric_readers=readers,
            shutdown_on_exit=False,
        ),
    )

    return SafeOtelLogger(metrics.get_meter_provider(), prefix, allow_list_validator)