Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/metrics.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1""" 

2OpenTelemetry metrics collector for redis-py. 

3 

4This module defines and manages all metric instruments according to 

5OTel semantic conventions for database clients. 

6""" 

7 

import logging
import time
from enum import Enum
from typing import TYPE_CHECKING, Callable, Optional, Union

if TYPE_CHECKING:
    from redis.asyncio.connection import ConnectionPool
    from redis.asyncio.multidb.database import AsyncDatabase
    from redis.connection import ConnectionPoolInterface
    from redis.multidb.database import SyncDatabase

from redis.observability.attributes import (
    REDIS_CLIENT_CONNECTION_CLOSE_REASON,
    REDIS_CLIENT_CONNECTION_NOTIFICATION,
    AttributeBuilder,
    ConnectionState,
    CSCReason,
    CSCResult,
    GeoFailoverReason,
    PubSubDirection,
    get_pool_name,
)
from redis.observability.config import MetricGroup, OTelConfig
from redis.utils import deprecated_args, deprecated_function

logger = logging.getLogger(__name__)

# Optional imports - OTel SDK may not be installed.
# Import every instrument type that the except branch provides a fallback
# for, so the module exposes the same set of names regardless of whether
# the OpenTelemetry API is installed.
try:
    from opentelemetry.metrics import Counter, Histogram, Meter, UpDownCounter

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    Counter = None
    Histogram = None
    Meter = None
    UpDownCounter = None

46 

47 

class CloseReason(Enum):
    """
    Why a Redis client connection was closed.

    Members:
        APPLICATION_CLOSE: Deliberate close by the application, e.g. a
            normal shutdown or an explicit cleanup path.
        ERROR: Close forced by an unexpected failure, such as a network
            drop or a protocol-level error.
        HEALTHCHECK_FAILED: Close triggered by a failed health/liveness
            probe on the connection.
    """

    APPLICATION_CLOSE = "application_close"
    ERROR = "error"
    HEALTHCHECK_FAILED = "healthcheck_failed"

64 

65 

