Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/stats.py: 49%

211 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import datetime 

21import logging 

22import socket 

23import string 

24import time 

25from functools import wraps 

26from typing import TYPE_CHECKING, Callable, TypeVar, cast 

27 

28from airflow.configuration import conf 

29from airflow.exceptions import AirflowConfigException, InvalidStatsNameException 

30from airflow.typing_compat import Protocol 

31 

# Module-level logger; used below to report stats-client configuration problems and
# rejected stat names.
log = logging.getLogger(name=__name__)

33 

34 

class TimerProtocol(Protocol):
    """
    Structural type describing what ``StatsLogger.timer`` returns.

    It can be entered as a context manager and started/stopped explicitly.
    """

    def __enter__(self):
        """Enter the timing context."""

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the timing context."""

    def start(self):
        """Start the timer."""

    def stop(self, send=True):
        """Stop, and (by default) submit the timer to StatsD."""

51 

52 

class StatsLogger(Protocol):
    """
    Interface that every stats logger exposes.

    It exists only for type checking (IDEs, mypy, etc.); the method bodies are
    intentionally empty.
    """

    @classmethod
    def incr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
        """Increment ``stat``."""

    @classmethod
    def decr(cls, stat: str, count: int = 1, rate: int = 1) -> None:
        """Decrement ``stat``."""

    @classmethod
    def gauge(cls, stat: str, value: float, rate: int = 1, delta: bool = False) -> None:
        """Record a gauge value for ``stat``."""

    @classmethod
    def timing(cls, stat: str, dt: float | datetime.timedelta) -> None:
        """Record a timing for ``stat``; ``dt`` is a number or a ``datetime.timedelta``."""

    @classmethod
    def timer(cls, *args, **kwargs) -> TimerProtocol:
        """Return a timer metric that can be cancelled."""

75 

76 

class Timer:
    """
    Timer that records duration, and optionally sends it to a StatsD backend.

    This class lets us have an accurate timer with the logic in one place (so
    that we don't use datetime math for duration -- it is error prone).

    ``duration`` is measured with :func:`time.perf_counter`, so it is a float number
    of seconds. It stays ``None`` until :meth:`stop` is called on a started timer.

    Example usage:

    .. code-block:: python

        with Stats.timer() as t:
            # Something to time
            frob_the_foos()

        log.info("Frobbing the foos took %.2f", t.duration)

    Or without a context manager:

    .. code-block:: python

        timer = Stats.timer().start()

        # Something to time
        frob_the_foos()

        timer.stop()

        log.info("Frobbing the foos took %.2f", timer.duration)

    To send a metric:

    .. code-block:: python

        with Stats.timer("foos.frob"):
            # Something to time
            frob_the_foos()

    Or both:

    .. code-block:: python

        with Stats.timer("foos.frob") as t:
            # Something to time
            frob_the_foos()

        log.info("Frobbing the foos took %.2f", t.duration)
    """

    # pystatsd and dogstatsd both have a timer class, but present different APIs,
    # so we can't use this as a mixin on those; instead this class contains the "real" timer.

    # time.perf_counter() returns floats, hence float rather than int.
    _start_time: float | None
    duration: float | None

    def __init__(self, real_timer=None):
        """
        Create a timer.

        :param real_timer: optional backend timer; its ``start()`` and ``stop()`` are
            called alongside this timer's own ``start()``/``stop()``.
        """
        self.real_timer = real_timer
        # Initialise explicitly so that reading ``duration`` (or calling ``stop``) before
        # the timer has run does not fail with AttributeError.
        self._start_time = None
        self.duration = None

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def start(self):
        """Start the timer (and the backend timer, if any) and return ``self`` for chaining."""
        if self.real_timer:
            self.real_timer.start()
        self._start_time = time.perf_counter()
        return self

    def stop(self, send=True):
        """
        Stop the timer, and optionally send it to the stats backend.

        :param send: when true, also stop the backend timer (this is how the metric is sent).
            ``duration`` is only updated if the timer was started.
        """
        if self._start_time is not None:
            self.duration = time.perf_counter() - self._start_time
        if send and self.real_timer:
            self.real_timer.stop()

153 

154 

class DummyStatsLogger:
    """No-op stats logger used as a fallback when no StatsLogger is configured."""

    @classmethod
    def incr(cls, stat, count=1, rate=1):
        """Increment stat (discarded)."""
        return None

    @classmethod
    def decr(cls, stat, count=1, rate=1):
        """Decrement stat (discarded)."""
        return None

    @classmethod
    def gauge(cls, stat, value, rate=1, delta=False):
        """Gauge stat (discarded)."""
        return None

    @classmethod
    def timing(cls, stat, dt):
        """Stats timing (discarded)."""
        return None

    @classmethod
    def timer(cls, *args, **kwargs):
        """Return a ``Timer`` with no backend timer attached, so nothing is sent."""
        local_timer = Timer()
        return local_timer

178 

179 

# Characters accepted in a stat name by ``stat_name_default_handler``. When that default
# handler is in use, a name containing anything else is rejected.
ALLOWED_CHARACTERS = {*string.ascii_letters, *string.digits, "_", ".", "-"}

183 

184 

def stat_name_default_handler(stat_name, max_length=250) -> str:
    """
    Validate a StatsD stat name and return it unchanged when valid.

    :param stat_name: candidate stat name.
    :param max_length: longest accepted length; a name of exactly this length passes.
    :return: ``stat_name`` itself.
    :raises InvalidStatsNameException: if ``stat_name`` is not a ``str``, is longer than
        ``max_length``, or contains a character outside ``ALLOWED_CHARACTERS``.
    """
    if not isinstance(stat_name, str):
        raise InvalidStatsNameException("The stat_name has to be a string")

    if len(stat_name) > max_length:
        too_long_msg = f"The stat_name ({stat_name}) has to be less than {max_length} characters."
        raise InvalidStatsNameException(too_long_msg)

    has_forbidden_char = any(ch not in ALLOWED_CHARACTERS for ch in stat_name)
    if has_forbidden_char:
        bad_chars_msg = (
            f"The stat name ({stat_name}) has to be composed of ASCII "
            f"alphabets, numbers, or the underscore, dot, or dash characters."
        )
        raise InvalidStatsNameException(bad_chars_msg)

    return stat_name

203 

204 

def get_current_handler_stat_name_func() -> Callable[[str], str]:
    """
    Return the callable used to validate/transform stat names.

    This is whatever ``conf.getimport`` resolves for ``[metrics] stat_name_handler`` when
    that is truthy, otherwise ``stat_name_default_handler``.
    """
    configured_handler = conf.getimport("metrics", "stat_name_handler")
    if configured_handler:
        return configured_handler
    return stat_name_default_handler

208 

209 

# Type variable bound to callables; lets ``validate_stat`` hand back a callable typed
# exactly like the one it decorates.
T = TypeVar(
    "T",
    bound=Callable,
)

211 

212 

def validate_stat(fn: T) -> T:
    """
    Decorate a stats method so that its ``stat`` argument is normalised before use.

    When ``stat`` is not ``None``, the handler from ``get_current_handler_stat_name_func``
    is applied to it. If the handler, or the wrapped method itself, raises
    ``InvalidStatsNameException``, the error is logged with its traceback. The wrapper
    then returns ``None`` instead of propagating, so no stat is emitted.
    """

    @wraps(fn)
    def wrapper(_self, stat=None, *args, **kwargs):
        try:
            if stat is not None:
                normalize_stat_name = get_current_handler_stat_name_func()
                stat = normalize_stat_name(stat)
            return fn(_self, stat, *args, **kwargs)
        except InvalidStatsNameException:
            log.exception("Invalid stat name: %s.", stat)
        return None

    # ``cast`` is a no-op at runtime; the string form avoids a runtime lookup of ``T``.
    return cast("T", wrapper)

231 

232 

class AllowListValidator:
    """
    Filter stats by name prefix.

    :param allow_list: comma-separated list of stat-name prefixes, compared
        case-insensitively after stripping whitespace. If it is empty, ``None``, or holds
        only blank entries, every stat is allowed.
    """

    def __init__(self, allow_list=None):
        if allow_list:
            # Drop blank entries: a stray or trailing comma (e.g. "scheduler,") would otherwise
            # produce the prefix "", which every stat name starts with, silently turning the
            # filter off.
            prefixes = tuple(
                item.strip().lower() for item in allow_list.split(",") if item.strip()
            )
            # A list made only of blank entries behaves like no list at all.
            self.allow_list = prefixes or None
        else:
            self.allow_list = None

    def test(self, stat):
        """Return True if ``stat`` starts with an allowed prefix, or if no allow list is set."""
        if self.allow_list is None:
            return True  # default is all metrics allowed
        return stat.strip().lower().startswith(self.allow_list)

249 

250 

class SafeStatsdLogger:
    """
    StatsD logger that validates stat names and applies an allow list before emitting.

    Each metric method is wrapped with ``validate_stat``. A stat rejected by the allow-list
    validator is not forwarded to the client, and the method returns ``None``.
    """

    def __init__(self, statsd_client, allow_list_validator=None):
        """
        Wrap a StatsD client.

        :param statsd_client: client whose ``incr``/``decr``/``gauge``/``timing``/``timer``
            methods are called to emit metrics.
        :param allow_list_validator: filter applied to each stat name. Defaults to a new
            ``AllowListValidator()``, which allows everything.
        """
        self.statsd = statsd_client
        # Build the default per instance rather than in the signature, where it would be
        # evaluated once at import time and shared by every logger.
        self.allow_list_validator = (
            allow_list_validator if allow_list_validator is not None else AllowListValidator()
        )

    @validate_stat
    def incr(self, stat, count=1, rate=1):
        """Increment stat."""
        if self.allow_list_validator.test(stat):
            return self.statsd.incr(stat, count, rate)
        return None

    @validate_stat
    def decr(self, stat, count=1, rate=1):
        """Decrement stat."""
        if self.allow_list_validator.test(stat):
            return self.statsd.decr(stat, count, rate)
        return None

    @validate_stat
    def gauge(self, stat, value, rate=1, delta=False):
        """Gauge stat."""
        if self.allow_list_validator.test(stat):
            return self.statsd.gauge(stat, value, rate, delta)
        return None

    @validate_stat
    def timing(self, stat, dt):
        """Stats timing."""
        if self.allow_list_validator.test(stat):
            return self.statsd.timing(stat, dt)
        return None

    @validate_stat
    def timer(self, stat=None, *args, **kwargs):
        """
        Timer metric that can be cancelled.

        A ``Timer`` wrapping the client's timer is returned only for a non-empty, allowed
        ``stat``. Otherwise the returned ``Timer`` has no backend and sends nothing.
        """
        if stat and self.allow_list_validator.test(stat):
            return Timer(self.statsd.timer(stat, *args, **kwargs))
        return Timer()

292 

293 

class SafeDogStatsdLogger:
    """
    DogStatsd logger that validates stat names and applies an allow list before emitting.

    Each metric method is wrapped with ``validate_stat`` and accepts optional ``tags``
    (``None`` is sent as an empty list). A stat rejected by the allow-list validator is not
    forwarded to the client, and the method returns ``None``.
    """

    def __init__(self, dogstatsd_client, allow_list_validator=None):
        """
        Wrap a DogStatsd client.

        :param dogstatsd_client: client whose ``increment``/``decrement``/``gauge``/
            ``timing``/``timed`` methods are called to emit metrics.
        :param allow_list_validator: filter applied to each stat name. Defaults to a new
            ``AllowListValidator()``, which allows everything.
        """
        self.dogstatsd = dogstatsd_client
        # Build the default per instance rather than in the signature, where it would be
        # evaluated once at import time and shared by every logger.
        self.allow_list_validator = (
            allow_list_validator if allow_list_validator is not None else AllowListValidator()
        )

    @validate_stat
    def incr(self, stat, count=1, rate=1, tags=None):
        """Increment stat."""
        if self.allow_list_validator.test(stat):
            tags = tags or []
            return self.dogstatsd.increment(metric=stat, value=count, tags=tags, sample_rate=rate)
        return None

    @validate_stat
    def decr(self, stat, count=1, rate=1, tags=None):
        """Decrement stat."""
        if self.allow_list_validator.test(stat):
            tags = tags or []
            return self.dogstatsd.decrement(metric=stat, value=count, tags=tags, sample_rate=rate)
        return None

    @validate_stat
    def gauge(self, stat, value, rate=1, delta=False, tags=None):
        """
        Gauge stat.

        ``delta`` is accepted (matching ``SafeStatsdLogger.gauge``) but is not forwarded
        to the client.
        """
        if self.allow_list_validator.test(stat):
            tags = tags or []
            return self.dogstatsd.gauge(metric=stat, value=value, tags=tags, sample_rate=rate)
        return None

    @validate_stat
    def timing(self, stat, dt: float | datetime.timedelta, tags: list[str] | None = None):
        """
        Stats timing.

        A ``datetime.timedelta`` is converted with ``total_seconds()`` before sending; plain
        numbers are sent as given.
        """
        if self.allow_list_validator.test(stat):
            tags = tags or []
            if isinstance(dt, datetime.timedelta):
                # NOTE(review): this sends seconds, while DogStatsd timings are usually
                # expressed in milliseconds -- confirm the intended unit before changing it,
                # since existing dashboards may rely on the current values.
                dt = dt.total_seconds()
            return self.dogstatsd.timing(metric=stat, value=dt, tags=tags)
        return None

    @validate_stat
    def timer(self, stat=None, *args, tags=None, **kwargs):
        """
        Timer metric that can be cancelled.

        A ``Timer`` wrapping the client's ``timed`` object is returned only for a non-empty,
        allowed ``stat``. Otherwise the returned ``Timer`` has no backend and sends nothing.
        """
        if stat and self.allow_list_validator.test(stat):
            tags = tags or []
            return Timer(self.dogstatsd.timed(stat, *args, tags=tags, **kwargs))
        return Timer()

342 

343 

class _Stats(type):
    """
    Metaclass that lazily builds the configured stats logger and delegates to it.

    Creating a class with this metaclass chooses the logger factory from the ``[metrics]``
    configuration. The choice is stored on the metaclass, so it is made only once. The
    first attribute that is not found on the class triggers the factory; the resulting
    logger is cached in ``instance``, and missing attributes are looked up on it.
    """

    # Callable that builds the logger; chosen in ``__init__`` and stored on the metaclass.
    factory = None
    # Logger built by ``factory``; populated lazily by ``__getattr__``.
    instance: StatsLogger | None = None

    def __getattr__(cls, name):
        # Only reached for attributes that normal lookup on the class did not find.
        if cls.instance is None:
            try:
                cls.instance = cls.factory()
            except (socket.gaierror, ImportError) as e:
                # Host resolution failed or the client library is missing: degrade to a
                # no-op logger instead of failing the caller.
                log.error("Could not configure StatsClient: %s, using DummyStatsLogger instead.", e)
                cls.instance = DummyStatsLogger()
        return getattr(cls.instance, name)

    def __init__(cls, *args, **kwargs):
        # Forward the class-creation arguments (name, bases, namespace) to type.__init__.
        super().__init__(*args, **kwargs)
        if cls.__class__.factory is None:
            is_datadog_enabled_defined = conf.has_option("metrics", "statsd_datadog_enabled")
            if is_datadog_enabled_defined and conf.getboolean("metrics", "statsd_datadog_enabled"):
                cls.__class__.factory = cls.get_dogstatsd_logger
            elif conf.getboolean("metrics", "statsd_on"):
                cls.__class__.factory = cls.get_statsd_logger
            else:
                cls.__class__.factory = DummyStatsLogger

    @classmethod
    def get_statsd_logger(cls):
        """
        Return a ``SafeStatsdLogger`` built from the ``[metrics]`` StatsD settings.

        :raises AirflowConfigException: if ``statsd_custom_client_path`` names a class that
            does not subclass ``statsd.StatsClient``.
        """
        # no need to check for the scheduler/statsd_on -> this method is only called when it is set
        # and previously it would crash with None is callable if it was called without it.
        from statsd import StatsClient

        stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None)

        if stats_class:
            if not issubclass(stats_class, StatsClient):
                raise AirflowConfigException(
                    "Your custom StatsD client must extend the statsd.StatsClient in order to ensure "
                    "backwards compatibility."
                )
            log.info("Successfully loaded custom StatsD client")
        else:
            stats_class = StatsClient

        statsd = stats_class(
            host=conf.get("metrics", "statsd_host"),
            port=conf.getint("metrics", "statsd_port"),
            prefix=conf.get("metrics", "statsd_prefix"),
        )
        allow_list_validator = AllowListValidator(conf.get("metrics", "statsd_allow_list", fallback=None))
        return SafeStatsdLogger(statsd, allow_list_validator)

    @classmethod
    def get_dogstatsd_logger(cls):
        """Return a ``SafeDogStatsdLogger`` built from the ``[metrics]`` StatsD settings."""
        from datadog import DogStatsd

        dogstatsd = DogStatsd(
            host=conf.get("metrics", "statsd_host"),
            port=conf.getint("metrics", "statsd_port"),
            namespace=conf.get("metrics", "statsd_prefix"),
            constant_tags=cls.get_constant_tags(),
        )
        dogstatsd_allow_list = conf.get("metrics", "statsd_allow_list", fallback=None)
        allow_list_validator = AllowListValidator(dogstatsd_allow_list)
        return SafeDogStatsdLogger(dogstatsd, allow_list_validator)

    @classmethod
    def get_constant_tags(cls):
        """
        Get constant DataDog tags to add to all stats.

        :return: the comma-separated ``statsd_datadog_tags`` value split on ","; entries
            are not stripped. The list is empty when the option is unset or empty.
        """
        tags_in_string = conf.get("metrics", "statsd_datadog_tags", fallback=None)
        if tags_in_string is None or tags_in_string == "":
            return []
        return tags_in_string.split(",")

423 

424 

if not TYPE_CHECKING:

    class Stats(metaclass=_Stats):
        """
        Empty placeholder class for stats.

        Attributes it does not define are resolved by the ``_Stats`` metaclass on the
        configured logger.
        """

else:
    # Static type checkers treat ``Stats`` as an implementation of ``StatsLogger``.
    Stats: StatsLogger