Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/recorder.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

282 statements  

1""" 

2Simple, clean API for recording observability metrics. 

3 

4This module provides a straightforward interface for Redis core code to record 

5metrics without needing to know about OpenTelemetry internals. 

6 

7Usage in Redis core code: 

8 from redis.observability.recorder import record_operation_duration 

9 

10 start_time = time.monotonic() 

11 # ... execute Redis command ... 

12 record_operation_duration( 

13 command_name='SET', 

14 duration_seconds=time.monotonic() - start_time, 

15 server_address='localhost', 

16 server_port=6379, 

17 db_namespace='0', 

18 error=None 

19 ) 

20""" 

21 

22from datetime import datetime 

23from typing import TYPE_CHECKING, Callable, List, Optional 

24 

25from redis.observability.attributes import ( 

26 AttributeBuilder, 

27 CSCReason, 

28 CSCResult, 

29 GeoFailoverReason, 

30 PubSubDirection, 

31) 

32from redis.observability.metrics import CloseReason, RedisMetricsCollector 

33from redis.observability.providers import get_observability_instance 

34from redis.observability.registry import get_observables_registry_instance 

35from redis.utils import deprecated_args, str_if_bytes 

36 

37if TYPE_CHECKING: 

38 from redis.connection import ConnectionPoolInterface 

39 from redis.multidb.database import SyncDatabase 

40 from redis.observability.config import OTelConfig 

41 

42# Global metrics collector instance (lazy-initialized) 

43_metrics_collector: Optional[RedisMetricsCollector] = None 

44 

45CONNECTION_COUNT_REGISTRY_KEY = "connection_count" 

46CSC_ITEMS_REGISTRY_KEY = "csc_items" 

47 

48 

@deprecated_args(
    args_to_warn=["batch_size"],
    reason="The batch_size argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_operation_duration(
    command_name: str,
    duration_seconds: float,
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    db_namespace: Optional[str] = None,
    error: Optional[Exception] = None,
    is_blocking: Optional[bool] = None,
    batch_size: Optional[int] = None,  # noqa
    retry_attempts: Optional[int] = None,
) -> None:
    """
    Record the duration of a Redis command execution.

    Cheap no-op when observability is disabled: the collector is lazily
    created once and, when unavailable, the call returns immediately.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if the command failed, None on success
        is_blocking: Whether the operation is a blocking command
        batch_size: Deprecated; no longer used
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Metric recording must never break the Redis operation itself.
    try:
        collector.record_operation_duration(
            command_name=command_name,
            duration_seconds=duration_seconds,
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
            error_type=error,
            network_peer_address=server_address,
            network_peer_port=server_port,
            is_blocking=is_blocking,
            retry_attempts=retry_attempts,
        )
    except Exception:
        pass

113 

114 

def record_connection_create_time(
    connection_pool: "ConnectionPoolInterface",
    duration_seconds: float,
) -> None:
    """
    Record how long it took to establish a new connection.

    No-op when observability is disabled.

    Args:
        connection_pool: Connection pool the new connection belongs to
        duration_seconds: Time taken to create the connection, in seconds
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_create_time(
            connection_pool=connection_pool,
            duration_seconds=duration_seconds,
        )
    except Exception:
        pass

146 

147 

def init_connection_count() -> None:
    """
    Initialize the observable gauge backing the connection count metric.

    The gauge's callback fans out to every callback registered under
    CONNECTION_COUNT_REGISTRY_KEY and concatenates their observations.
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    def observable_callback(__):
        # Aggregate observations from every registered callback.
        registry = get_observables_registry_instance()
        observations = []
        for registered in registry.get(CONNECTION_COUNT_REGISTRY_KEY):
            observations.extend(registered())
        return observations

    # Never let metric setup break the caller.
    try:
        collector.init_connection_count(callback=observable_callback)
    except Exception:
        pass

175 

176 

def register_pools_connection_count(
    connection_pools: List["ConnectionPoolInterface"],
) -> None:
    """
    Register the given pools with the connection count observable registry.
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    try:
        # Lazy import so OpenTelemetry is only required when enabled.
        from opentelemetry.metrics import Observation

        def connection_count_callback():
            # One observation per (count, attributes) pair of every pool.
            return [
                Observation(count, attributes=attributes)
                for pool in connection_pools
                for count, attributes in pool.get_connection_count()
            ]

        get_observables_registry_instance().register(
            CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
        )
    except Exception:
        pass

207 

208 