class RedisMetricsCollector:
    """
    Collects and records OpenTelemetry metrics for Redis operations.

    This class manages all metric instruments and provides methods to record
    various Redis operations including connection pool events, command execution,
    and cluster-specific operations.

    Args:
        meter: OpenTelemetry Meter instance
        config: OTel configuration object
    """

    METER_NAME = "redis-py"
    METER_VERSION = "1.0.0"

    def __init__(self, meter: Meter, config: OTelConfig):
        if not OTEL_AVAILABLE:
            raise ImportError(
                "OpenTelemetry API is not installed. "
                "Install it with: pip install opentelemetry-api"
            )

        self.meter = meter
        self.config = config
        self.attr_builder = AttributeBuilder()

        # Initialize only the instruments for metric groups enabled in the
        # config; recorder methods guard with hasattr() so a disabled group's
        # recorders become no-ops.

        if MetricGroup.RESILIENCY in self.config.metric_groups:
            self._init_resiliency_metrics()

        if MetricGroup.COMMAND in self.config.metric_groups:
            self._init_command_metrics()

        if MetricGroup.CONNECTION_BASIC in self.config.metric_groups:
            self._init_connection_basic_metrics()

        if MetricGroup.CONNECTION_ADVANCED in self.config.metric_groups:
            self._init_connection_advanced_metrics()

        if MetricGroup.PUBSUB in self.config.metric_groups:
            self._init_pubsub_metrics()

        if MetricGroup.STREAMING in self.config.metric_groups:
            self._init_streaming_metrics()

        if MetricGroup.CSC in self.config.metric_groups:
            self._init_csc_metrics()

        logger.info("RedisMetricsCollector initialized")

    def _init_resiliency_metrics(self) -> None:
        """Initialize resiliency metrics."""
        self.client_errors = self.meter.create_counter(
            name="redis.client.errors",
            unit="{error}",
            description="A counter of all errors (both returned to the user and handled internally in the client library)",
        )

        self.maintenance_notifications = self.meter.create_counter(
            name="redis.client.maintenance.notifications",
            unit="{notification}",
            description="Tracks server-side maintenance notifications",
        )

        self.geo_failovers = self.meter.create_counter(
            name="redis.client.geofailover.failovers",
            unit="{geofailover}",
            description="Total count of failovers happened using MultiDbClient.",
        )

    def _init_connection_basic_metrics(self) -> None:
        """Initialize basic connection metrics."""
        self.connection_create_time = self.meter.create_histogram(
            name="db.client.connection.create_time",
            unit="s",
            description="Time to create a new connection",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
        )

        self.connection_relaxed_timeout = self.meter.create_up_down_counter(
            name="redis.client.connection.relaxed_timeout",
            unit="{relaxation}",
            description="Counts up for relaxed timeout, counts down for unrelaxed timeout",
        )

        self.connection_handoff = self.meter.create_counter(
            name="redis.client.connection.handoff",
            unit="{handoff}",
            description="Connections that have been handed off (e.g., after a MOVING notification)",
        )

        # DEPRECATED: This attribute is kept for backward compatibility.
        # It requires manual initialization via init_connection_count() with a callback.
        # Use connection_count_updown instead for push-based tracking.
        # Will be removed in the next major version.
        self.connection_count = None

        # New push-based connection count tracking via UpDownCounter
        self.connection_count_updown = self.meter.create_up_down_counter(
            name="db.client.connection.count",
            unit="{connection}",
            description="Number of connections currently in the pool by state",
        )

    def _init_connection_advanced_metrics(self) -> None:
        """Initialize advanced connection metrics."""
        self.connection_timeouts = self.meter.create_counter(
            name="db.client.connection.timeouts",
            unit="{timeout}",
            description="The number of connection timeouts that have occurred trying to obtain a connection from the pool.",
        )

        self.connection_wait_time = self.meter.create_histogram(
            name="db.client.connection.wait_time",
            unit="s",
            description="Time to obtain an open connection from the pool",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
        )

        self.connection_closed = self.meter.create_counter(
            name="redis.client.connection.closed",
            unit="{connection}",
            description="Total number of closed connections",
        )

    def _init_command_metrics(self) -> None:
        """Initialize command execution metric instruments."""
        self.operation_duration = self.meter.create_histogram(
            name="db.client.operation.duration",
            unit="s",
            description="Command execution duration",
            explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
        )

    def _init_pubsub_metrics(self) -> None:
        """Initialize PubSub metric instruments."""
        self.pubsub_messages = self.meter.create_counter(
            name="redis.client.pubsub.messages",
            unit="{message}",
            description="Tracks published and received messages",
        )

    def _init_streaming_metrics(self) -> None:
        """Initialize Streaming metric instruments."""
        self.stream_lag = self.meter.create_histogram(
            name="redis.client.stream.lag",
            unit="s",
            description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
            explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
        )

    def _init_csc_metrics(self) -> None:
        """Initialize Client Side Caching (CSC) metric instruments."""
        self.csc_requests = self.meter.create_counter(
            name="redis.client.csc.requests",
            unit="{request}",
            description="The total number of requests to the cache",
        )

        self.csc_evictions = self.meter.create_counter(
            name="redis.client.csc.evictions",
            unit="{eviction}",
            description="The total number of cache evictions",
        )

        self.csc_network_saved = self.meter.create_counter(
            name="redis.client.csc.network_saved",
            unit="By",
            description="The total number of bytes saved by using CSC",
        )

    # Resiliency metric recording methods

    def record_error_count(
        self,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        error_type: Optional[Exception] = None,
        retry_attempts: Optional[int] = None,
        is_internal: Optional[bool] = None,
    ):
        """
        Record error count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            error_type: Error type
            retry_attempts: Retry attempts
            is_internal: Whether the error is internal (e.g., timeout, network error)
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "client_errors"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )
        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
                is_internal=is_internal,
            )
        )

        self.client_errors.add(1, attributes=attrs)

    def record_maint_notification_count(
        self,
        server_address: str,
        server_port: int,
        network_peer_address: str,
        network_peer_port: int,
        maint_notification: str,
    ):
        """
        Record maintenance notification count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            maint_notification: Maintenance notification
        """
        if not hasattr(self, "maintenance_notifications"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
            )
        )

        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.maintenance_notifications.add(1, attributes=attrs)

    def record_geo_failover(
        self,
        fail_from: Union["SyncDatabase", "AsyncDatabase"],
        fail_to: Union["SyncDatabase", "AsyncDatabase"],
        reason: GeoFailoverReason,
    ) -> None:
        """
        Record geo failover

        Args:
            fail_from: Database failed from
            fail_to: Database failed to
            reason: Reason for the failover
        """

        if not hasattr(self, "geo_failovers"):
            return

        attrs = self.attr_builder.build_geo_failover_attributes(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )

        # FIX: previously `return self.geo_failovers.add(...)`; Counter.add()
        # returns None, so the return was meaningless and inconsistent with
        # the other recorder methods.
        self.geo_failovers.add(1, attributes=attrs)

    def record_connection_count(
        self,
        pool_name: str,
        connection_state: ConnectionState,
        counter: int = 1,
    ) -> None:
        """
        Record a connection count change for a single state.

        Args:
            pool_name: Connection pool name
            connection_state: State to update (IDLE or USED)
            counter: Number to add (positive) or subtract (negative)
        """
        if not hasattr(self, "connection_count_updown"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=pool_name,
            connection_state=connection_state,
        )
        self.connection_count_updown.add(counter, attributes=attrs)

    @deprecated_function(
        reason="Connection count is now tracked via record_connection_count(). "
        "This functionality will be removed in the next major version",
        version="7.4.0",
    )
    def init_connection_count(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for connection count metric.

        Args:
            callback: Callback function to retrieve connection counts
        """
        if MetricGroup.CONNECTION_BASIC not in self.config.metric_groups:
            return

        # DEPRECATED: Create observable gauge for backward compatibility
        # This gauge uses a different metric name to avoid conflicts with
        # the new push-based connection_count_updown counter
        self.connection_count = self.meter.create_observable_gauge(
            name="db.client.connection.count.deprecated",
            unit="{connection}",
            description="The number of connections that are currently in state "
            "described by the state attribute (deprecated - use db.client.connection.count instead)",
            callbacks=[callback],
        )

    def init_csc_items(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for CSC items metric.

        Args:
            callback: Callback function to retrieve CSC items count
        """
        # FIX: the guard previously read `... and not self.csc_items`, but
        # `csc_items` is never initialized before this method runs, so the
        # guard raised AttributeError whenever the CSC group was disabled.
        # A plain group-membership check matches init_connection_count().
        if MetricGroup.CSC not in self.config.metric_groups:
            return

        self.csc_items = self.meter.create_observable_gauge(
            name="redis.client.csc.items",
            unit="{item}",
            description="The total number of cached responses currently stored",
            callbacks=[callback],
        )

    def record_connection_timeout(self, pool_name: str) -> None:
        """
        Record a connection timeout event.

        Args:
            pool_name: Connection pool name
        """
        if not hasattr(self, "connection_timeouts"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_timeouts.add(1, attributes=attrs)

    def record_connection_create_time(
        self,
        connection_pool: Union["ConnectionPoolInterface", "ConnectionPool"],
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to create a new connection.

        Args:
            connection_pool: Connection pool implementation
            duration_seconds: Creation time in seconds
        """
        if not hasattr(self, "connection_create_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=get_pool_name(connection_pool)
        )
        self.connection_create_time.record(duration_seconds, attributes=attrs)

    def record_connection_wait_time(
        self,
        pool_name: str,
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to obtain a connection from the pool.

        Args:
            pool_name: Connection pool name
            duration_seconds: Wait time in seconds
        """
        if not hasattr(self, "connection_wait_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_wait_time.record(duration_seconds, attributes=attrs)

    # Command execution metric recording methods

    @deprecated_args(
        args_to_warn=["batch_size"],
        reason="The batch_size argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_operation_duration(
        self,
        command_name: str,
        duration_seconds: float,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        db_namespace: Optional[int] = None,
        batch_size: Optional[int] = None,  # noqa
        error_type: Optional[Exception] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        retry_attempts: Optional[int] = None,
        is_blocking: Optional[bool] = None,
    ) -> None:
        """
        Record command execution duration.

        Args:
            command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
            duration_seconds: Execution time in seconds
            server_address: Redis server address
            server_port: Redis server port
            db_namespace: Redis database index
            batch_size: Number of commands in batch (for pipelines/transactions)
            error_type: Error type if operation failed
            network_peer_address: Resolved peer address
            network_peer_port: Peer port number
            retry_attempts: Number of retry attempts made
            is_blocking: Whether the operation is a blocking command
        """
        if not hasattr(self, "operation_duration"):
            return

        # Check if this command should be tracked
        if not self.config.should_track_command(command_name):
            return

        # Build attributes
        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                command_name=command_name,
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
                is_blocking=is_blocking,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )
        self.operation_duration.record(duration_seconds, attributes=attrs)

    def record_connection_closed(
        self,
        close_reason: Optional[CloseReason] = None,
        error_type: Optional[Exception] = None,
    ) -> None:
        """
        Record a connection closed event.

        Args:
            close_reason: Reason for closing (e.g. 'error', 'application_close')
            error_type: Error type if closed due to error
        """
        if not hasattr(self, "connection_closed"):
            return

        attrs = self.attr_builder.build_connection_attributes()
        if close_reason:
            attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )

        self.connection_closed.add(1, attributes=attrs)

    def record_connection_relaxed_timeout(
        self,
        connection_name: str,
        maint_notification: str,
        relaxed: bool,
    ) -> None:
        """
        Record a connection timeout relaxation event.

        Args:
            connection_name: Connection name
            maint_notification: Maintenance notification type
            relaxed: True to count up (relaxed), False to count down (unrelaxed)
        """
        if not hasattr(self, "connection_relaxed_timeout"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=connection_name)
        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.connection_relaxed_timeout.add(1 if relaxed else -1, attributes=attrs)

    def record_connection_handoff(
        self,
        pool_name: str,
    ) -> None:
        """
        Record a connection handoff event (e.g., after MOVING notification).

        Args:
            pool_name: Connection pool name
        """
        if not hasattr(self, "connection_handoff"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_handoff.add(1, attributes=attrs)

    # PubSub metric recording methods

    def record_pubsub_message(
        self,
        direction: PubSubDirection,
        channel: Optional[str] = None,
        sharded: Optional[bool] = None,
    ) -> None:
        """
        Record a PubSub message (published or received).

        Args:
            direction: Message direction ('publish' or 'receive')
            channel: Pub/Sub channel name
            sharded: True if sharded Pub/Sub channel
        """
        if not hasattr(self, "pubsub_messages"):
            return

        attrs = self.attr_builder.build_pubsub_message_attributes(
            direction=direction,
            channel=channel,
            sharded=sharded,
        )
        self.pubsub_messages.add(1, attributes=attrs)

    # Streaming metric recording methods

    @deprecated_args(
        args_to_warn=["consumer_name"],
        reason="The consumer_name argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_streaming_lag(
        self,
        lag_seconds: float,
        stream_name: Optional[str] = None,
        consumer_group: Optional[str] = None,
        consumer_name: Optional[str] = None,  # noqa
    ) -> None:
        """
        Record the lag of a streaming message.

        Args:
            lag_seconds: Lag in seconds
            stream_name: Stream name
            consumer_group: Consumer group name
            consumer_name: Consumer name
        """
        if not hasattr(self, "stream_lag"):
            return

        attrs = self.attr_builder.build_streaming_attributes(
            stream_name=stream_name,
            consumer_group=consumer_group,
        )
        self.stream_lag.record(lag_seconds, attributes=attrs)

    # CSC metric recording methods

    def record_csc_request(
        self,
        result: Optional[CSCResult] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) request.

        Args:
            result: CSC result ('hit' or 'miss')
        """
        if not hasattr(self, "csc_requests"):
            return

        attrs = self.attr_builder.build_csc_attributes(result=result)
        self.csc_requests.add(1, attributes=attrs)

    def record_csc_eviction(
        self,
        count: int,
        reason: Optional[CSCReason] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) eviction.

        Args:
            count: Number of evictions
            reason: Reason for eviction
        """
        if not hasattr(self, "csc_evictions"):
            return

        attrs = self.attr_builder.build_csc_attributes(reason=reason)
        self.csc_evictions.add(count, attributes=attrs)

    def record_csc_network_saved(
        self,
        bytes_saved: int,
    ) -> None:
        """
        Record the number of bytes saved by using Client Side Caching (CSC).

        Args:
            bytes_saved: Number of bytes saved
        """
        if not hasattr(self, "csc_network_saved"):
            return

        attrs = self.attr_builder.build_csc_attributes()
        self.csc_network_saved.add(bytes_saved, attributes=attrs)

    # Utility methods

    @staticmethod
    def monotonic_time() -> float:
        """
        Get monotonic time for duration measurements.

        Returns:
            Current monotonic time in seconds
        """
        return time.monotonic()

    def __repr__(self) -> str:
        return f"RedisMetricsCollector(meter={self.meter}, config={self.config})"