1"""
2OpenTelemetry provider management for redis-py.
3
4This module handles initialization and lifecycle management of OTel SDK components
5including MeterProvider, TracerProvider (future), and LoggerProvider (future).
6
7Uses a singleton pattern - initialize once globally, all Redis clients use it automatically.
8
9Redis-py uses the global MeterProvider set by your application. Set it up before
10initializing observability:
11
12 from opentelemetry import metrics
13 from opentelemetry.sdk.metrics import MeterProvider
14
15 provider = MeterProvider(...)
16 metrics.set_meter_provider(provider)
17
18 # Then initialize redis-py observability
19 otel = get_observability_instance()
20 otel.init(OTelConfig(enable_metrics=True))
21"""
22
23import logging
24from typing import Optional
25
26from redis.observability.config import OTelConfig
27
28logger = logging.getLogger(__name__)
29
30# Optional imports - OTel SDK may not be installed
31try:
32 from opentelemetry.sdk.metrics import MeterProvider
33
34 OTEL_AVAILABLE = True
35except ImportError:
36 OTEL_AVAILABLE = False
37 MeterProvider = None
38
39# Global singleton instance
40_global_provider_manager: Optional["OTelProviderManager"] = None
41
42
43class OTelProviderManager:
44 """
45 Manages OpenTelemetry SDK providers and their lifecycle.
46
47 This class handles:
48 - Getting the global MeterProvider set by the application
49 - Configuring histogram bucket boundaries via Views
50 - Graceful shutdown
51
52 Args:
53 config: OTel configuration object
54 """
55
56 def __init__(self, config: OTelConfig):
57 self.config = config
58 self._meter_provider: Optional[MeterProvider] = None
59
60 def get_meter_provider(self) -> Optional[MeterProvider]:
61 """
62 Get the global MeterProvider set by the application.
63
64 Returns:
65 MeterProvider instance or None if metrics are disabled
66
67 Raises:
68 ImportError: If OpenTelemetry is not installed
69 RuntimeError: If metrics are enabled but no global MeterProvider is set
70 """
71 if not self.config.is_enabled():
72 return None
73
74 # Lazy import - only import OTel when metrics are enabled
75 try:
76 from opentelemetry import metrics
77 from opentelemetry.metrics import NoOpMeterProvider
78 except ImportError:
79 raise ImportError(
80 "OpenTelemetry is not installed. Install it with:\n"
81 " pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
82 )
83
84 # Get the global MeterProvider
85 if self._meter_provider is None:
86 self._meter_provider = metrics.get_meter_provider()
87
88 # Check if it's a real provider (not NoOp)
89 if isinstance(self._meter_provider, NoOpMeterProvider):
90 raise RuntimeError(
91 "Metrics are enabled but no global MeterProvider is configured.\n"
92 "\n"
93 "Set up OpenTelemetry before initializing redis-py observability:\n"
94 "\n"
95 " from opentelemetry import metrics\n"
96 " from opentelemetry.sdk.metrics import MeterProvider\n"
97 " from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader\n"
98 " from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter\n"
99 "\n"
100 " # Create exporter\n"
101 " exporter = OTLPMetricExporter(\n"
102 " endpoint='http://localhost:4318/v1/metrics'\n"
103 " )\n"
104 "\n"
105 " # Create reader\n"
106 " reader = PeriodicExportingMetricReader(\n"
107 " exporter=exporter,\n"
108 " export_interval_millis=10000\n"
109 " )\n"
110 "\n"
111 " # Create and set global provider\n"
112 " provider = MeterProvider(metric_readers=[reader])\n"
113 " metrics.set_meter_provider(provider)\n"
114 "\n"
115 " # Now initialize redis-py observability\n"
116 " from redis.observability import get_observability_instance, OTelConfig\n"
117 " otel = get_observability_instance()\n"
118 " otel.init(OTelConfig(enable_metrics=True))\n"
119 )
120
121 logger.info("Using global MeterProvider from application")
122
123 return self._meter_provider
124
125 def shutdown(self, timeout_millis: int = 30000) -> bool:
126 """
127 Shutdown observability and flush any pending metrics.
128
129 Note: We don't shutdown the global MeterProvider since it's owned by the application.
130 We only force flush pending metrics.
131
132 Args:
133 timeout_millis: Maximum time to wait for flush
134
135 Returns:
136 True if flush was successful, False otherwise
137 """
138 logger.debug(
139 "Flushing metrics before shutdown (not shutting down global MeterProvider)"
140 )
141 return self.force_flush(timeout_millis=timeout_millis)
142
143 def force_flush(self, timeout_millis: int = 30000) -> bool:
144 """
145 Force flush any pending metrics from the global MeterProvider.
146
147 Args:
148 timeout_millis: Maximum time to wait for flush
149
150 Returns:
151 True if flush was successful, False otherwise
152 """
153 if self._meter_provider is None:
154 return True
155
156 # NoOpMeterProvider doesn't have force_flush method
157 if not hasattr(self._meter_provider, "force_flush"):
158 logger.debug("MeterProvider does not support force_flush, skipping")
159 return True
160
161 try:
162 logger.debug("Force flushing metrics from global MeterProvider")
163 self._meter_provider.force_flush(timeout_millis=timeout_millis)
164 return True
165 except Exception as e:
166 logger.error(f"Error flushing metrics: {e}")
167 return False
168
169 def __enter__(self):
170 """Context manager entry."""
171 return self
172
173 def __exit__(self, _exc_type, _exc_val, _exc_tb):
174 """Context manager exit - shutdown provider."""
175 self.shutdown()
176
177 def __repr__(self) -> str:
178 return f"OTelProviderManager(config={self.config})"
179
180
181# Singleton instance class
182
183
184class ObservabilityInstance:
185 """
186 Singleton instance for managing OpenTelemetry observability.
187
188 This class follows the singleton pattern similar to Glide's GetOtelInstance().
189 Use GetObservabilityInstance() to get the singleton instance, then call init()
190 to initialize observability.
191
192 Example:
193 >>> from redis.observability.config import OTelConfig
194 >>>
195 >>> # Get singleton instance
196 >>> otel = get_observability_instance()
197 >>>
198 >>> # Initialize once at app startup
199 >>> otel.init(OTelConfig())
200 >>>
201 >>> # All Redis clients now automatically collect metrics
202 >>> import redis
203 >>> r = redis.Redis(host='localhost', port=6379)
204 >>> r.set('key', 'value') # Metrics collected automatically
205 """
206
207 def __init__(self):
208 self._provider_manager: Optional[OTelProviderManager] = None
209
210 def init(self, config: OTelConfig) -> "ObservabilityInstance":
211 """
212 Initialize OpenTelemetry observability globally for all Redis clients.
213
214 This should be called once at application startup. After initialization,
215 all Redis clients will automatically collect and export metrics without
216 needing any additional configuration.
217
218 Safe to call multiple times - will shutdown previous instance before
219 initializing a new one.
220
221 Args:
222 config: OTel configuration object
223
224 Returns:
225 Self for method chaining
226
227 Example:
228 >>> otel = get_observability_instance()
229 >>> otel.init(OTelConfig())
230 """
231 if self._provider_manager is not None:
232 logger.warning(
233 "Observability already initialized. Shutting down previous instance."
234 )
235 self._provider_manager.shutdown()
236
237 self._provider_manager = OTelProviderManager(config)
238
239 logger.info("Observability initialized")
240
241 return self
242
243 def is_enabled(self) -> bool:
244 """
245 Check if observability is enabled.
246
247 Returns:
248 True if observability is initialized and metrics are enabled
249
250 Example:
251 >>> otel = get_observability_instance()
252 >>> if otel.is_enabled():
253 ... print("Metrics are being collected")
254 """
255 return (
256 self._provider_manager is not None
257 and self._provider_manager.config.is_enabled()
258 )
259
260 def get_provider_manager(self) -> Optional[OTelProviderManager]:
261 """
262 Get the provider manager instance.
263
264 Returns:
265 The provider manager, or None if not initialized
266
267 Example:
268 >>> otel = get_observability_instance()
269 >>> manager = otel.get_provider_manager()
270 >>> if manager is not None:
271 ... print(f"Observability enabled: {manager.config.is_enabled()}")
272 """
273 return self._provider_manager
274
275 def shutdown(self, timeout_millis: int = 30000) -> bool:
276 """
277 Shutdown observability and flush any pending metrics.
278
279 This should be called at application shutdown to ensure all metrics
280 are exported before the application exits.
281
282 Args:
283 timeout_millis: Maximum time to wait for shutdown
284
285 Returns:
286 True if shutdown was successful
287
288 Example:
289 >>> otel = get_observability_instance()
290 >>> # At application shutdown
291 >>> otel.shutdown()
292 """
293 if self._provider_manager is None:
294 logger.debug("Observability not initialized, nothing to shutdown")
295 return True
296
297 success = self._provider_manager.shutdown(timeout_millis)
298 self._provider_manager = None
299 logger.info("Observability shutdown")
300
301 return success
302
303 def force_flush(self, timeout_millis: int = 30000) -> bool:
304 """
305 Force flush all pending metrics immediately.
306
307 Useful for testing or when you want to ensure metrics are exported
308 before a specific point in your application.
309
310 Args:
311 timeout_millis: Maximum time to wait for flush
312
313 Returns:
314 True if flush was successful
315
316 Example:
317 >>> otel = get_observability_instance()
318 >>> # Execute some Redis commands
319 >>> r.set('key', 'value')
320 >>> # Force flush metrics immediately
321 >>> otel.force_flush()
322 """
323 if self._provider_manager is None:
324 logger.debug("Observability not initialized, nothing to flush")
325 return True
326
327 return self._provider_manager.force_flush(timeout_millis)
328
329
330# Global singleton instance
331_observability_instance: Optional[ObservabilityInstance] = None
332
333
334def get_observability_instance() -> ObservabilityInstance:
335 """
336 Get the global observability singleton instance.
337
338 This is the Pythonic way to get the singleton instance.
339
340 Returns:
341 The global ObservabilityInstance singleton
342
343 Example:
344 >>>
345 >>> otel = get_observability_instance()
346 >>> otel.init(OTelConfig())
347 """
348 global _observability_instance
349
350 if _observability_instance is None:
351 _observability_instance = ObservabilityInstance()
352
353 return _observability_instance
354
355
356def reset_observability_instance() -> None:
357 """
358 Reset the global observability singleton instance.
359
360 This is primarily used for testing and benchmarking to ensure
361 a clean state between test runs.
362
363 Warning:
364 This will shutdown any active provider manager and reset
365 the global state. Use with caution in production code.
366 """
367 global _observability_instance
368
369 if _observability_instance is not None:
370 _observability_instance.shutdown()
371 _observability_instance = None