Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/metrics/validators.py: 44%

80 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18# Only characters in the character set are considered valid 

19# for the stat_name if stat_name_default_handler is used. 

20from __future__ import annotations 

21 

22import abc 

23import logging 

24import re 

25import string 

26import warnings 

27from functools import partial, wraps 

28from typing import Callable, Iterable, Pattern, cast 

29 

30from airflow.configuration import conf 

31from airflow.exceptions import InvalidStatsNameException 

32 

33log = logging.getLogger(__name__) 

34 

35 

class MetricNameLengthExemptionWarning(Warning):
    """
    Warning category for the metric name length exemption notice.

    Having a dedicated Warning subclass makes it straightforward for tests to
    assert that this particular notice was emitted.
    """

43 

44 

# Characters accepted in a stat_name when stat_name_default_handler is used
# with its default settings: ASCII letters, digits, underscore, dot and dash.
ALLOWED_CHARACTERS = frozenset(string.ascii_letters) | frozenset(string.digits) | frozenset("_.-")

48 

# The following set contains existing metrics whose names are too long for
# OpenTelemetry and should be deprecated over time. This is implemented to
# ensure that any new metrics we introduce have names which meet the OTel
# standard while also allowing us time to deprecate the old names.
# NOTE: No new names should be added to this list. This list should
# only ever shorten over time as we deprecate these names.
#
# Every literal dot is escaped: an unescaped "." matches any character, which
# would grant the length exemption to names that merely resemble these metrics.
BACK_COMPAT_METRIC_NAME_PATTERNS: set[str] = {
    r"^(?P<job_name>.*)_start$",
    r"^(?P<job_name>.*)_end$",
    r"^(?P<job_name>.*)_heartbeat_failure$",
    r"^local_task_job\.task_exit\.(?P<job_id>.*)\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<return_code>.*)$",
    r"^operator_failures_(?P<operator_name>.*)$",
    r"^operator_successes_(?P<operator_name>.*)$",
    r"^ti\.start\.(?P<dag_id>.*)\.(?P<task_id>.*)$",
    r"^ti\.finish\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<state>.*)$",
    r"^task_removed_from_dag\.(?P<dag_id>.*)$",
    r"^task_restored_to_dag\.(?P<dag_id>.*)$",
    r"^task_instance_created-(?P<operator_name>.*)$",
    r"^dag_processing\.last_run\.seconds_ago\.(?P<dag_file>.*)$",
    r"^pool\.open_slots\.(?P<pool_name>.*)$",
    r"^pool\.queued_slots\.(?P<pool_name>.*)$",
    r"^pool\.running_slots\.(?P<pool_name>.*)$",
    r"^pool\.starving_tasks\.(?P<pool_name>.*)$",
    r"^dagrun\.dependency-check\.(?P<dag_id>.*)$",
    r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.duration$",
    r"^dag_processing\.last_duration\.(?P<dag_file>.*)$",
    r"^dagrun\.duration\.success\.(?P<dag_id>.*)$",
    r"^dagrun\.duration\.failed\.(?P<dag_id>.*)$",
    r"^dagrun\.schedule_delay\.(?P<dag_id>.*)$",
    r"^dagrun\.(?P<dag_id>.*)\.first_task_scheduling_delay$",
}
# Pre-compiled form of the patterns above, built once at import time.
BACK_COMPAT_METRIC_NAMES: set[Pattern[str]] = {re.compile(name) for name in BACK_COMPAT_METRIC_NAME_PATTERNS}

# Default maximum length of a combined "prefix.name" metric name for OpenTelemetry.
OTEL_NAME_MAX_LENGTH = 63

83 

84 

