Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/metrics/validators.py: 44%
80 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
18# Only characters in the character set are considered valid
19# for the stat_name if stat_name_default_handler is used.
20from __future__ import annotations
22import abc
23import logging
24import re
25import string
26import warnings
27from functools import partial, wraps
28from typing import Callable, Iterable, Pattern, cast
30from airflow.configuration import conf
31from airflow.exceptions import InvalidStatsNameException
# Module-level logger named after this module; used to report rejected stat names.
log: logging.Logger = logging.getLogger(__name__)
class MetricNameLengthExemptionWarning(Warning):
    """
    Warning issued for a metric name that is too long but matches a back-compat exemption.

    It is a dedicated subclass of ``Warning`` so that tests can easily assert
    that exactly this notice was emitted.
    """
# Only characters in the character set are considered valid
# for the stat_name if stat_name_default_handler is used.
ALLOWED_CHARACTERS = frozenset(string.ascii_letters + string.digits + "_.-")

# The following set contains existing metrics whose names are too long for
# OpenTelemetry and should be deprecated over time. This is implemented to
# ensure that any new metrics we introduce have names which meet the OTel
# standard while also allowing us time to deprecate the old names.
# NOTE: No new names should be added to this list. This list should
# only ever shorten over time as we deprecate these names.
# Literal dots in the fixed part of each name are escaped ("\.").
# An unescaped "." matches any character, which would let a pattern exempt
# names outside the metric family it is meant to cover.
BACK_COMPAT_METRIC_NAME_PATTERNS: set[str] = {
    r"^(?P<job_name>.*)_start$",
    r"^(?P<job_name>.*)_end$",
    r"^(?P<job_name>.*)_heartbeat_failure$",
    r"^local_task_job\.task_exit\.(?P<job_id>.*)\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<return_code>.*)$",
    r"^operator_failures_(?P<operator_name>.*)$",
    r"^operator_successes_(?P<operator_name>.*)$",
    r"^ti\.start\.(?P<dag_id>.*)\.(?P<task_id>.*)$",
    r"^ti\.finish\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<state>.*)$",
    r"^task_removed_from_dag\.(?P<dag_id>.*)$",
    r"^task_restored_to_dag\.(?P<dag_id>.*)$",
    r"^task_instance_created-(?P<operator_name>.*)$",
    r"^dag_processing\.last_run\.seconds_ago\.(?P<dag_file>.*)$",
    r"^pool\.open_slots\.(?P<pool_name>.*)$",
    r"^pool\.queued_slots\.(?P<pool_name>.*)$",
    r"^pool\.running_slots\.(?P<pool_name>.*)$",
    r"^pool\.starving_tasks\.(?P<pool_name>.*)$",
    r"^dagrun\.dependency-check\.(?P<dag_id>.*)$",
    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.duration$",
    r"^dag_processing\.last_duration\.(?P<dag_file>.*)$",
    r"^dagrun\.duration\.success\.(?P<dag_id>.*)$",
    r"^dagrun\.duration\.failed\.(?P<dag_id>.*)$",
    r"^dagrun\.schedule_delay\.(?P<dag_id>.*)$",
    r"^dagrun\.(?P<dag_id>.*)\.first_task_scheduling_delay$",
}
# Compiled once at import time so lookups do not re-parse the pattern strings.
BACK_COMPAT_METRIC_NAMES: set[Pattern[str]] = {re.compile(name) for name in BACK_COMPAT_METRIC_NAME_PATTERNS}