def record_connection_timeout(
    pool_name: str,
) -> None:
    """
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_timeout(pool_name=pool_name)
    except Exception:
        pass

234 

235 

def record_connection_wait_time(
    pool_name: str,
    duration_seconds: float,
) -> None:
    """
    Record the time spent waiting for a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_wait_time(
            pool_name=pool_name,
            duration_seconds=duration_seconds,
        )
    except Exception:
        pass

266 

267 

def record_connection_closed(
    close_reason: Optional[CloseReason] = None,
    error_type: Optional[Exception] = None,
) -> None:
    """
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error that caused the close, if closed due to an error
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_closed(
            close_reason=close_reason,
            error_type=error_type,
        )
    except Exception:
        pass

296 

297 

def record_connection_relaxed_timeout(
    connection_name: str,
    maint_notification: str,
    relaxed: bool,
) -> None:
    """
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True counts up (relaxed), False counts down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('Connection<localhost:6379>', 'MOVING', True)
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_relaxed_timeout(
            connection_name=connection_name,
            maint_notification=maint_notification,
            relaxed=relaxed,
        )
    except Exception:
        pass

329 

330 

def record_connection_handoff(
    pool_name: str,
) -> None:
    """
    Record a connection handoff event (e.g., after a MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_connection_handoff(pool_name=pool_name)
    except Exception:
        pass

356 

357 

def record_error_count(
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    network_peer_address: Optional[str] = None,
    network_peer_port: Optional[int] = None,
    error_type: Optional[Exception] = None,
    retry_attempts: Optional[int] = None,
    is_internal: bool = True,
) -> None:
    """
    Record an error occurrence.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: The exception that occurred
        retry_attempts: Number of retry attempts made
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_error_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            error_type=error_type,
            retry_attempts=retry_attempts,
            is_internal=is_internal,
        )
    except Exception:
        pass

401 

402 

def record_pubsub_message(
    direction: PubSubDirection,
    channel: Optional[str] = None,
    sharded: Optional[bool] = None,
) -> None:
    """
    Record a published or received Pub/Sub message.

    The channel name is dropped when configuration asks for channel names
    to be hidden.

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Honor the hide_pubsub_channel_names configuration flag.
    effective_channel = channel
    if channel is not None:
        config = _get_config()
        if config is not None and config.hide_pubsub_channel_names:
            effective_channel = None

    # Never let metric recording break the caller.
    try:
        collector.record_pubsub_message(
            direction=direction,
            channel=effective_channel,
            sharded=sharded,
        )
    except Exception:
        pass

441 

442 

@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag(
    lag_seconds: float,
    stream_name: Optional[str] = None,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record the lag of a streaming message.

    The stream name is dropped when configuration asks for stream names
    to be hidden.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Deprecated; no longer used
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Honor the hide_stream_names configuration flag.
    effective_stream_name = stream_name
    if stream_name is not None:
        config = _get_config()
        if config is not None and config.hide_stream_names:
            effective_stream_name = None

    # Never let metric recording break the caller.
    try:
        collector.record_streaming_lag(
            lag_seconds=lag_seconds,
            stream_name=effective_stream_name,
            consumer_group=consumer_group,
        )
    except Exception:
        pass

485 

486 

@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag_from_response(
    response,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record streaming lag from an XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on the
    millisecond timestamp embedded in the message ID. Respects the
    hide_stream_names configuration flag.

    Args:
        response: Response from XREAD/XREADGROUP command (dict for RESP3,
            list for RESP2)
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Deprecated; no longer used
    """
    global _metrics_collector

    if _metrics_collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    if not response:
        return

    # Never let metric recording break the caller.
    try:
        now = datetime.now().timestamp()

        # Check if stream names should be hidden.
        config = _get_config()
        hide_stream_names = config is not None and config.hide_stream_names

        def _record_messages(messages, stream_name):
            # Shared per-message lag computation for both RESP formats.
            for message in messages:
                message_id, _ = message
                message_id = str_if_bytes(message_id)
                # Stream IDs are '<ms-timestamp>-<sequence>'.
                timestamp, _ = message_id.split("-")
                # Ensure lag is non-negative (clock skew can cause
                # negative values).
                lag_seconds = max(0.0, now - int(timestamp) / 1000)
                _metrics_collector.record_streaming_lag(
                    lag_seconds=lag_seconds,
                    stream_name=stream_name,
                    consumer_group=consumer_group,
                )

        if isinstance(response, dict):
            # RESP3 format: {stream_name: [messages, ...]}
            for stream_name, stream_messages in response.items():
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_name)
                )
                for messages in stream_messages:
                    _record_messages(messages, effective_stream_name)
        else:
            # RESP2 format: [[stream_name, messages], ...]
            for stream_entry in response:
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_entry[0])
                )
                _record_messages(stream_entry[1], effective_stream_name)
    except Exception:
        pass

