1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
from __future__ import annotations

import logging
import os
import re
import socket
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from .base_stats_logger import NoStatsLogger

if TYPE_CHECKING:
    from .base_stats_logger import StatsLogger
31
log = logging.getLogger(__name__)

# A name is reportable as-is only if it consists entirely of these characters. ``\Z`` (rather than
# ``$``) anchors at the true end of the string: ``$`` would also accept a trailing newline.
_VALID_STAT_NAME_CHARS_RE = re.compile(r"^[a-zA-Z0-9_.-]+\Z")
# Matches any single character that must be replaced before reporting.
_INVALID_STAT_NAME_CHARS_RE = re.compile(r"[^a-zA-Z0-9_.-]")


def normalize_name_for_stats(name: str, log_warning: bool = True) -> str:
    """
    Normalize a name for stats reporting by replacing invalid characters.

    Stats names must only contain ASCII letters, digits, underscores, dots, and dashes.
    Every other character (including whitespace and newlines) is replaced with an underscore.
    An empty name is treated as invalid (a warning is logged), and ``""`` is returned.

    :param name: The name to normalize
    :param log_warning: Whether to log a warning when normalization occurs
    :return: ``name`` unchanged if it is already valid, otherwise the normalized name
    """
    # fullmatch: the entire string must be valid, not merely a prefix of it.
    if _VALID_STAT_NAME_CHARS_RE.fullmatch(name):
        return name

    normalized = _INVALID_STAT_NAME_CHARS_RE.sub("_", name)

    if log_warning:
        log.warning(
            "Name '%s' contains invalid characters for stats reporting. "
            "Reporting stats with normalized name '%s'.",
            name,
            normalized,
        )

    return normalized
63
64
class _Stats(type):
    """
    Metaclass that lazily builds the stats logger and proxies class-attribute access to it.

    Attribute lookups on ``Stats`` that are not found on the class fall through to
    :meth:`__getattr__`, which creates the logger from ``factory`` on first use (and again
    after the owning process has forked) and then delegates the lookup to it.
    """

    # Zero-argument callable that builds the logger; ``None`` means "not configured yet".
    factory: Callable[[], StatsLogger | NoStatsLogger] | None = None
    # Lazily created logger that attribute lookups are delegated to.
    instance: StatsLogger | NoStatsLogger | None = None
    # PID of the process that created ``instance`` (``None`` if it was not created here).
    # Declared on the metaclass so reading it never falls through to ``__getattr__``,
    # which would otherwise recurse forever.
    _instance_pid: int | None = None

    def __getattr__(cls, name: str) -> Any:
        """
        Forward a class-attribute lookup to the lazily created stats logger.

        Only called for names not found on the class itself. When no logger exists yet, or the
        existing one was created by a different process, a logger is built from ``factory``.
        The code falls back to :class:`NoStatsLogger` when no factory is configured or when
        building one raises ``socket.gaierror`` or ``ImportError``.

        :param name: Attribute requested on ``Stats``.
        :return: The attribute of that name on the active logger instance.
        """
        factory = type.__getattribute__(cls, "factory")
        instance = type.__getattribute__(cls, "instance")
        instance_pid = type.__getattribute__(cls, "_instance_pid")

        # When using OpenTelemetry, some subprocesses are short-lived and
        # often exit before flushing any metrics.
        #
        # The solution is to register a hook that performs a force flush at exit.
        # The atexit hook is registered when initializing the instance.
        #
        # The instance gets initialized once per process. If a process is forked,
        # the child inherits the parent's already initialized instance.
        #
        # The creating PID is stored so it can be compared with the current PID
        # to decide whether the instance must be initialized again.
        #
        # So far, all forks reset their state to remove anything inherited from the parent,
        # but that might not always be true in the future.
        #
        # An instance with no recorded PID was not created by this method (it was set
        # directly), so it is left alone instead of being silently replaced.
        current_pid = os.getpid()
        if instance is not None and instance_pid is not None and instance_pid != current_pid:
            log.info(
                "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.",
                instance_pid,
                current_pid,
            )
            # Reset the local copy too, so that this very lookup is served by the fresh
            # instance built below rather than by the one inherited from the parent.
            instance = None
            type.__setattr__(cls, "instance", None)
            type.__setattr__(cls, "_instance_pid", None)

        if instance is None:
            if factory is None:
                factory = NoStatsLogger
                type.__setattr__(cls, "factory", factory)

            try:
                instance = factory()
            except (socket.gaierror, ImportError) as e:
                log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e)
                instance = NoStatsLogger()

            type.__setattr__(cls, "instance", instance)
            type.__setattr__(cls, "_instance_pid", current_pid)

        return getattr(instance, name)

    def initialize(cls, *, is_statsd_datadog_enabled: bool, is_statsd_on: bool, is_otel_on: bool) -> None:
        """
        Select the stats backend factory and drop any previously created logger.

        Flags are checked in priority order: Datadog, StatsD, OpenTelemetry. If none is set,
        :class:`NoStatsLogger` is used. Only the selected backend module is imported, and only
        the factory is stored here; the logger itself is built on the next attribute access.

        :param is_statsd_datadog_enabled: Use the Datadog logger.
        :param is_statsd_on: Use the StatsD logger.
        :param is_otel_on: Use the OpenTelemetry logger.
        """
        type.__setattr__(cls, "factory", None)
        type.__setattr__(cls, "instance", None)
        type.__setattr__(cls, "_instance_pid", None)
        factory: Callable

        if is_statsd_datadog_enabled:
            from airflow.observability.metrics import datadog_logger

            # Datadog needs the cls param, so wrap it into a 0-arg factory.
            factory = lambda: datadog_logger.get_dogstatsd_logger(cls)
        elif is_statsd_on:
            from airflow.observability.metrics import statsd_logger

            factory = statsd_logger.get_statsd_logger
        elif is_otel_on:
            from airflow.observability.metrics import otel_logger

            factory = otel_logger.get_otel_logger
        else:
            factory = NoStatsLogger

        type.__setattr__(cls, "factory", factory)

    @classmethod
    def get_constant_tags(cls, *, tags_in_string: str | None) -> list[str]:
        """
        Get constant DataDog tags to add to all stats.

        :param tags_in_string: Comma-separated tags, e.g. ``"env:prod,team:core"``; may be empty or ``None``.
        :return: The individual tags, with surrounding whitespace removed and empty entries dropped.
        """
        if not tags_in_string:
            return []
        # Tolerate "a, b" and stray commas instead of emitting " b" or "" as tags.
        return [tag.strip() for tag in tags_in_string.split(",") if tag.strip()]


if TYPE_CHECKING:
    # Type checkers see ``Stats`` as the logger interface that attribute access is forwarded to.
    Stats: StatsLogger
else:

    class Stats(metaclass=_Stats):
        """Stats facade: the ``_Stats`` metaclass forwards every attribute lookup to the configured logger."""