Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/metrics/statsd_logger.py: 42%
79 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18from __future__ import annotations
20import logging
21from functools import wraps
22from typing import TYPE_CHECKING, Callable, TypeVar, cast
24from airflow.configuration import conf
25from airflow.exceptions import AirflowConfigException
26from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol
27from airflow.metrics.validators import (
28 AllowListValidator,
29 BlockListValidator,
30 ListValidator,
31 validate_stat,
32)
34if TYPE_CHECKING:
35 from statsd import StatsClient
37T = TypeVar("T", bound=Callable)
39log = logging.getLogger(__name__)
def prepare_stat_with_tags(fn: T) -> T:
    """Add tags to stat with influxdb standard format if influxdb_tags_enabled is True.

    The decorated method's ``self`` must expose ``influxdb_tags_enabled`` and
    ``metric_tags_validator`` (an object with a ``test(name)`` method).

    When tagging is enabled and both ``stat`` and ``tags`` are provided, every tag
    whose key passes ``metric_tags_validator`` is appended to the stat name as
    ``,key=value``. A tag whose key or value contains ``,`` or ``=`` is logged and
    dropped. The ``tags`` mapping itself is always forwarded unchanged to ``fn``.

    :param fn: the metric method to wrap.
    :return: the wrapped method, typed like ``fn``.
    """

    @wraps(fn)
    def wrapper(
        self, stat: str | None = None, *args, tags: dict[str, str] | None = None, **kwargs
    ) -> Callable[[str], str]:
        if self.influxdb_tags_enabled and stat is not None and tags is not None:
            for k, v in tags.items():
                if not self.metric_tags_validator.test(k):
                    continue
                # Render through an f-string so that non-str tag values (e.g. an int)
                # are checked and emitted instead of raising TypeError on ``v + k``.
                if any(c in (",", "=") for c in f"{v}{k}"):
                    log.error("Dropping invalid tag: %s=%s.", k, v)
                else:
                    stat += f",{k}={v}"
        return fn(self, stat, *args, tags=tags, **kwargs)

    # cast() ignores its first argument at runtime; quoting it avoids a needless lookup.
    return cast("T", wrapper)
class SafeStatsdLogger:
    """Forward metrics to a StatsD client, skipping names rejected by ``metrics_validator``."""

    def __init__(
        self,
        statsd_client: StatsClient,
        metrics_validator: ListValidator = AllowListValidator(),
        influxdb_tags_enabled: bool = False,
        metric_tags_validator: ListValidator = AllowListValidator(),
    ) -> None:
        # Underlying client and the name filter applied before every emit.
        self.statsd = statsd_client
        self.metrics_validator = metrics_validator
        # Both attributes below are read by the ``prepare_stat_with_tags`` decorator.
        self.metric_tags_validator = metric_tags_validator
        self.influxdb_tags_enabled = influxdb_tags_enabled

    @prepare_stat_with_tags
    @validate_stat
    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Increment ``stat`` by ``count`` when its name passes the metrics validator."""
        # ``tags`` is not used here; the outer decorator has already handled it.
        if not self.metrics_validator.test(stat):
            return None
        return self.statsd.incr(stat, count, rate)

    @prepare_stat_with_tags
    @validate_stat
    def decr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Decrement ``stat`` by ``count`` when its name passes the metrics validator."""
        if not self.metrics_validator.test(stat):
            return None
        return self.statsd.decr(stat, count, rate)

    @prepare_stat_with_tags
    @validate_stat
    def gauge(
        self,
        stat: str,
        value: int | float,
        rate: float = 1,
        delta: bool = False,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Report ``value`` as a gauge for ``stat`` when its name passes the metrics validator."""
        if not self.metrics_validator.test(stat):
            return None
        return self.statsd.gauge(stat, value, rate, delta)

    @prepare_stat_with_tags
    @validate_stat
    def timing(
        self,
        stat: str,
        dt: DeltaType,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """Report the duration ``dt`` for ``stat`` when its name passes the metrics validator."""
        if not self.metrics_validator.test(stat):
            return None
        return self.statsd.timing(stat, dt)

    @prepare_stat_with_tags
    @validate_stat
    def timer(
        self,
        stat: str | None = None,
        *args,
        tags: dict[str, str] | None = None,
        **kwargs,
    ) -> TimerProtocol:
        """Return a ``Timer``; it wraps the client's timer only for an accepted, non-empty ``stat``."""
        # A missing/empty name or a rejected name yields a Timer with no client timer attached.
        if not stat or not self.metrics_validator.test(stat):
            return Timer()
        return Timer(self.statsd.timer(stat, *args, **kwargs))
def get_statsd_logger(cls) -> SafeStatsdLogger:
    """Build a ``SafeStatsdLogger`` from the ``metrics`` configuration section.

    ``cls`` is accepted but not used by this function.

    :raises AirflowConfigException: if ``statsd_custom_client_path`` resolves to a class
        that does not subclass ``statsd.StatsClient``.
    """
    # Per the original note: this is only called when statsd is switched on, so no
    # scheduler/statsd_on check is made here.
    from statsd import StatsClient

    client_cls = conf.getimport("metrics", "statsd_custom_client_path", fallback=None)
    if not client_cls:
        # No custom client configured: use the stock one.
        client_cls = StatsClient
    elif issubclass(client_cls, StatsClient):
        log.info("Successfully loaded custom StatsD client")
    else:
        raise AirflowConfigException(
            "Your custom StatsD client must extend the statsd.StatsClient in order to ensure "
            "backwards compatibility."
        )

    client = client_cls(
        host=conf.get("metrics", "statsd_host"),
        port=conf.getint("metrics", "statsd_port"),
        prefix=conf.get("metrics", "statsd_prefix"),
    )

    # The allow list wins over the block list; with neither set, a default AllowListValidator is used.
    name_validator: ListValidator
    if conf.get("metrics", "metrics_allow_list", fallback=None):
        name_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list"))
        if conf.get("metrics", "metrics_block_list", fallback=None):
            log.warning(
                "Ignoring metrics_block_list as both metrics_allow_list "
                "and metrics_block_list have been set"
            )
    elif conf.get("metrics", "metrics_block_list", fallback=None):
        name_validator = BlockListValidator(conf.get("metrics", "metrics_block_list"))
    else:
        name_validator = AllowListValidator()

    tags_enabled = conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False)
    tag_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
    return SafeStatsdLogger(
        statsd_client=client,
        metrics_validator=name_validator,
        influxdb_tags_enabled=tags_enabled,
        metric_tags_validator=tag_validator,
    )