Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/metrics/otel_logger.py: 34%
109 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17from __future__ import annotations
19import logging
20import random
21import warnings
22from typing import Callable
24from opentelemetry import metrics
25from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
26from opentelemetry.sdk.metrics import MeterProvider
27from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader
28from opentelemetry.sdk.resources import SERVICE_NAME, Resource
29from opentelemetry.util.types import Attributes
31from airflow.configuration import conf
32from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol
33from airflow.metrics.validators import (
34 OTEL_NAME_MAX_LENGTH,
35 AllowListValidator,
36 stat_name_otel_handler,
37 validate_stat,
38)
log = logging.getLogger(__name__)

# Prefix (including the trailing dot) of Airflow metric names, e.g. the UpDownCounter name below.
METRIC_NAME_PREFIX = "airflow."

# Metric names that must be created as OpenTelemetry UpDownCounters (UDCs) instead of Counters.
#
# In OTel a Counter is monotonic (it can only go up), while an UpDownCounter is non-monotonic
# (it can go up or down). The options were: drop this one metric and implement the rest as
# monotonic Counters, implement every counter as an UpDownCounter, or choose per metric. The
# catch is that the Collector, which forwards these metrics to upstream dashboard tools
# (Prometheus, Grafana, etc.), types every UDC as a Gauge rather than a Counter. Choosing per
# metric is the best compromise: ordinary Counters keep their Counter type, and no existing
# metric is lost.
#
# "airflow.dag_processing.processes" is currently the only UDC used in Airflow. If more are
# added, a better mechanism than this hard-coded set should be introduced.
# See:
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#counter-creation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#updowncounter
UP_DOWN_COUNTERS = {f"{METRIC_NAME_PREFIX}dag_processing.processes"}
def _is_up_down_counter(name):
    """
    Tell whether ``name`` must be backed by an UpDownCounter rather than a Counter.

    :param name: Full metric name, prefix included (e.g. ``"airflow.dag_processing.processes"``).
    :return: True when ``name`` is listed in ``UP_DOWN_COUNTERS``.
    """
    listed = name in UP_DOWN_COUNTERS
    return listed
65def _generate_key_name(name: str, attributes: Attributes = None):
66 if attributes:
67 key = name
68 for item in attributes.items():
69 key += f"_{item[0]}_{item[1]}"
70 else:
71 key = name
73 return key
def name_is_otel_safe(prefix: str, name: str) -> bool:
    """
    Check whether ``prefix`` and ``name`` combine into a legal OpenTelemetry instrument name.

    The check is delegated to ``stat_name_otel_handler`` with the OTel length limit
    (``OTEL_NAME_MAX_LENGTH``). A truthy result from the handler is reported as True.
    NOTE(review): whether the handler returns a falsy value or raises for an illegal
    name is not visible here; confirm in ``airflow.metrics.validators``.

    Legal names are defined here:
    https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax
    """
    handled = stat_name_otel_handler(prefix, name, max_length=OTEL_NAME_MAX_LENGTH)
    return True if handled else False
