Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/stats.py: 49%
211 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import datetime
21import logging
22import socket
23import string
24import time
25from functools import wraps
26from typing import TYPE_CHECKING, Callable, TypeVar, cast
28from airflow.configuration import conf
29from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
30from airflow.typing_compat import Protocol
32log = logging.getLogger(__name__)
class TimerProtocol(Protocol):
    """Structural type describing the object returned by ``StatsLogger.timer``."""

    def __enter__(self):
        """Enter the timing context."""

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the timing context."""

    def start(self):
        """Start the timer."""

    def stop(self, send=True):
        """Stop the timer and, unless ``send`` is false, submit it to StatsD."""
class StatsLogger(Protocol):
    """
    Structural type of a stats logger.

    This class is only used for type checking (IDEs, mypy, etc.); the methods
    have no implementation.
    """

    # ``rate`` is a sample rate, i.e. a fraction such as 0.5; ``float`` also
    # accepts the integer default for type-checking purposes.

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: float = 1) -> None:
        """Increment stat."""

    @classmethod
    def decr(cls, stat: str, count: int = 1, rate: float = 1) -> None:
        """Decrement stat."""

    @classmethod
    def gauge(cls, stat: str, value: float, rate: float = 1, delta: bool = False) -> None:
        """Gauge stat."""

    @classmethod
    def timing(cls, stat: str, dt: float | datetime.timedelta) -> None:
        """Stats timing."""

    @classmethod
    def timer(cls, *args, **kwargs) -> TimerProtocol:
        """Timer metric that can be cancelled."""
class Timer:
    """
    Timer that records duration, and optionally sends it to a StatsD backend.

    This class lets us have an accurate timer with the logic in one place (so
    that we don't use datetime math for duration -- it is error prone).

    ``duration`` is ``None`` until the timer has been started and stopped; after
    that it holds the elapsed time in fractional seconds, as measured with
    ``time.perf_counter()``.

    Example usage:

    .. code-block:: python

        with Stats.timer() as t:
            # Something to time
            frob_the_foos()

        log.info("Frobbing the foos took %.2f", t.duration)

    Or without a context manager:

    .. code-block:: python

        timer = Stats.timer().start()

        # Something to time
        frob_the_foos()

        timer.stop()

        log.info("Frobbing the foos took %.2f", timer.duration)

    To send a metric:

    .. code-block:: python

        with Stats.timer("foos.frob"):
            # Something to time
            frob_the_foos()

    Or both:

    .. code-block:: python

        with Stats.timer("foos.frob") as t:
            # Something to time
            frob_the_foos()

        log.info("Frobbing the foos took %.2f", t.duration)
    """

    # pystatsd and dogstatsd both have a timer class, but present different APIs,
    # so we can't use this as a mixin on those; instead this class contains the "real" timer.

    _start_time: float | None
    duration: float | None

    def __init__(self, real_timer=None):
        """
        Create a timer.

        :param real_timer: optional backend timer object exposing ``start()`` and
            ``stop()``; when given, it is started and stopped alongside this timer.
        """
        self.real_timer = real_timer
        # Initialise eagerly so that reading ``duration`` or calling ``stop()``
        # before ``start()`` does not fail with AttributeError.
        self._start_time = None
        self.duration = None

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def start(self):
        """Start the timer (and the backend timer, if any) and return ``self``."""
        if self.real_timer:
            self.real_timer.start()
        self._start_time = time.perf_counter()
        return self

    def stop(self, send=True):
        """
        Stop the timer, and optionally send it to stats backend.

        :param send: when true (the default) and a backend timer was supplied,
            the backend timer's ``stop()`` is called as well.
        """
        # Only compute a duration if the timer was actually started; otherwise
        # ``duration`` keeps its previous value (``None`` for a fresh timer).
        if self._start_time is not None:
            self.duration = time.perf_counter() - self._start_time
        if send and self.real_timer:
            self.real_timer.stop()
class DummyStatsLogger:
    """No-op stats logger used as a fallback when no StatsLogger is configured."""

    @classmethod
    def incr(cls, stat, count=1, rate=1):
        """Increment stat (does nothing)."""
        return None

    @classmethod
    def decr(cls, stat, count=1, rate=1):
        """Decrement stat (does nothing)."""
        return None

    @classmethod
    def gauge(cls, stat, value, rate=1, delta=False):
        """Gauge stat (does nothing)."""
        return None

    @classmethod
    def timing(cls, stat, dt):
        """Stats timing (does nothing)."""
        return None

    @classmethod
    def timer(cls, *args, **kwargs):
        """Return a ``Timer`` that has no backend timer attached; all arguments are ignored."""
        local_only_timer = Timer()
        return local_only_timer
# When stat_name_default_handler is in use, a stat name may only contain
# characters from this set: ASCII letters, digits, underscore, dot and dash.
ALLOWED_CHARACTERS = set(string.ascii_letters) | set(string.digits) | {"_", ".", "-"}
def stat_name_default_handler(stat_name, max_length=250) -> str:
    """
    Validate a StatsD stat name and return it.

    The name must be a ``str``, be no longer than ``max_length`` characters and
    consist solely of characters found in ``ALLOWED_CHARACTERS``.

    :param stat_name: the candidate stat name
    :param max_length: maximum accepted length of the name
    :return: ``stat_name`` as passed in, when every check passes
    :raises InvalidStatsNameException: when any of the checks fails
    """
    if not isinstance(stat_name, str):
        raise InvalidStatsNameException("The stat_name has to be a string")

    if len(stat_name) > max_length:
        raise InvalidStatsNameException(
            f"The stat_name ({stat_name}) has to be less than {max_length} characters."
        )

    has_forbidden_character = any(char not in ALLOWED_CHARACTERS for char in stat_name)
    if has_forbidden_character:
        raise InvalidStatsNameException(
            f"The stat name ({stat_name}) has to be composed of ASCII "
            f"alphabets, numbers, or the underscore, dot, or dash characters."
        )

    return stat_name
def get_current_handler_stat_name_func() -> Callable[[str], str]:
    """
    Return the stat name handler to use.

    The handler is imported from the ``[metrics] stat_name_handler`` option; when
    that yields a falsy value, ``stat_name_default_handler`` is returned instead.
    """
    configured_handler = conf.getimport("metrics", "stat_name_handler")
    if configured_handler:
        return configured_handler
    return stat_name_default_handler
T = TypeVar("T", bound=Callable)


def validate_stat(fn: T) -> T:
    """
    Decorate a stats method so that its stat name is validated before use.

    When a stat name is given, it is passed through the currently configured
    stat name handler and the handler's result is forwarded to ``fn``. If
    ``InvalidStatsNameException`` is raised, the error is logged, nothing is
    emitted and ``None`` is returned.
    """

    @wraps(fn)
    def wrapper(_self, stat=None, *args, **kwargs):
        try:
            if stat is not None:
                # Look the handler up on every call and apply it right away.
                stat = get_current_handler_stat_name_func()(stat)
            result = fn(_self, stat, *args, **kwargs)
        except InvalidStatsNameException:
            log.exception("Invalid stat name: %s.", stat)
            return None
        else:
            return result

    return cast(T, wrapper)
class AllowListValidator:
    """
    Class to filter unwanted stats.

    The allow list is a comma-separated string of stat name prefixes. Matching
    is case-insensitive and ignores surrounding whitespace.
    """

    def __init__(self, allow_list=None):
        """
        Parse the allow list.

        :param allow_list: comma-separated prefixes, or ``None``/empty to allow
            every stat. Empty items (e.g. from a trailing comma) are ignored.
        """
        prefixes = ()
        if allow_list:
            normalized = (item.strip().lower() for item in allow_list.split(","))
            # Drop empty items: an empty prefix matches every string, which
            # would silently turn the allow list into "allow everything".
            prefixes = tuple(prefix for prefix in normalized if prefix)
        # ``None`` means "no allow list configured".
        self.allow_list = prefixes or None

    def test(self, stat):
        """
        Test if stat is in the Allow List.

        :param stat: the stat name to check
        :return: ``True`` when no allow list is configured or when the
            normalized stat name starts with one of the allowed prefixes
        """
        if self.allow_list is None:
            return True  # default is all metrics allowed
        return stat.strip().lower().startswith(self.allow_list)
class SafeStatsdLogger:
    """
    StatsD logger.

    Each method validates the stat name (via ``validate_stat``) and only forwards
    to the wrapped StatsD client when the allow-list validator accepts the name.
    """

    def __init__(self, statsd_client, allow_list_validator=AllowListValidator()):
        self.statsd = statsd_client
        self.allow_list_validator = allow_list_validator

    @validate_stat
    def incr(self, stat, count=1, rate=1):
        """Increment stat."""
        if not self.allow_list_validator.test(stat):
            return None
        return self.statsd.incr(stat, count, rate)

    @validate_stat
    def decr(self, stat, count=1, rate=1):
        """Decrement stat."""
        if not self.allow_list_validator.test(stat):
            return None
        return self.statsd.decr(stat, count, rate)

    @validate_stat
    def gauge(self, stat, value, rate=1, delta=False):
        """Gauge stat."""
        if not self.allow_list_validator.test(stat):
            return None
        return self.statsd.gauge(stat, value, rate, delta)

    @validate_stat
    def timing(self, stat, dt):
        """Stats timing."""
        if not self.allow_list_validator.test(stat):
            return None
        return self.statsd.timing(stat, dt)

    @validate_stat
    def timer(self, stat=None, *args, **kwargs):
        """
        Timer metric that can be cancelled.

        Without a stat name, or with a name the allow list rejects, the returned
        ``Timer`` has no backend timer attached.
        """
        if not stat or not self.allow_list_validator.test(stat):
            return Timer()
        backend_timer = self.statsd.timer(stat, *args, **kwargs)
        return Timer(backend_timer)
class SafeDogStatsdLogger:
    """
    DogStatsd logger.

    Each method validates the stat name (via ``validate_stat``) and only forwards
    to the wrapped DogStatsd client when the allow-list validator accepts the
    name. A missing or empty ``tags`` argument is forwarded as an empty list.
    """

    def __init__(self, dogstatsd_client, allow_list_validator=AllowListValidator()):
        self.dogstatsd = dogstatsd_client
        self.allow_list_validator = allow_list_validator

    @validate_stat
    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment stat."""
        if not self.allow_list_validator.test(stat):
            return None
        tags = tags if tags else []
        return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)

    @validate_stat
    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement stat."""
        if not self.allow_list_validator.test(stat):
            return None
        tags = tags if tags else []
        return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate)

    @validate_stat
    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """Gauge stat. ``delta`` is accepted but not forwarded to the client."""
        if not self.allow_list_validator.test(stat):
            return None
        tags = tags if tags else []
        return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate)

    @validate_stat
    def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None):
        """Stats timing. A ``timedelta`` is converted with ``total_seconds()`` before sending."""
        if not self.allow_list_validator.test(stat):
            return None
        tags = tags if tags else []
        value = dt.total_seconds() if isinstance(dt, datetime.timedelta) else dt
        return self.dogstatsd.timing(metric=stat, value=value, tags=tags)

    @validate_stat
    def timer(self, stat=None, *args, tags=None, **kwargs):
        """
        Timer metric that can be cancelled.

        Without a stat name, or with a name the allow list rejects, the returned
        ``Timer`` has no backend timer attached.
        """
        if not stat or not self.allow_list_validator.test(stat):
            return Timer()
        tags = tags if tags else []
        backend_timer = self.dogstatsd.timed(stat, *args, tags=tags, **kwargs)
        return Timer(backend_timer)
class _Stats(type):
    """
    Metaclass that picks a stats logger factory from configuration.

    The logger instance is created on first attribute access and attribute
    lookups that are not resolved normally are forwarded to it.
    """

    factory = None
    instance: StatsLogger | None = None

    def __getattr__(cls, name):
        if not cls.instance:
            # Build the logger lazily; fall back to the no-op logger when the
            # client cannot be set up.
            try:
                cls.instance = cls.factory()
            except (socket.gaierror, ImportError) as err:
                log.error("Could not configure StatsClient: %s, using DummyStatsLogger instead.", err)
                cls.instance = DummyStatsLogger()
        return getattr(cls.instance, name)

    def __init__(cls, *args, **kwargs):
        super().__init__(cls)
        metaclass = cls.__class__
        if metaclass.factory is not None:
            # A factory has already been chosen; keep it.
            return

        datadog_option_present = conf.has_option("metrics", "statsd_datadog_enabled")
        if datadog_option_present and conf.getboolean("metrics", "statsd_datadog_enabled"):
            metaclass.factory = cls.get_dogstatsd_logger
        elif conf.getboolean("metrics", "statsd_on"):
            metaclass.factory = cls.get_statsd_logger
        else:
            metaclass.factory = DummyStatsLogger

    @classmethod
    def get_statsd_logger(cls):
        """Return a ``SafeStatsdLogger`` wrapping a StatsD client built from the ``[metrics]`` config."""
        # no need to check for the scheduler/statsd_on -> this method is only called when it is set
        # and previously it would crash with None is callable if it was called without it.
        from statsd import StatsClient

        stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None)

        if not stats_class:
            stats_class = StatsClient
        elif not issubclass(stats_class, StatsClient):
            raise AirflowConfigException(
                "Your custom StatsD client must extend the statsd.StatsClient in order to ensure "
                "backwards compatibility."
            )
        else:
            log.info("Successfully loaded custom StatsD client")

        statsd = stats_class(
            host=conf.get("metrics", "statsd_host"),
            port=conf.getint("metrics", "statsd_port"),
            prefix=conf.get("metrics", "statsd_prefix"),
        )
        statsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
        return SafeStatsdLogger(statsd, AllowListValidator(statsd_allow_list))

    @classmethod
    def get_dogstatsd_logger(cls):
        """Return a ``SafeDogStatsdLogger`` wrapping a DogStatsd client built from the ``[metrics]`` config."""
        from datadog import DogStatsd

        dogstatsd = DogStatsd(
            host=conf.get("metrics", "statsd_host"),
            port=conf.getint("metrics", "statsd_port"),
            namespace=conf.get("metrics", "statsd_prefix"),
            constant_tags=cls.get_constant_tags(),
        )
        dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
        return SafeDogStatsdLogger(dogstatsd, AllowListValidator(dogstatsd_allow_list))

    @classmethod
    def get_constant_tags(cls):
        """
        Get constant DataDog tags to add to all stats.

        Returns the comma-separated items of ``[metrics] statsd_datadog_tags`` as
        a list, or an empty list when the option is unset or empty.
        """
        tags_in_string = conf.get("metrics", "statsd_datadog_tags", fallback=None)
        if tags_in_string is None or tags_in_string == "":
            return []
        return tags_in_string.split(",")
if TYPE_CHECKING:
    # Type checkers only see the declared logger interface.
    Stats: StatsLogger
else:

    class Stats(metaclass=_Stats):
        """Placeholder class; the ``_Stats`` metaclass supplies the actual stats logger."""