Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/metrics/datadog_logger.py: 31%
71 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18from __future__ import annotations
20import datetime
21import logging
22from typing import TYPE_CHECKING
24from airflow.configuration import conf
25from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol
26from airflow.metrics.validators import (
27 AllowListValidator,
28 BlockListValidator,
29 ListValidator,
30 validate_stat,
31)
33if TYPE_CHECKING:
34 from datadog import DogStatsd
36log = logging.getLogger(__name__)
39class SafeDogStatsdLogger:
40 """DogStatsd Logger."""
42 def __init__(
43 self,
44 dogstatsd_client: DogStatsd,
45 metrics_validator: ListValidator = AllowListValidator(),
46 metrics_tags: bool = False,
47 metric_tags_validator: ListValidator = AllowListValidator(),
48 ) -> None:
49 self.dogstatsd = dogstatsd_client
50 self.metrics_validator = metrics_validator
51 self.metrics_tags = metrics_tags
52 self.metric_tags_validator = metric_tags_validator
54 @validate_stat
55 def incr(
56 self,
57 stat: str,
58 count: int = 1,
59 rate: float = 1,
60 *,
61 tags: dict[str, str] | None = None,
62 ) -> None:
63 """Increment stat."""
64 if self.metrics_tags and isinstance(tags, dict):
65 tags_list = [
66 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
67 ]
68 else:
69 tags_list = []
70 if self.metrics_validator.test(stat):
71 return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
72 return None
74 @validate_stat
75 def decr(
76 self,
77 stat: str,
78 count: int = 1,
79 rate: float = 1,
80 *,
81 tags: dict[str, str] | None = None,
82 ) -> None:
83 """Decrement stat."""
84 if self.metrics_tags and isinstance(tags, dict):
85 tags_list = [
86 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
87 ]
88 else:
89 tags_list = []
90 if self.metrics_validator.test(stat):
91 return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
92 return None
94 @validate_stat
95 def gauge(
96 self,
97 stat: str,
98 value: int | float,
99 rate: float = 1,
100 delta: bool = False,
101 *,
102 tags: dict[str, str] | None = None,
103 ) -> None:
104 """Gauge stat."""
105 if self.metrics_tags and isinstance(tags, dict):
106 tags_list = [
107 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
108 ]
109 else:
110 tags_list = []
111 if self.metrics_validator.test(stat):
112 return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
113 return None
115 @validate_stat
116 def timing(
117 self,
118 stat: str,
119 dt: DeltaType,
120 *,
121 tags: dict[str, str] | None = None,
122 ) -> None:
123 """Stats timing."""
124 if self.metrics_tags and isinstance(tags, dict):
125 tags_list = [
126 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
127 ]
128 else:
129 tags_list = []
130 if self.metrics_validator.test(stat):
131 if isinstance(dt, datetime.timedelta):
132 dt = dt.total_seconds()
133 return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
134 return None
136 @validate_stat
137 def timer(
138 self,
139 stat: str | None = None,
140 tags: dict[str, str] | None = None,
141 **kwargs,
142 ) -> TimerProtocol:
143 """Timer metric that can be cancelled."""
144 if self.metrics_tags and isinstance(tags, dict):
145 tags_list = [
146 f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
147 ]
148 else:
149 tags_list = []
150 if stat and self.metrics_validator.test(stat):
151 return Timer(self.dogstatsd.timed(stat, tags=tags_list, **kwargs))
152 return Timer()
155def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger:
156 """Get DataDog StatsD logger."""
157 from datadog import DogStatsd
159 metrics_validator: ListValidator
161 dogstatsd = DogStatsd(
162 host=conf.get("metrics", "statsd_host"),
163 port=conf.getint("metrics", "statsd_port"),
164 namespace=conf.get("metrics", "statsd_prefix"),
165 constant_tags=cls.get_constant_tags(),
166 )
167 if conf.get("metrics", "metrics_allow_list", fallback=None):
168 metrics_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list"))
169 if conf.get("metrics", "metrics_block_list", fallback=None):
170 log.warning(
171 "Ignoring metrics_block_list as both metrics_allow_list "
172 "and metrics_block_list have been set"
173 )
174 elif conf.get("metrics", "metrics_block_list", fallback=None):
175 metrics_validator = BlockListValidator(conf.get("metrics", "metrics_block_list"))
176 else:
177 metrics_validator = AllowListValidator()
178 datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)
179 metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
180 return SafeDogStatsdLogger(dogstatsd, metrics_validator, datadog_metrics_tags, metric_tags_validator)