564 

565 

def record_maint_notification_count(
    server_address: str,
    server_port: int,
    network_peer_address: str,
    network_peer_port: int,
    maint_notification: str,
) -> None:
    """
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_maint_notification_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            maint_notification=maint_notification,
        )
    except Exception:
        pass

603 

604 

def record_csc_request(
    result: Optional[CSCResult] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    """
    global _metrics_collector

    if _metrics_collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    # Never let metric recording break the caller.
    try:
        _metrics_collector.record_csc_request(
            result=result,
        )
    except Exception:
        pass

627 

628 

def init_csc_items() -> None:
    """
    Initialize the observable gauge backing the CSC items metric.

    The gauge's callback fans out to every callback registered under
    CSC_ITEMS_REGISTRY_KEY and concatenates their observations.
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    def observable_callback(__):
        # Aggregate observations from every registered callback.
        registry = get_observables_registry_instance()
        observations = []
        for registered in registry.get(CSC_ITEMS_REGISTRY_KEY):
            observations.extend(registered())
        return observations

    # Never let metric setup break the caller.
    try:
        collector.init_csc_items(callback=observable_callback)
    except Exception:
        pass

656 

657 

def register_csc_items_callback(
    callback: Callable,
    pool_name: Optional[str] = None,
) -> None:
    """
    Adds given callback to CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    """
    global _metrics_collector

    if _metrics_collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        # Lazy import inside the try so a missing OpenTelemetry install
        # degrades silently instead of raising to the caller (matches
        # register_pools_connection_count).
        from opentelemetry.metrics import Observation

        def csc_items_callback():
            # One observation carrying the current cache size.
            return [
                Observation(
                    callback(),
                    attributes=AttributeBuilder.build_csc_attributes(
                        pool_name=pool_name
                    ),
                )
            ]

        observables_registry = get_observables_registry_instance()
        observables_registry.register(CSC_ITEMS_REGISTRY_KEY, csc_items_callback)
    except Exception:
        pass

692 

693 

def record_csc_eviction(
    count: int,
    reason: Optional[CSCReason] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) eviction.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_csc_eviction(
            count=count,
            reason=reason,
        )
    except Exception:
        pass

719 

720 

def record_csc_network_saved(
    bytes_saved: int,
) -> None:
    """
    Record the number of bytes saved by using Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_csc_network_saved(bytes_saved=bytes_saved)
    except Exception:
        pass

743 

744 

def record_geo_failover(
    fail_from: "SyncDatabase",
    fail_to: "SyncDatabase",
    reason: GeoFailoverReason,
) -> None:
    """
    Record a geo failover.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # One-time lazy initialization; stays None when telemetry is off.
        collector = _metrics_collector = _get_or_create_collector()
        if collector is None:
            return

    # Never let metric recording break the caller.
    try:
        collector.record_geo_failover(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )
    except Exception:
        pass

773 

774 

def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
    """
    Build the global metrics collector, if observability is enabled.

    Returns:
        A RedisMetricsCollector when telemetry is enabled, None otherwise.
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        if manager is None or not manager.config.enabled_telemetry:
            return None

        # Obtain a meter from the configured MeterProvider.
        meter = manager.get_meter_provider().get_meter(
            RedisMetricsCollector.METER_NAME, RedisMetricsCollector.METER_VERSION
        )
        return RedisMetricsCollector(meter, manager.config)
    except ImportError:
        # Observability module not available.
        return None
    except Exception:
        # Any other failure: never break Redis operations over telemetry.
        return None

800 

801 

def _get_config() -> Optional["OTelConfig"]:
    """
    Fetch the active OTel configuration from the observability manager.

    Returns:
        The manager's OTelConfig when observability is set up, else None.
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        return manager.config if manager is not None else None
    except Exception:
        return None

816 

817 

def reset_collector() -> None:
    """
    Drop the cached global collector (testing / re-initialization hook).
    """
    global _metrics_collector
    _metrics_collector = None

824 

825 

def is_enabled() -> bool:
    """
    Check whether observability metrics are being collected.

    Returns:
        True if a metrics collector exists (or could be lazily created),
        False otherwise.
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # Attempt lazy creation so the answer reflects current config.
        collector = _metrics_collector = _get_or_create_collector()

    return collector is not None