Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/providers.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1""" 

2OpenTelemetry provider management for redis-py. 

3 

4This module handles initialization and lifecycle management of OTel SDK components 

5including MeterProvider, TracerProvider (future), and LoggerProvider (future). 

6 

7Uses a singleton pattern - initialize once globally, all Redis clients use it automatically. 

8 

9Redis-py uses the global MeterProvider set by your application. Set it up before 

10initializing observability: 

11 

12 from opentelemetry import metrics 

13 from opentelemetry.sdk.metrics import MeterProvider 

14 

15 provider = MeterProvider(...) 

16 metrics.set_meter_provider(provider) 

17 

18 # Then initialize redis-py observability 

19 otel = get_observability_instance() 

20 otel.init(OTelConfig(enable_metrics=True)) 

21""" 

22 

23import logging 

24from typing import Optional 

25 

26from redis.observability.config import OTelConfig 

27 

28logger = logging.getLogger(__name__) 

29 

30# Optional imports - OTel SDK may not be installed 

31try: 

32 from opentelemetry.sdk.metrics import MeterProvider 

33 

34 OTEL_AVAILABLE = True 

35except ImportError: 

36 OTEL_AVAILABLE = False 

37 MeterProvider = None 

38 

39# Global singleton instance 

40_global_provider_manager: Optional["OTelProviderManager"] = None 

41 

42 

43class OTelProviderManager: 

44 """ 

45 Manages OpenTelemetry SDK providers and their lifecycle. 

46 

47 This class handles: 

48 - Getting the global MeterProvider set by the application 

49 - Configuring histogram bucket boundaries via Views 

50 - Graceful shutdown 

51 

52 Args: 

53 config: OTel configuration object 

54 """ 

55 

56 def __init__(self, config: OTelConfig): 

57 self.config = config 

58 self._meter_provider: Optional[MeterProvider] = None 

59 

60 def get_meter_provider(self) -> Optional[MeterProvider]: 

61 """ 

62 Get the global MeterProvider set by the application. 

63 

64 Returns: 

65 MeterProvider instance or None if metrics are disabled 

66 

67 Raises: 

68 ImportError: If OpenTelemetry is not installed 

69 RuntimeError: If metrics are enabled but no global MeterProvider is set 

70 """ 

71 if not self.config.is_enabled(): 

72 return None 

73 

74 # Lazy import - only import OTel when metrics are enabled 

75 try: 

76 from opentelemetry import metrics 

77 from opentelemetry.metrics import NoOpMeterProvider 

78 except ImportError: 

79 raise ImportError( 

80 "OpenTelemetry is not installed. Install it with:\n" 

81 " pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http" 

82 ) 

83 

84 # Get the global MeterProvider 

85 if self._meter_provider is None: 

86 self._meter_provider = metrics.get_meter_provider() 

87 

88 # Check if it's a real provider (not NoOp) 

89 if isinstance(self._meter_provider, NoOpMeterProvider): 

90 raise RuntimeError( 

91 "Metrics are enabled but no global MeterProvider is configured.\n" 

92 "\n" 

93 "Set up OpenTelemetry before initializing redis-py observability:\n" 

94 "\n" 

95 " from opentelemetry import metrics\n" 

96 " from opentelemetry.sdk.metrics import MeterProvider\n" 

97 " from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader\n" 

98 " from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter\n" 

99 "\n" 

100 " # Create exporter\n" 

101 " exporter = OTLPMetricExporter(\n" 

102 " endpoint='http://localhost:4318/v1/metrics'\n" 

103 " )\n" 

104 "\n" 

105 " # Create reader\n" 

106 " reader = PeriodicExportingMetricReader(\n" 

107 " exporter=exporter,\n" 

108 " export_interval_millis=10000\n" 

109 " )\n" 

110 "\n" 

111 " # Create and set global provider\n" 

112 " provider = MeterProvider(metric_readers=[reader])\n" 

113 " metrics.set_meter_provider(provider)\n" 

114 "\n" 

115 " # Now initialize redis-py observability\n" 

116 " from redis.observability import get_observability_instance, OTelConfig\n" 

117 " otel = get_observability_instance()\n" 

118 " otel.init(OTelConfig(enable_metrics=True))\n" 

119 ) 