class SafeOtelLogger:
    """
    Airflow stats logger backed by OpenTelemetry instruments.

    ``incr`` and ``decr`` emit through counters cached in a :class:`MetricsMap`. ``gauge``,
    ``timing`` and ``timer`` are not implemented for OpenTelemetry yet and only emit a warning.
    """

    def __init__(self, otel_provider, prefix: str = "airflow", allow_list_validator=None):
        """
        :param otel_provider: Object exposing ``get_meter(name)``. Presumably an OTel
            ``MeterProvider``; ``get_otel_logger`` passes the global one. Confirm for other callers.
        :param prefix: Joined with ``"."`` in front of every stat name.
        :param allow_list_validator: Object whose ``test(stat)`` decides whether a stat is
            emitted. ``None`` (the default) means a fresh ``AllowListValidator()``.
        """
        # Build the default validator per instance. A default argument would be
        # evaluated once at class-definition time and shared by every instance.
        if allow_list_validator is None:
            allow_list_validator = AllowListValidator()
        self.otel: Callable = otel_provider
        self.prefix: str = prefix
        self.metrics_validator = allow_list_validator
        self.meter = otel_provider.get_meter(__name__)
        self.metrics_map = MetricsMap(self.meter)

    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        tags: Attributes = None,
    ):
        """
        Increment stat by count.

        :param stat: The name of the stat to increment.
        :param count: A non-negative integer to add to the current value of stat.
        :param rate: value between 0 and 1 that represents the sampled rate at
            which the metric is going to be emitted.
        :param tags: Tags to append to the stat.
        :return: The counter that was incremented, or None when the call was sampled out
            or the stat was rejected by the allow list or the OTel name check.
        :raises ValueError: If ``count`` or ``rate`` is negative.
        """
        if (count < 0) or (rate < 0):
            raise ValueError("count and rate must both be positive values.")
        # Sampling: with rate < 1 the update is emitted only when random() <= rate.
        if rate < 1 and random.random() > rate:
            return None

        if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
            counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}", attributes=tags)
            counter.add(count, attributes=tags)
            return counter
        return None

    def decr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        tags: Attributes = None,
    ):
        """
        Decrement stat by count.

        NOTE(review): only names listed in ``UP_DOWN_COUNTERS`` are created as
        UpDownCounters. Other names get a monotonic OTel Counter, for which a negative
        ``add`` is presumably rejected by the SDK. Confirm before relying on ``decr`` for them.

        :param stat: The name of the stat to decrement.
        :param count: A non-negative integer to subtract from current value of stat.
        :param rate: value between 0 and 1 that represents the sampled rate at
            which the metric is going to be emitted.
        :param tags: Tags to append to the stat.
        :return: The counter that was decremented, or None when the call was sampled out
            or the stat was rejected by the allow list or the OTel name check.
        :raises ValueError: If ``count`` or ``rate`` is negative.
        """
        if (count < 0) or (rate < 0):
            raise ValueError("count and rate must both be positive values.")
        # Sampling: with rate < 1 the update is emitted only when random() <= rate.
        if rate < 1 and random.random() > rate:
            return None

        if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
            # Pass the tags as ``incr`` does, so a tagged incr/decr pair shares one cached
            # counter instead of decr falling back to the untagged key.
            counter = self.metrics_map.get_counter(f"{self.prefix}.{stat}", attributes=tags)
            counter.add(-count, attributes=tags)
            return counter
        return None

    @validate_stat
    def gauge(
        self,
        stat: str,
        value: int | float,
        rate: float = 1,
        delta: bool = False,
        *,
        tags: Attributes = None,
    ) -> None:
        """Not implemented for OpenTelemetry yet: emits a warning and returns None."""
        warnings.warn("OpenTelemetry Gauges are not yet implemented.")
        return None

    @validate_stat
    def timing(
        self,
        stat: str,
        dt: DeltaType,
        *,
        tags: Attributes = None,
    ) -> None:
        """Not implemented for OpenTelemetry yet: emits a warning and returns None."""
        warnings.warn("OpenTelemetry Timers are not yet implemented.")
        return None

    @validate_stat
    def timer(
        self,
        stat: str | None = None,
        *args,
        tags: Attributes = None,
        **kwargs,
    ) -> TimerProtocol:
        """Not implemented for OpenTelemetry yet: emits a warning and returns a plain ``Timer()``."""
        warnings.warn("OpenTelemetry Timers are not yet implemented.")
        return Timer()
