1from enum import IntFlag, auto
2from typing import List, Optional, Sequence
3
4"""
5OpenTelemetry configuration for redis-py.
6
7This module handles configuration for OTel observability features,
8including parsing environment variables and validating settings.
9"""
10
11
12class MetricGroup(IntFlag):
13 """Metric groups that can be enabled/disabled."""
14
15 RESILIENCY = auto()
16 CONNECTION_BASIC = auto()
17 CONNECTION_ADVANCED = auto()
18 COMMAND = auto()
19 CSC = auto()
20 STREAMING = auto()
21 PUBSUB = auto()
22
23
24class TelemetryOption(IntFlag):
25 """Telemetry options to export."""
26
27 METRICS = auto()
28
29
30def default_operation_duration_buckets() -> Sequence[float]:
31 return [
32 0.0001,
33 0.00025,
34 0.0005,
35 0.001,
36 0.0025,
37 0.005,
38 0.01,
39 0.025,
40 0.05,
41 0.1,
42 0.25,
43 0.5,
44 1,
45 2.5,
46 ]
47
48
49def default_histogram_buckets() -> Sequence[float]:
50 return [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10]
51
52
53class OTelConfig:
54 """
55 Configuration for OpenTelemetry observability in redis-py.
56
57 This class manages all OTel-related settings including metrics, traces (future),
58 and logs (future). Configuration can be provided via constructor parameters or
59 environment variables (OTEL_* spec).
60
61 Constructor parameters take precedence over environment variables.
62
63 Args:
64 enabled_telemetry: Enabled telemetry options to export (default: metrics). Traces and logs will be added
65 in future phases.
66 metric_groups: Group of metrics that should be exported.
67 include_commands: Explicit allowlist of commands to track
68 exclude_commands: Blocklist of commands to track
69 hide_pubsub_channel_names: If True, hide PubSub channel names in metrics (default: False)
70 hide_stream_names: If True, hide stream names in streaming metrics (default: False)
71
72 Note:
73 Redis-py uses the global MeterProvider set by your application.
74 Set it up before initializing observability:
75
76 from opentelemetry import metrics
77 from opentelemetry.sdk.metrics import MeterProvider
78 from opentelemetry.sdk.metrics._internal.view import View
79 from opentelemetry.sdk.metrics._internal.aggregation import ExplicitBucketHistogramAggregation
80
81 # Configure histogram bucket boundaries via Views
82 views = [
83 View(
84 instrument_name="db.client.operation.duration",
85 aggregation=ExplicitBucketHistogramAggregation(
86 boundaries=[0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005,
87 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5]
88 ),
89 ),
90 # Add more views for other histograms...
91 ]
92
93 provider = MeterProvider(views=views, metric_readers=[reader])
94 metrics.set_meter_provider(provider)
95
96 # Then initialize redis-py observability
97 from redis.observability import get_observability_instance, OTelConfig
98 otel = get_observability_instance()
99 otel.init(OTelConfig())
100 """
101
102 DEFAULT_TELEMETRY = TelemetryOption.METRICS
103 DEFAULT_METRIC_GROUPS = MetricGroup.CONNECTION_BASIC | MetricGroup.RESILIENCY
104
105 def __init__(
106 self,
107 # Core enablement
108 enabled_telemetry: Optional[List[TelemetryOption]] = None,
109 # Metrics-specific
110 metric_groups: Optional[List[MetricGroup]] = None,
111 # Redis-specific telemetry controls
112 include_commands: Optional[List[str]] = None,
113 exclude_commands: Optional[List[str]] = None,
114 # Privacy controls
115 hide_pubsub_channel_names: bool = False,
116 hide_stream_names: bool = False,
117 # Bucket sizes
118 buckets_operation_duration: Sequence[
119 float
120 ] = default_operation_duration_buckets(),
121 buckets_stream_processing_duration: Sequence[
122 float
123 ] = default_histogram_buckets(),
124 buckets_connection_create_time: Sequence[float] = default_histogram_buckets(),
125 buckets_connection_wait_time: Sequence[float] = default_histogram_buckets(),
126 ):
127 # Core enablement
128 if enabled_telemetry is None:
129 self.enabled_telemetry = self.DEFAULT_TELEMETRY
130 else:
131 self.enabled_telemetry = TelemetryOption(0)
132 for option in enabled_telemetry:
133 self.enabled_telemetry |= option
134
135 # Enable default metrics if None given
136 if metric_groups is None:
137 self.metric_groups = self.DEFAULT_METRIC_GROUPS
138 else:
139 self.metric_groups = MetricGroup(0)
140 for metric_group in metric_groups:
141 self.metric_groups |= metric_group
142
143 # Redis-specific controls
144 self.include_commands = set(include_commands) if include_commands else None
145 self.exclude_commands = set(exclude_commands) if exclude_commands else set()
146
147 # Privacy controls for hiding sensitive names in metrics
148 self.hide_pubsub_channel_names = hide_pubsub_channel_names
149 self.hide_stream_names = hide_stream_names
150
151 # Bucket sizes
152 self.buckets_operation_duration = buckets_operation_duration
153 self.buckets_stream_processing_duration = buckets_stream_processing_duration
154 self.buckets_connection_create_time = buckets_connection_create_time
155 self.buckets_connection_wait_time = buckets_connection_wait_time
156
157 def is_enabled(self) -> bool:
158 """Check if any observability feature is enabled."""
159 return bool(self.enabled_telemetry)
160
161 def should_track_command(self, command_name: str) -> bool:
162 """
163 Determine if a command should be tracked based on include/exclude lists.
164
165 Args:
166 command_name: The Redis command name (e.g., 'GET', 'SET')
167
168 Returns:
169 True if the command should be tracked, False otherwise
170 """
171 command_upper = command_name.upper()
172
173 # If include list is specified, only track commands in the list
174 if self.include_commands is not None:
175 return command_upper in self.include_commands
176
177 # Otherwise, track all commands except those in exclude list
178 return command_upper not in self.exclude_commands
179
180 def __repr__(self) -> str:
181 return f"OTelConfig(enabled_telemetry={self.enabled_telemetry}"