120 

121 logger.info("Using global MeterProvider from application") 

122 

123 return self._meter_provider 

124 

125 def shutdown(self, timeout_millis: int = 30000) -> bool: 

126 """ 

127 Shutdown observability and flush any pending metrics. 

128 

129 Note: We don't shutdown the global MeterProvider since it's owned by the application. 

130 We only force flush pending metrics. 

131 

132 Args: 

133 timeout_millis: Maximum time to wait for flush 

134 

135 Returns: 

136 True if flush was successful, False otherwise 

137 """ 

138 logger.debug( 

139 "Flushing metrics before shutdown (not shutting down global MeterProvider)" 

140 ) 

141 return self.force_flush(timeout_millis=timeout_millis) 

142 

143 def force_flush(self, timeout_millis: int = 30000) -> bool: 

144 """ 

145 Force flush any pending metrics from the global MeterProvider. 

146 

147 Args: 

148 timeout_millis: Maximum time to wait for flush 

149 

150 Returns: 

151 True if flush was successful, False otherwise 

152 """ 

153 if self._meter_provider is None: 

154 return True 

155 

156 # NoOpMeterProvider doesn't have force_flush method 

157 if not hasattr(self._meter_provider, "force_flush"): 

158 logger.debug("MeterProvider does not support force_flush, skipping") 

159 return True 

160 

161 try: 

162 logger.debug("Force flushing metrics from global MeterProvider") 

163 self._meter_provider.force_flush(timeout_millis=timeout_millis) 

164 return True 

165 except Exception as e: 

166 logger.error(f"Error flushing metrics: {e}") 

167 return False 

168 

169 def __enter__(self): 

170 """Context manager entry.""" 

171 return self 

172 

173 def __exit__(self, _exc_type, _exc_val, _exc_tb): 

174 """Context manager exit - shutdown provider.""" 

175 self.shutdown() 

176 

177 def __repr__(self) -> str: 

178 return f"OTelProviderManager(config={self.config})" 

179 

180 

181# Singleton instance class 

182 

183 

184class ObservabilityInstance: 

185 """ 

186 Singleton instance for managing OpenTelemetry observability. 

187 

188 This class follows the singleton pattern similar to Glide's GetOtelInstance(). 

189 Use GetObservabilityInstance() to get the singleton instance, then call init() 

190 to initialize observability. 

191 

192 Example: 

193 >>> from redis.observability.config import OTelConfig 

194 >>> 

195 >>> # Get singleton instance 

196 >>> otel = get_observability_instance() 

197 >>> 

198 >>> # Initialize once at app startup 

199 >>> otel.init(OTelConfig()) 

200 >>> 

201 >>> # All Redis clients now automatically collect metrics 

202 >>> import redis 

203 >>> r = redis.Redis(host='localhost', port=6379) 

204 >>> r.set('key', 'value') # Metrics collected automatically 

205 """ 

206 

207 def __init__(self): 

208 self._provider_manager: Optional[OTelProviderManager] = None 

209 

210 def init(self, config: OTelConfig) -> "ObservabilityInstance": 

211 """ 

212 Initialize OpenTelemetry observability globally for all Redis clients. 

213 

214 This should be called once at application startup. After initialization, 

215 all Redis clients will automatically collect and export metrics without 

216 needing any additional configuration. 

217 

218 Safe to call multiple times - will shutdown previous instance before 

219 initializing a new one. 

220 

221 Args: 

222 config: OTel configuration object 

223 

224 Returns: 

225 Self for method chaining 

226 

227 Example: 

228 >>> otel = get_observability_instance() 

229 >>> otel.init(OTelConfig()) 

230 """ 

231 if self._provider_manager is not None: 

232 logger.warning( 

233 "Observability already initialized. Shutting down previous instance." 

234 ) 

