Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/recorder.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

291 statements  

1""" 

2Simple, clean API for recording observability metrics. 

3 

4This module provides a straightforward interface for Redis core code to record 

5metrics without needing to know about OpenTelemetry internals. 

6 

7Usage in Redis core code: 

8 from redis.observability.recorder import record_operation_duration 

9 

10 start_time = time.monotonic() 

11 # ... execute Redis command ... 

12 record_operation_duration( 

13 command_name='SET', 

14 duration_seconds=time.monotonic() - start_time, 

15 server_address='localhost', 

16 server_port=6379, 

17 db_namespace='0', 

18 error=None 

19 ) 

20""" 

21 

22from datetime import datetime 

23from typing import TYPE_CHECKING, Callable, List, Optional 

24 

25from redis.observability.attributes import ( 

26 AttributeBuilder, 

27 ConnectionState, 

28 CSCReason, 

29 CSCResult, 

30 GeoFailoverReason, 

31 PubSubDirection, 

32) 

33from redis.observability.metrics import CloseReason, RedisMetricsCollector 

34from redis.observability.providers import get_observability_instance 

35from redis.observability.registry import get_observables_registry_instance 

36from redis.utils import deprecated_args, deprecated_function, str_if_bytes 

37 

38if TYPE_CHECKING: 

39 from redis.connection import ConnectionPoolInterface 

40 from redis.multidb.database import SyncDatabase 

41 from redis.observability.config import OTelConfig 

42 

43# Global metrics collector instance (lazy-initialized) 

44_metrics_collector: Optional[RedisMetricsCollector] = None 

45 

46CSC_ITEMS_REGISTRY_KEY = "csc_items" 

47CONNECTION_COUNT_REGISTRY_KEY = "connection_count" 

48 

49 

@deprecated_args(
    args_to_warn=["batch_size"],
    reason="The batch_size argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_operation_duration(
    command_name: str,
    duration_seconds: float,
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    db_namespace: Optional[str] = None,
    error: Optional[Exception] = None,
    is_blocking: Optional[bool] = None,
    batch_size: Optional[int] = None,  # noqa
    retry_attempts: Optional[int] = None,
) -> None:
    """Record the duration of a single Redis command execution.

    A no-op (with minimal overhead) when observability is not enabled.

    Args:
        command_name: Redis command name (e.g. 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if the command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Deprecated; no longer used
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    """
    global _metrics_collector

    # Lazily build the collector; it stays None while observability is
    # disabled, making this call a cheap early return.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        # The server address/port double as the network peer attributes.
        _metrics_collector.record_operation_duration(
            command_name=command_name,
            duration_seconds=duration_seconds,
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
            error_type=error,
            network_peer_address=server_address,
            network_peer_port=server_port,
            is_blocking=is_blocking,
            retry_attempts=retry_attempts,
        )
    except Exception:
        # Metric recording must never break the Redis operation itself.
        pass

114 

115 