class MetricsMap:
    """Stores Otel Instruments, keyed by metric name plus attributes."""

    def __init__(self, meter):
        """
        :param meter: Object used to create instruments (``create_counter`` /
            ``create_up_down_counter``). ``SafeOtelLogger`` passes ``otel_provider.get_meter(...)``.
        """
        self.meter = meter
        self.map = {}

    def clear(self) -> None:
        """Drop every cached instrument."""
        self.map.clear()

    def _create_counter(self, name):
        """
        Create a new counter or up_down_counter for the provided name.

        Names listed in ``UP_DOWN_COUNTERS`` get an UpDownCounter; all others get a Counter.
        Names longer than ``OTEL_NAME_MAX_LENGTH`` are truncated, with a warning.
        """
        otel_safe_name = name[:OTEL_NAME_MAX_LENGTH]
        if name != otel_safe_name:
            warnings.warn(
                f"Metric name `{name}` exceeds OpenTelemetry's name length limit of "
                f"{OTEL_NAME_MAX_LENGTH} characters and will be truncated to `{otel_safe_name}`."
            )

        # The UpDownCounter decision is made on the full, untruncated name.
        if _is_up_down_counter(name):
            counter = self.meter.create_up_down_counter(name=otel_safe_name)
        else:
            counter = self.meter.create_counter(name=otel_safe_name)

        # Log through this module's logger rather than the root logger. ``__name__`` gives
        # the bare class name without parsing ``str(type(...))``.
        logging.getLogger(__name__).debug(
            "Created %s as type: %s", otel_safe_name, type(counter).__name__
        )
        return counter

    def get_counter(self, name: str, attributes: Attributes = None):
        """
        Return the counter, creating a new one if it does not exist.

        The instrument itself is created from ``name`` only. ``attributes`` only affect
        the key it is cached under.

        :param name: The name of the counter to fetch or create.
        :param attributes: Counter attributes, used to generate a unique key to store the counter.
        """
        key = _generate_key_name(name, attributes)
        try:
            return self.map[key]
        except KeyError:
            new_counter = self._create_counter(name)
            self.map[key] = new_counter
            return new_counter

    def del_counter(self, name: str, attributes: Attributes = None) -> None:
        """
        Delete a counter; do nothing if it is not cached.

        :param name: The name of the counter to delete.
        :param attributes: Counter attributes which were used to generate a unique key to store the counter.
        """
        self.map.pop(_generate_key_name(name, attributes), None)
def get_otel_logger(cls) -> SafeOtelLogger:
    """
    Build a :class:`SafeOtelLogger` that exports metrics to an OpenTelemetry Collector.

    Reads the ``[metrics]`` section of the Airflow config. It registers a global
    ``MeterProvider`` that pushes to ``http://<otel_host>:<otel_port>/v1/metrics`` every
    ``otel_interval_milliseconds``. When ``otel_debugging_on`` is set, it also adds a
    console exporter. The returned logger wraps ``metrics.get_meter_provider()``.

    :param cls: Not used by this function. Kept for the caller's factory signature;
        presumably the stats class requesting the logger (confirm against the caller).
    :return: A logger bound to the globally registered meter provider.
    """
    host = conf.get("metrics", "otel_host")  # ex: "breeze-otel-collector"
    port = conf.getint("metrics", "otel_port")  # ex: 4318
    prefix = conf.get("metrics", "otel_prefix")  # ex: "airflow"
    interval = conf.getint("metrics", "otel_interval_milliseconds")  # ex: 30000
    debug = conf.getboolean("metrics", "otel_debugging_on")

    allow_list = conf.get("metrics", "metrics_allow_list", fallback=None)
    allow_list_validator = AllowListValidator(allow_list)

    resource = Resource(attributes={SERVICE_NAME: "Airflow"})
    # TODO: support https; the collector endpoint is currently always plain http.
    endpoint = f"http://{host}:{port}/v1/metrics"

    # Log through this module's logger (the same one bound to module-level ``log``)
    # rather than the root logger.
    logging.getLogger(__name__).info(
        "[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint
    )
    readers = [
        PeriodicExportingMetricReader(
            OTLPMetricExporter(
                endpoint=endpoint,
                # NOTE(review): this exporter module is the OTLP "proto.http" one. Confirm
                # whether this JSON Content-Type header is sent or overridden by the exporter.
                headers={"Content-Type": "application/json"},
            ),
            export_interval_millis=interval,
        )
    ]

    if debug:
        # NOTE(review): no export_interval_millis is passed, so this reader uses
        # PeriodicExportingMetricReader's default interval, not ``interval``.
        export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter())
        readers.append(export_to_console)

    metrics.set_meter_provider(
        MeterProvider(
            resource=resource,
            metric_readers=readers,
            shutdown_on_exit=False,
        ),
    )

    # NOTE(review): if a global provider was already registered, OTel may keep the earlier
    # one, and get_meter_provider() would then return it. Confirm if this can run twice.
    return SafeOtelLogger(metrics.get_meter_provider(), prefix, allow_list_validator)