# Default maximum length of a combined "prefix.name" metric name in stat_name_otel_handler.
OTEL_NAME_MAX_LENGTH = 63
def validate_stat(fn: Callable) -> Callable:
    """
    Decorate a stats-emitting method so its stat name is validated first.

    When a ``stat`` is given, it is passed through the configured stat-name
    handler and the (possibly transformed) name is forwarded to ``fn``.
    If an InvalidStatsNameException escapes (from the handler or from ``fn``),
    it is logged and the call yields None, so the stat is not emitted.
    """

    @wraps(fn)
    def _checked(self, stat: str | None = None, *args, **kwargs) -> Callable | None:
        try:
            if stat is not None:
                stat = get_current_handler_stat_name_func()(stat)
            return fn(self, stat, *args, **kwargs)
        except InvalidStatsNameException:
            # The original name is still bound here if the handler rejected it.
            log.exception("Invalid stat name: %s.", stat)
            return None

    return cast(Callable, _checked)
def stat_name_otel_handler(
    stat_prefix: str,
    stat_name: str,
    max_length: int = OTEL_NAME_MAX_LENGTH,
) -> str:
    """
    Verify that a proposed prefix and name combination will meet OpenTelemetry naming standards.

    See: https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax

    :param stat_prefix: The proposed prefix applied to all metric names.
    :param stat_name: The proposed name.
    :param max_length: The max length of the combined prefix and name; defaults to the max length
        as defined in the OpenTelemetry standard, but can be overridden.

    :returns: The approved combined name ``"{stat_prefix}.{stat_name}"``.
    :raises InvalidStatsNameException: if either argument is not a string; if the combined
        name is longer than ``max_length`` and ``stat_name`` matches no back-compat
        exemption; or if ``stat_name_default_handler`` rejects the combined name.
    """
    # This test case is here to enforce that the values can not be None and
    # must be a valid String. Without this test here, those values get cast
    # to a string and pass when they should not, potentially resulting in
    # metrics named "airflow.None", "airflow.42", or "None.42" for example.
    if not (isinstance(stat_name, str) and isinstance(stat_prefix, str)):
        raise InvalidStatsNameException("Stat name and prefix must both be strings.")

    proposed_stat_name: str = f"{stat_prefix}.{stat_name}"
    matched_exemption: str | None = None

    # Compare against the caller-supplied ``max_length`` (previously the module
    # constant was hard-coded here). This keeps the exemption lookup consistent
    # with the length check performed by stat_name_default_handler below.
    if len(proposed_stat_name) > max_length:
        # If the name is in the exceptions list, do not fail it for being too long.
        # It may still be deemed invalid for other reasons below.
        matched_exemption = _find_length_exemption(stat_name)
        if matched_exemption is None:
            raise InvalidStatsNameException(
                f"Invalid stat name: {proposed_stat_name}. Please see "
                f"https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax"
            )

    # `stat_name_default_handler` throws InvalidStatsNameException if the
    # provided value is not valid or returns the value if it is. We don't
    # need the return value but will make use of the validation checks.
    # Exempt names are checked against a relaxed length limit of 999.
    # If no exception is thrown, then the proposed name meets OTel requirements.
    stat_name_default_handler(
        proposed_stat_name, max_length=999 if matched_exemption is not None else max_length
    )

    # This warning is down here instead of up above because the exemption only
    # applies to the length and a name may still be invalid for other reasons.
    if matched_exemption is not None:
        warnings.warn(
            f"Stat name {stat_name} matches exemption {matched_exemption} and "
            f"will be truncated to {proposed_stat_name[:max_length]}. "
            f"This stat name will be deprecated in the future and replaced with "
            f"a shorter name combined with Attributes/Tags.",
            MetricNameLengthExemptionWarning,
        )

    return proposed_stat_name


def _find_length_exemption(stat_name: str) -> str | None:
    """Return the pattern string of a back-compat exemption matching ``stat_name``, or None if none match."""
    for exemption in BACK_COMPAT_METRIC_NAMES:
        if exemption.match(stat_name):
            return exemption.pattern
    return None