def record_connection_create_time(
    connection_pool: "ConnectionPoolInterface",
    duration_seconds: float,
) -> None:
    """Record how long it took to create a new connection.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create the connection, in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create connection ...
        >>> record_connection_create_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_create_time(
            connection_pool=connection_pool, duration_seconds=duration_seconds
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

147 

148 

def record_connection_count(
    pool_name: str,
    connection_state: ConnectionState,
    counter: int = 1,
) -> None:
    """Record a connection count change for a single connection state.

    Args:
        pool_name: Connection pool identifier
        connection_state: State to update (IDLE or USED)
        counter: Number to add (positive) or subtract (negative)

    Example:
        # New connection created (goes to IDLE first)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Acquire from pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, 1)

        # Release to pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Pool disconnect 5 idle connections
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -5)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_count(
            pool_name=pool_name,
            connection_state=connection_state,
            counter=counter,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

192 

193 

@deprecated_function(
    reason="Connection count is now tracked via record_connection_count(). "
    "This functionality will be removed in the next major version",
    version="7.4.0",
)
def init_connection_count() -> None:
    """Set up the observable gauge backing the connection-count metric."""
    collector = _get_or_create_collector()
    if collector is None:
        return

    def observable_callback(__):
        # Gather observations from every callback registered under the
        # connection-count registry key.
        registry = get_observables_registry_instance()
        return [
            observation
            for registered in registry.get(CONNECTION_COUNT_REGISTRY_KEY)
            for observation in registered()
        ]

    try:
        collector.init_connection_count(callback=observable_callback)
    except Exception:
        # Gauge initialization failures must not break the client.
        pass

223 

224 

@deprecated_function(
    reason="Connection count is now tracked via record_connection_count(). "
    "This functionality will be removed in the next major version",
    version="7.4.0",
)
def register_pools_connection_count(
    connection_pools: List["ConnectionPoolInterface"],
) -> None:
    """Register the given pools with the connection-count observable registry."""
    collector = _get_or_create_collector()
    if collector is None:
        return

    try:
        # Lazy import: opentelemetry is only needed when observability is on.
        from opentelemetry.metrics import Observation

        def connection_count_callback():
            # One Observation per (count, attributes) pair reported by each pool.
            return [
                Observation(count, attributes=attributes)
                for pool in connection_pools
                for count, attributes in pool.get_connection_count()
            ]

        get_observables_registry_instance().register(
            CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
        )
    except Exception:
        # Registration failures must not break the client.
        pass

257 

258 

def record_connection_timeout(
    pool_name: str,
) -> None:
    """Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_timeout(pool_name=pool_name)
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

284 

285 

def record_connection_wait_time(
    pool_name: str,
    duration_seconds: float,
) -> None:
    """Record the time spent waiting to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_wait_time(
            pool_name=pool_name, duration_seconds=duration_seconds
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

316 

317 

def record_connection_closed(
    close_reason: Optional[CloseReason] = None,
    error_type: Optional[Exception] = None,
) -> None:
    """
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> record_connection_closed(error_type=ConnectionError())
    """
    global _metrics_collector

    # Lazy-init the collector; stays None when observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_closed(
            close_reason=close_reason,
            error_type=error_type,
        )
    except Exception:
        # Metric recording errors must not surface to callers.
        pass

346 

347 

def record_connection_relaxed_timeout(
    connection_name: str,
    maint_notification: str,
    relaxed: bool,
) -> None:
    """Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('localhost:6379_a1b2c3d4', 'MOVING', True)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_relaxed_timeout(
            connection_name=connection_name,
            maint_notification=maint_notification,
            relaxed=relaxed,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

379 

380 

def record_connection_handoff(
    pool_name: str,
) -> None:
    """Record a connection handoff event (e.g., after a MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_connection_handoff(pool_name=pool_name)
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

406 

407 

def record_error_count(
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    network_peer_address: Optional[str] = None,
    network_peer_port: Optional[int] = None,
    error_type: Optional[Exception] = None,
    retry_attempts: Optional[int] = None,
    is_internal: bool = True,
) -> None:
    """Record an error occurrence.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_error_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            error_type=error_type,
            retry_attempts=retry_attempts,
            is_internal=is_internal,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

451 

452 

def record_pubsub_message(
    direction: PubSubDirection,
    channel: Optional[str] = None,
    sharded: Optional[bool] = None,
) -> None:
    """Record a Pub/Sub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    # Honor the configuration option that hides channel names from metrics.
    if channel is not None:
        config = _get_config()
        if config is not None and config.hide_pubsub_channel_names:
            channel = None

    try:
        _metrics_collector.record_pubsub_message(
            direction=direction,
            channel=channel,
            sharded=sharded,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

491 

492 

@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag(
    lag_seconds: float,
    stream_name: Optional[str] = None,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Deprecated; no longer used
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    # Honor the configuration option that hides stream names from metrics.
    if stream_name is not None:
        config = _get_config()
        if config is not None and config.hide_stream_names:
            stream_name = None

    try:
        _metrics_collector.record_streaming_lag(
            lag_seconds=lag_seconds,
            stream_name=stream_name,
            consumer_group=consumer_group,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

535 

536 

@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag_from_response(
    response,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on the
    millisecond timestamp embedded in the message ID (the part before '-').

    Args:
        response: Response from XREAD/XREADGROUP command; either a dict
            (RESP3) or a list of (stream, messages) entries (RESP2)
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Deprecated; no longer used
    """

    global _metrics_collector

    # Lazy-init the collector; stays None when observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    # Empty/None response (e.g. blocking read timed out): nothing to record.
    if not response:
        return

    try:
        # Single timestamp for the whole batch; lag is measured against it.
        now = datetime.now().timestamp()

        # Check if stream names should be hidden per configuration.
        config = _get_config()
        hide_stream_names = config is not None and config.hide_stream_names

        # RESP3 format: dict mapping stream name -> message groups
        if isinstance(response, dict):
            for stream_name, stream_messages in response.items():
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_name)
                )
                for messages in stream_messages:
                    for message in messages:
                        # message is (message_id, fields); fields unused here.
                        message_id, _ = message
                        message_id = str_if_bytes(message_id)
                        # Message IDs are '<ms-timestamp>-<sequence>'.
                        timestamp, _ = message_id.split("-")
                        # Ensure lag is non-negative (clock skew can cause negative values)
                        lag_seconds = max(0.0, now - int(timestamp) / 1000)

                        _metrics_collector.record_streaming_lag(
                            lag_seconds=lag_seconds,
                            stream_name=effective_stream_name,
                            consumer_group=consumer_group,
                        )
        else:
            # RESP2 format: list of (stream_name, messages) pairs
            for stream_entry in response:
                stream_name = str_if_bytes(stream_entry[0])
                effective_stream_name = None if hide_stream_names else stream_name

                for message in stream_entry[1]:
                    # message is (message_id, fields); fields unused here.
                    message_id, _ = message
                    message_id = str_if_bytes(message_id)
                    # Message IDs are '<ms-timestamp>-<sequence>'.
                    timestamp, _ = message_id.split("-")
                    # Ensure lag is non-negative (clock skew can cause negative values)
                    lag_seconds = max(0.0, now - int(timestamp) / 1000)

                    _metrics_collector.record_streaming_lag(
                        lag_seconds=lag_seconds,
                        stream_name=effective_stream_name,
                        consumer_group=consumer_group,
                    )
    except Exception:
        # Best-effort parsing; malformed responses must not break the caller.
        pass

614 

615 

def record_maint_notification_count(
    server_address: str,
    server_port: int,
    network_peer_address: str,
    network_peer_port: int,
    maint_notification: str,
) -> None:
    """Record a maintenance notification occurrence.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_maint_notification_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            maint_notification=maint_notification,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

653 

654 

def record_csc_request(
    result: Optional[CSCResult] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    """
    global _metrics_collector

    # Lazy-init the collector; stays None when observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_csc_request(
            result=result,
        )
    except Exception:
        # Metric recording errors must not surface to callers.
        pass

677 

678 

def init_csc_items() -> None:
    """Set up the observable gauge backing the CSC items metric."""
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    def observable_callback(__):
        # Gather observations from every callback registered under the
        # CSC items registry key.
        registry = get_observables_registry_instance()
        return [
            observation
            for registered in registry.get(CSC_ITEMS_REGISTRY_KEY)
            for observation in registered()
        ]

    try:
        _metrics_collector.init_csc_items(callback=observable_callback)
    except Exception:
        # Gauge initialization failures must not break the client.
        pass

706 

707 

def register_csc_items_callback(
    callback: Callable,
    pool_name: Optional[str] = None,
) -> None:
    """
    Adds given callback to CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    """
    global _metrics_collector

    # Lazy-init the collector; stays None when observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        # Lazy import inside the try so a missing optional opentelemetry
        # dependency is swallowed instead of raising into the caller,
        # consistent with register_pools_connection_count().
        from opentelemetry.metrics import Observation

        def csc_items_callback():
            # Wrap the current cache size in a single Observation tagged
            # with the pool's CSC attributes.
            return [
                Observation(
                    callback(),
                    attributes=AttributeBuilder.build_csc_attributes(
                        pool_name=pool_name
                    ),
                )
            ]

        observables_registry = get_observables_registry_instance()
        observables_registry.register(CSC_ITEMS_REGISTRY_KEY, csc_items_callback)
    except Exception:
        # Registration failures must not break the client.
        pass

742 

743 

def record_csc_eviction(
    count: int,
    reason: Optional[CSCReason] = None,
) -> None:
    """Record Client Side Caching (CSC) evictions.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_csc_eviction(count=count, reason=reason)
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

769 

770 

def record_csc_network_saved(
    bytes_saved: int,
) -> None:
    """Record the number of network bytes saved by Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_csc_network_saved(bytes_saved=bytes_saved)
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

793 

794 

def record_geo_failover(
    fail_from: "SyncDatabase",
    fail_to: "SyncDatabase",
    reason: GeoFailoverReason,
) -> None:
    """Record a geo failover between databases.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    """
    global _metrics_collector

    # Lazy init; None means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return

    try:
        _metrics_collector.record_geo_failover(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )
    except Exception:
        # Never let metric recording errors surface to callers.
        pass

823 

824 

def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
    """Build a metrics collector, or return None when observability is off.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        if manager is None or not manager.config.enabled_telemetry:
            return None

        # Obtain a meter from the globally configured MeterProvider.
        meter = manager.get_meter_provider().get_meter(
            RedisMetricsCollector.METER_NAME, RedisMetricsCollector.METER_VERSION
        )
        return RedisMetricsCollector(meter, manager.config)
    except Exception:
        # Covers ImportError (observability module unavailable) and any
        # other setup failure — never break Redis operations over metrics.
        return None

850 

851 

def _get_config() -> Optional["OTelConfig"]:
    """Fetch the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        return None if manager is None else manager.config
    except Exception:
        # Configuration lookup must never raise into the caller.
        return None

866 

867 

def reset_collector() -> None:
    """
    Reset the global collector (used for testing or re-initialization).

    The next recording call will lazily re-create the collector via
    _get_or_create_collector().
    """
    global _metrics_collector
    _metrics_collector = None

874 

875 

def is_enabled() -> bool:
    """Report whether observability metrics are being collected.

    Returns:
        True if metrics are being collected, False otherwise
    """
    global _metrics_collector

    # Trigger lazy initialization so the answer reflects current config.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()

    return _metrics_collector is not None