def validate_stat(fn: Callable) -> Callable:
    """
    Decorate a stats method so the stat name is validated before it is emitted.

    When a stat name is given it is passed through the currently configured
    stat name handler and the handler's return value is forwarded to ``fn``.
    If an InvalidStatsNameException is raised, it is logged and the wrapper
    returns None instead of propagating the error.
    """

    @wraps(fn)
    def wrapper(self, stat: str | None = None, *args, **kwargs) -> Callable | None:
        try:
            if stat is not None:
                # Rebind ``stat`` so that the log message below reports the
                # handler's output if ``fn`` itself rejects the name.
                stat = get_current_handler_stat_name_func()(stat)
            outcome = fn(self, stat, *args, **kwargs)
        except InvalidStatsNameException:
            log.exception("Invalid stat name: %s.", stat)
            outcome = None
        return outcome

    return cast(Callable, wrapper)

103 

104 

def stat_name_otel_handler(
    stat_prefix: str,
    stat_name: str,
    max_length: int = OTEL_NAME_MAX_LENGTH,
) -> str:
    """
    Verify that a proposed prefix and name combination meets OpenTelemetry naming standards.

    See: https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax

    :param stat_prefix: The proposed prefix applied to all metric names.
    :param stat_name: The proposed name.
    :param max_length: The max length of the combined prefix and name; defaults to the max length
        as defined in the OpenTelemetry standard, but can be overridden.

    :returns: The approved combined name, ``"{stat_prefix}.{stat_name}"``.
    :raises InvalidStatsNameException: If either argument is not a string, if the combined
        name is longer than ``max_length`` and ``stat_name`` matches no back-compat exemption,
        or if ``stat_name_default_handler`` rejects the combined name.
    """
    # Enforce that the values can not be None and must be valid strings.
    # Without this check those values get cast to a string and pass when they
    # should not, potentially resulting in metrics named "airflow.None",
    # "airflow.42", or "None.42" for example.
    if not (isinstance(stat_name, str) and isinstance(stat_prefix, str)):
        raise InvalidStatsNameException("Stat name and prefix must both be strings.")

    proposed_stat_name: str = f"{stat_prefix}.{stat_name}"
    name_length_exemption: bool = False
    matched_exemption: str = ""

    # Compare against the caller-supplied ``max_length`` rather than the module
    # constant, so that an overridden limit is actually honoured here.
    if len(proposed_stat_name) > max_length:
        # If the name is in the exceptions list, do not fail it for being too long.
        # It may still be deemed invalid for other reasons below.
        for exemption in BACK_COMPAT_METRIC_NAMES:
            if exemption.match(stat_name):
                # There is a back-compat exception for this name; proceed
                name_length_exemption = True
                matched_exemption = exemption.pattern
                break
        else:
            raise InvalidStatsNameException(
                f"Invalid stat name: {proposed_stat_name}. Please see "
                f"https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax"
            )

    # `stat_name_default_handler` throws InvalidStatsNameException if the
    # provided value is not valid or returns the value if it is. We don't
    # need the return value but will make use of the validation checks. If
    # no exception is thrown, then the proposed name meets OTel requirements.
    stat_name_default_handler(proposed_stat_name, max_length=999 if name_length_exemption else max_length)

    # This warning is down here instead of up above because the exemption only
    # applies to the length and a name may still be invalid for other reasons.
    if name_length_exemption:
        warnings.warn(
            f"Stat name {stat_name} matches exemption {matched_exemption} and "
            f"will be truncated to {proposed_stat_name[:OTEL_NAME_MAX_LENGTH]}. "
            f"This stat name will be deprecated in the future and replaced with "
            f"a shorter name combined with Attributes/Tags.",
            MetricNameLengthExemptionWarning,
        )

    return proposed_stat_name

165 

166 