def stat_name_default_handler(
    stat_name: str, max_length: int = 250, allowed_chars: Iterable[str] = ALLOWED_CHARACTERS
) -> str:
    """
    Validate the metric stat name.

    The name is not modified; it is returned as-is when every check passes.

    :param stat_name: The metric name to validate.
    :param max_length: Maximum permitted length of ``stat_name`` (a name of exactly
        ``max_length`` characters is accepted).
    :param allowed_chars: Characters permitted in ``stat_name``. Any iterable of single
        characters is accepted (set, string, list, or a one-shot iterator); it is consumed once.
    :returns: ``stat_name`` unchanged.
    :raises InvalidStatsNameException: if ``stat_name`` is not a string, is longer than
        ``max_length``, or contains a character not in ``allowed_chars``.
    """
    if not isinstance(stat_name, str):
        raise InvalidStatsNameException("The stat_name has to be a string")
    if len(stat_name) > max_length:
        raise InvalidStatsNameException(
            f"The stat_name ({stat_name}) has to be less than {max_length} characters."
        )
    # ``set.issubset`` accepts any iterable and materialises it once. This means a
    # one-shot iterator is not exhausted after the first character (as repeated
    # ``c in allowed_chars`` would be), and lookups are hash-based whatever the
    # container type passed in.
    if not set(stat_name).issubset(allowed_chars):
        raise InvalidStatsNameException(
            f"The stat name ({stat_name}) has to be composed of ASCII "
            f"alphabets, numbers, or the underscore, dot, or dash characters."
        )
    return stat_name
def get_current_handler_stat_name_func() -> Callable[[str], str]:
    """
    Get Stat Name Handler from airflow.cfg.

    Returns the callable configured as ``[metrics] stat_name_handler`` when one is set.
    Otherwise returns ``stat_name_default_handler``; if ``[metrics] statsd_influxdb_enabled``
    is true, that handler additionally allows the ``,`` and ``=`` characters.
    """
    handler = conf.getimport("metrics", "stat_name_handler")
    if handler is not None:
        return handler
    # Parse the option as a boolean. ``conf.get`` returns the option's string value,
    # and a configured "False" string is truthy, which would wrongly enable the
    # extended character set.
    if conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False):
        return partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})
    return stat_name_default_handler
class ListValidator(metaclass=abc.ABCMeta):
    """
    Abstract base class for prefix-based metric-name validators.

    Implemented by AllowListValidator and BlockListValidator. The ``test``
    method must be overridden by each subclass.
    """

    def __init__(self, validate_list: str | None = None) -> None:
        """
        Parse a comma-separated list of name prefixes.

        :param validate_list: Comma-separated prefixes, or None/empty for "no list".
            Entries are stripped and lower-cased. Blank entries (e.g. from a trailing
            or doubled comma) are dropped, because an empty prefix would match every name.
        """
        prefixes: tuple[str, ...] = ()
        if validate_list:
            prefixes = tuple(
                entry for entry in (part.strip().lower() for part in validate_list.split(",")) if entry
            )
        # None signals that no list is configured; an all-blank value is treated the same way.
        self.validate_list: tuple[str, ...] | None = prefixes or None

    @classmethod
    def __subclasshook__(cls, subclass: type) -> bool:
        # Any class exposing a callable ``test`` attribute counts as a virtual subclass.
        return hasattr(subclass, "test") and callable(subclass.test) or NotImplemented

    @abc.abstractmethod
    def test(self, name: str) -> bool:
        """Test if name is allowed."""
        raise NotImplementedError
class AllowListValidator(ListValidator):
    """Permit only metric names that begin with one of the configured prefixes."""

    def test(self, name: str) -> bool:
        prefixes = self.validate_list
        if prefixes is None:
            # No allow list configured: every metric is permitted.
            return True
        normalized = name.strip().lower()
        return normalized.startswith(prefixes)
class BlockListValidator(ListValidator):
    """Reject metric names that begin with any of the configured prefixes."""

    def test(self, name: str) -> bool:
        prefixes = self.validate_list
        if prefixes is None:
            # No block list configured: every metric is permitted.
            return True
        blocked = name.strip().lower().startswith(prefixes)
        return not blocked