235 self._provider_manager.shutdown() 

236 

237 self._provider_manager = OTelProviderManager(config) 

238 

239 logger.info("Observability initialized") 

240 

241 return self 

242 

243 def is_enabled(self) -> bool: 

244 """ 

245 Check if observability is enabled. 

246 

247 Returns: 

248 True if observability is initialized and metrics are enabled 

249 

250 Example: 

251 >>> otel = get_observability_instance() 

252 >>> if otel.is_enabled(): 

253 ... print("Metrics are being collected") 

254 """ 

255 return ( 

256 self._provider_manager is not None 

257 and self._provider_manager.config.is_enabled() 

258 ) 

259 

260 def get_provider_manager(self) -> Optional[OTelProviderManager]: 

261 """ 

262 Get the provider manager instance. 

263 

264 Returns: 

265 The provider manager, or None if not initialized 

266 

267 Example: 

268 >>> otel = get_observability_instance() 

269 >>> manager = otel.get_provider_manager() 

270 >>> if manager is not None: 

271 ... print(f"Observability enabled: {manager.config.is_enabled()}") 

272 """ 

273 return self._provider_manager 

274 

275 def shutdown(self, timeout_millis: int = 30000) -> bool: 

276 """ 

277 Shutdown observability and flush any pending metrics. 

278 

279 This should be called at application shutdown to ensure all metrics 

280 are exported before the application exits. 

281 

282 Args: 

283 timeout_millis: Maximum time to wait for shutdown 

284 

285 Returns: 

286 True if shutdown was successful 

287 

288 Example: 

289 >>> otel = get_observability_instance() 

290 >>> # At application shutdown 

291 >>> otel.shutdown() 

292 """ 

293 if self._provider_manager is None: 

294 logger.debug("Observability not initialized, nothing to shutdown") 

295 return True 

296 

297 success = self._provider_manager.shutdown(timeout_millis) 

298 self._provider_manager = None 

299 logger.info("Observability shutdown") 

300 

301 return success 

302 

303 def force_flush(self, timeout_millis: int = 30000) -> bool: 

304 """ 

305 Force flush all pending metrics immediately. 

306 

307 Useful for testing or when you want to ensure metrics are exported 

308 before a specific point in your application. 

309 

310 Args: 

311 timeout_millis: Maximum time to wait for flush 

312 

313 Returns: 

314 True if flush was successful 

315 

316 Example: 

317 >>> otel = get_observability_instance() 

318 >>> # Execute some Redis commands 

319 >>> r.set('key', 'value') 

320 >>> # Force flush metrics immediately 

321 >>> otel.force_flush() 

322 """ 

323 if self._provider_manager is None: 

324 logger.debug("Observability not initialized, nothing to flush") 

325 return True 

326 

327 return self._provider_manager.force_flush(timeout_millis) 

328 

329 

330# Global singleton instance 

331_observability_instance: Optional[ObservabilityInstance] = None 

332 

333 

334def get_observability_instance() -> ObservabilityInstance: 

335 """ 

336 Get the global observability singleton instance. 

337 

338 This is the Pythonic way to get the singleton instance. 

339 

340 Returns: 

341 The global ObservabilityInstance singleton 

342 

343 Example: 

344 >>> 

345 >>> otel = get_observability_instance() 

346 >>> otel.init(OTelConfig()) 

347 """ 

348 global _observability_instance 

349 

350 if _observability_instance is None: 

351 _observability_instance = ObservabilityInstance() 

352 

353 return _observability_instance 

354 

355 

356def reset_observability_instance() -> None: 

357 """ 

358 Reset the global observability singleton instance. 

359 

360 This is primarily used for testing and benchmarking to ensure 

361 a clean state between test runs. 

362 

363 Warning: 

364 This will shutdown any active provider manager and reset 

365 the global state. Use with caution in production code. 

366 """ 

367 global _observability_instance 

368 

369 if _observability_instance is not None: 

370 _observability_instance.shutdown() 

371 _observability_instance = None