def stat_name_default_handler(
    stat_name: str, max_length: int = 250, allowed_chars: Iterable[str] = ALLOWED_CHARACTERS
) -> str:
    """
    Check a metric stat name and hand it back unchanged when it is acceptable.

    :param stat_name: The name to check.
    :param max_length: Longest accepted name; names longer than this are rejected.
    :param allowed_chars: Characters a name may be composed of.

    :returns: ``stat_name`` itself when every check passes.
    :raises InvalidStatsNameException: If the name is not a string, is too long,
        or contains a character outside ``allowed_chars``.
    """
    if not isinstance(stat_name, str):
        raise InvalidStatsNameException("The stat_name has to be a string")

    too_long = len(stat_name) > max_length
    if too_long:
        raise InvalidStatsNameException(
            f"The stat_name ({stat_name}) has to be less than {max_length} characters."
        )

    # Stops at the first character that is not permitted.
    if any(char not in allowed_chars for char in stat_name):
        raise InvalidStatsNameException(
            f"The stat name ({stat_name}) has to be composed of ASCII "
            f"alphabets, numbers, or the underscore, dot, or dash characters."
        )

    return stat_name

187 

188 

def get_current_handler_stat_name_func() -> Callable[[str], str]:
    """
    Get the stat name handler configured in airflow.cfg.

    If ``[metrics] stat_name_handler`` resolves to a callable, that callable is
    returned. Otherwise ``stat_name_default_handler`` is used; when
    ``[metrics] statsd_influxdb_enabled`` is true, "," and "=" are allowed in
    addition to the default character set.

    :returns: A callable that takes a stat name and returns the validated name.
    """
    handler = conf.getimport("metrics", "stat_name_handler")
    if handler is not None:
        return handler

    # Read the flag as a boolean: a plain ``conf.get`` yields the option text,
    # and a non-empty string such as "False" would be treated as truthy here.
    if conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False):
        return partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})

    return stat_name_default_handler

198 

199 

class ListValidator(metaclass=abc.ABCMeta):
    """
    Abstract base class for validators driven by a list of metric name prefixes.

    It can be implemented as an AllowListValidator or a BlockListValidator.
    The ``test`` method must be overridden by the subclass.
    """

    def __init__(self, validate_list: str | None = None) -> None:
        """
        Parse the comma-separated prefix list.

        :param validate_list: Comma-separated prefixes. Each entry is stripped
            and lower-cased. ``None``, an empty string, or a string containing
            no non-blank entries results in ``self.validate_list`` being None.
        """
        # Drop blank entries (e.g. from a trailing or doubled comma): an empty
        # prefix would make ``str.startswith`` match every metric name.
        prefixes = tuple(
            entry for entry in (raw.strip().lower() for raw in (validate_list or "").split(",")) if entry
        )
        self.validate_list: tuple[str, ...] | None = prefixes or None

    @classmethod
    def __subclasshook__(cls, subclass: Callable[[str], str]) -> bool:
        """Report a class as a subclass when it exposes a callable ``test`` attribute."""
        if callable(getattr(subclass, "test", None)):
            return True
        # Returning NotImplemented defers to the regular ABC subclass check.
        return NotImplemented

    @abc.abstractmethod
    def test(self, name: str) -> bool:
        """Test if name is allowed."""
        raise NotImplementedError

219 

220 

class AllowListValidator(ListValidator):
    """Validator that accepts only names beginning with one of the allowed prefixes."""

    def test(self, name: str) -> bool:
        """Return True if ``name`` starts with an allowed prefix, or if no list is set."""
        prefixes = self.validate_list
        if prefixes is None:
            return True  # default is all metrics are allowed
        # Names are compared stripped and lower-cased.
        normalized = name.strip().lower()
        return normalized.startswith(prefixes)

229 

230 

class BlockListValidator(ListValidator):
    """Validator that rejects names beginning with one of the blocked prefixes."""

    def test(self, name: str) -> bool:
        """Return False if ``name`` starts with a blocked prefix; True otherwise or if no list is set."""
        prefixes = self.validate_list
        if prefixes is None:
            return True  # default is all metrics are allowed
        # Names are compared stripped and lower-cased.
        normalized = name.strip().lower()
        is_blocked = normalized.startswith(prefixes)
        return not is_blocked