Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/metrics.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

176 statements  

1""" 

2OpenTelemetry metrics collector for redis-py. 

3 

4This module defines and manages all metric instruments according to 

5OTel semantic conventions for database clients. 

6""" 

7 

8import logging 

9import time 

10from enum import Enum 

11from typing import TYPE_CHECKING, Callable, Optional, Union 

12 

13if TYPE_CHECKING: 

14 from redis.asyncio.connection import ConnectionPool 

15 from redis.asyncio.multidb.database import AsyncDatabase 

16 from redis.connection import ConnectionPoolInterface 

17 from redis.multidb.database import SyncDatabase 

18 

19from redis.observability.attributes import ( 

20 REDIS_CLIENT_CONNECTION_CLOSE_REASON, 

21 REDIS_CLIENT_CONNECTION_NOTIFICATION, 

22 AttributeBuilder, 

23 CSCReason, 

24 CSCResult, 

25 GeoFailoverReason, 

26 PubSubDirection, 

27 get_pool_name, 

28) 

29from redis.observability.config import MetricGroup, OTelConfig 

30from redis.utils import deprecated_args 

31 

32logger = logging.getLogger(__name__) 

33 

# Optional imports - OTel SDK may not be installed; probe once at import time
# and record the result in OTEL_AVAILABLE for the collector's constructor.
try:
    from opentelemetry.metrics import Meter

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    # Placeholder bindings so annotations and `from ... import` references
    # resolve even without the OTel API installed.
    # NOTE(review): only Meter is imported in the try-branch above;
    # Counter/Histogram/UpDownCounter look like leftovers from an earlier
    # import list — confirm nothing imports them from this module before
    # removing.
    Counter = None
    Histogram = None
    Meter = None
    UpDownCounter = None

45 

46 

class CloseReason(Enum):
    """
    Why a Redis client connection was shut down.

    Members:
        APPLICATION_CLOSE: Deliberate close by the application, e.g. during
            a normal shutdown or explicit resource cleanup.
        ERROR: Close triggered by an unexpected failure, such as a network
            or protocol error.
        HEALTHCHECK_FAILED: Close because the connection's health or
            liveness check did not pass.
    """

    APPLICATION_CLOSE = "application_close"
    ERROR = "error"
    HEALTHCHECK_FAILED = "healthcheck_failed"

63 

64 

class RedisMetricsCollector:
    """
    Collects and records OpenTelemetry metrics for Redis operations.

    This class manages all metric instruments and provides methods to record
    various Redis operations including connection pool events, command execution,
    and cluster-specific operations.

    Instruments are created only for the metric groups enabled in ``config``;
    every ``record_*`` method is therefore guarded with ``hasattr`` and becomes
    a no-op when its instrument was not created.

    Args:
        meter: OpenTelemetry Meter instance
        config: OTel configuration object

    Raises:
        ImportError: If the OpenTelemetry API is not installed.
    """

    METER_NAME = "redis-py"
    METER_VERSION = "1.0.0"

    def __init__(self, meter: Meter, config: OTelConfig):
        if not OTEL_AVAILABLE:
            raise ImportError(
                "OpenTelemetry API is not installed. "
                "Install it with: pip install opentelemetry-api"
            )

        self.meter = meter
        self.config = config
        self.attr_builder = AttributeBuilder()
        # Observable gauges are created lazily by init_connection_count() /
        # init_csc_items() once a callback is available. Pre-seed BOTH
        # attributes to None so those guards can test "already initialized"
        # without raising AttributeError (previously csc_items was never
        # initialized here, and init_csc_items() crashed when the CSC group
        # was disabled).
        self.connection_count = None
        self.csc_items = None

        # Initialize enabled metric instruments

        if MetricGroup.RESILIENCY in self.config.metric_groups:
            self._init_resiliency_metrics()

        if MetricGroup.COMMAND in self.config.metric_groups:
            self._init_command_metrics()

        if MetricGroup.CONNECTION_BASIC in self.config.metric_groups:
            self._init_connection_basic_metrics()

        if MetricGroup.CONNECTION_ADVANCED in self.config.metric_groups:
            self._init_connection_advanced_metrics()

        if MetricGroup.PUBSUB in self.config.metric_groups:
            self._init_pubsub_metrics()

        if MetricGroup.STREAMING in self.config.metric_groups:
            self._init_streaming_metrics()

        if MetricGroup.CSC in self.config.metric_groups:
            self._init_csc_metrics()

        logger.info("RedisMetricsCollector initialized")

    def _init_resiliency_metrics(self) -> None:
        """Initialize resiliency metrics."""
        self.client_errors = self.meter.create_counter(
            name="redis.client.errors",
            unit="{error}",
            description="A counter of all errors (both returned to the user and handled internally in the client library)",
        )

        self.maintenance_notifications = self.meter.create_counter(
            name="redis.client.maintenance.notifications",
            unit="{notification}",
            description="Tracks server-side maintenance notifications",
        )

        self.geo_failovers = self.meter.create_counter(
            name="redis.client.geofailover.failovers",
            unit="{geofailover}",
            description="Total count of failovers happened using MultiDbClient.",
        )

    def _init_connection_basic_metrics(self) -> None:
        """Initialize basic connection metrics."""
        self.connection_create_time = self.meter.create_histogram(
            name="db.client.connection.create_time",
            unit="s",
            description="Time to create a new connection",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
        )

        self.connection_relaxed_timeout = self.meter.create_up_down_counter(
            name="redis.client.connection.relaxed_timeout",
            unit="{relaxation}",
            description="Counts up for relaxed timeout, counts down for unrelaxed timeout",
        )

        self.connection_handoff = self.meter.create_counter(
            name="redis.client.connection.handoff",
            unit="{handoff}",
            description="Connections that have been handed off (e.g., after a MOVING notification)",
        )

    def _init_connection_advanced_metrics(self) -> None:
        """Initialize advanced connection metrics."""
        self.connection_timeouts = self.meter.create_counter(
            name="db.client.connection.timeouts",
            unit="{timeout}",
            description="The number of connection timeouts that have occurred trying to obtain a connection from the pool.",
        )

        self.connection_wait_time = self.meter.create_histogram(
            name="db.client.connection.wait_time",
            unit="s",
            description="Time to obtain an open connection from the pool",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
        )

        self.connection_closed = self.meter.create_counter(
            name="redis.client.connection.closed",
            unit="{connection}",
            description="Total number of closed connections",
        )

    def _init_command_metrics(self) -> None:
        """Initialize command execution metric instruments."""
        self.operation_duration = self.meter.create_histogram(
            name="db.client.operation.duration",
            unit="s",
            description="Command execution duration",
            explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
        )

    def _init_pubsub_metrics(self) -> None:
        """Initialize PubSub metric instruments."""
        self.pubsub_messages = self.meter.create_counter(
            name="redis.client.pubsub.messages",
            unit="{message}",
            description="Tracks published and received messages",
        )

    def _init_streaming_metrics(self) -> None:
        """Initialize Streaming metric instruments."""
        self.stream_lag = self.meter.create_histogram(
            name="redis.client.stream.lag",
            unit="s",
            description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
            explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
        )

    def _init_csc_metrics(self) -> None:
        """Initialize Client Side Caching (CSC) metric instruments."""
        self.csc_requests = self.meter.create_counter(
            name="redis.client.csc.requests",
            unit="{request}",
            description="The total number of requests to the cache",
        )

        self.csc_evictions = self.meter.create_counter(
            name="redis.client.csc.evictions",
            unit="{eviction}",
            description="The total number of cache evictions",
        )

        self.csc_network_saved = self.meter.create_counter(
            name="redis.client.csc.network_saved",
            unit="By",
            description="The total number of bytes saved by using CSC",
        )

    # Resiliency metric recording methods

    def record_error_count(
        self,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        error_type: Optional[Exception] = None,
        retry_attempts: Optional[int] = None,
        is_internal: Optional[bool] = None,
    ):
        """
        Record error count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            error_type: Error type
            retry_attempts: Retry attempts
            is_internal: Whether the error is internal (e.g., timeout, network error)
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "client_errors"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )
        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
                is_internal=is_internal,
            )
        )

        self.client_errors.add(1, attributes=attrs)

    def record_maint_notification_count(
        self,
        server_address: str,
        server_port: int,
        network_peer_address: str,
        network_peer_port: int,
        maint_notification: str,
    ):
        """
        Record maintenance notification count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            maint_notification: Maintenance notification
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "maintenance_notifications"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
            )
        )

        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.maintenance_notifications.add(1, attributes=attrs)

    def record_geo_failover(
        self,
        fail_from: Union["SyncDatabase", "AsyncDatabase"],
        fail_to: Union["SyncDatabase", "AsyncDatabase"],
        reason: GeoFailoverReason,
    ) -> None:
        """
        Record geo failover

        Args:
            fail_from: Database failed from
            fail_to: Database failed to
            reason: Reason for the failover
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "geo_failovers"):
            return

        attrs = self.attr_builder.build_geo_failover_attributes(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )

        # Counter.add() returns None; do not propagate it (keeps this method
        # consistent with the other record_* methods).
        self.geo_failovers.add(1, attributes=attrs)

    def init_connection_count(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for connection count metric.

        Idempotent: creating the gauge twice would register a duplicate
        instrument on the meter, so subsequent calls are ignored.

        Args:
            callback: Callback function to retrieve connection count
        """
        # Skip when the metric group is disabled OR the gauge already exists.
        # (The previous `and not` guard re-registered the gauge on every call
        # whenever CONNECTION_BASIC was enabled.)
        if (
            MetricGroup.CONNECTION_BASIC not in self.config.metric_groups
            or self.connection_count is not None
        ):
            return

        self.connection_count = self.meter.create_observable_gauge(
            name="db.client.connection.count",
            unit="{connection}",
            description="Number of connections in the pool",
            callbacks=[callback],
        )

    def init_csc_items(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for CSC items metric.

        Idempotent: creating the gauge twice would register a duplicate
        instrument on the meter, so subsequent calls are ignored.

        Args:
            callback: Callback function to retrieve CSC items count
        """
        # Skip when the metric group is disabled OR the gauge already exists.
        # (Previously this read `and not self.csc_items`, which raised
        # AttributeError when CSC was disabled, because csc_items was never
        # pre-initialized; __init__ now seeds it to None.)
        if MetricGroup.CSC not in self.config.metric_groups or self.csc_items is not None:
            return

        self.csc_items = self.meter.create_observable_gauge(
            name="redis.client.csc.items",
            unit="{item}",
            description="The total number of cached responses currently stored",
            callbacks=[callback],
        )

    def record_connection_timeout(self, pool_name: str) -> None:
        """
        Record a connection timeout event.

        Args:
            pool_name: Connection pool name
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_timeouts"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_timeouts.add(1, attributes=attrs)

    def record_connection_create_time(
        self,
        connection_pool: Union["ConnectionPoolInterface", "ConnectionPool"],
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to create a new connection.

        Args:
            connection_pool: Connection pool implementation
            duration_seconds: Creation time in seconds
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_create_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=get_pool_name(connection_pool)
        )
        self.connection_create_time.record(duration_seconds, attributes=attrs)

    def record_connection_wait_time(
        self,
        pool_name: str,
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to obtain a connection from the pool.

        Args:
            pool_name: Connection pool name
            duration_seconds: Wait time in seconds
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_wait_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_wait_time.record(duration_seconds, attributes=attrs)

    # Command execution metric recording methods

    @deprecated_args(
        args_to_warn=["batch_size"],
        reason="The batch_size argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_operation_duration(
        self,
        command_name: str,
        duration_seconds: float,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        db_namespace: Optional[int] = None,
        batch_size: Optional[int] = None,  # noqa
        error_type: Optional[Exception] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        retry_attempts: Optional[int] = None,
        is_blocking: Optional[bool] = None,
    ) -> None:
        """
        Record command execution duration.

        Args:
            command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
            duration_seconds: Execution time in seconds
            server_address: Redis server address
            server_port: Redis server port
            db_namespace: Redis database index
            batch_size: Deprecated; no longer used
            error_type: Error type if operation failed
            network_peer_address: Resolved peer address
            network_peer_port: Peer port number
            retry_attempts: Number of retry attempts made
            is_blocking: Whether the operation is a blocking command
        """
        # No-op when the COMMAND metric group is disabled.
        if not hasattr(self, "operation_duration"):
            return

        # Check if this command should be tracked
        if not self.config.should_track_command(command_name):
            return

        # Build attributes
        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                command_name=command_name,
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
                is_blocking=is_blocking,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )
        self.operation_duration.record(duration_seconds, attributes=attrs)

    def record_connection_closed(
        self,
        close_reason: Optional[CloseReason] = None,
        error_type: Optional[Exception] = None,
    ) -> None:
        """
        Record a connection closed event.

        Args:
            close_reason: Reason for closing (e.g. 'error', 'application_close')
            error_type: Error type if closed due to error
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_closed"):
            return

        attrs = self.attr_builder.build_connection_attributes()
        if close_reason:
            attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )

        self.connection_closed.add(1, attributes=attrs)

    def record_connection_relaxed_timeout(
        self,
        connection_name: str,
        maint_notification: str,
        relaxed: bool,
    ) -> None:
        """
        Record a connection timeout relaxation event.

        Args:
            connection_name: Connection name
            maint_notification: Maintenance notification type
            relaxed: True to count up (relaxed), False to count down (unrelaxed)
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_relaxed_timeout"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            connection_name=connection_name
        )
        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        # Up-down counter: +1 for a relaxation, -1 when it is undone.
        self.connection_relaxed_timeout.add(1 if relaxed else -1, attributes=attrs)

    def record_connection_handoff(
        self,
        pool_name: str,
    ) -> None:
        """
        Record a connection handoff event (e.g., after MOVING notification).

        Args:
            pool_name: Connection pool name
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_handoff"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_handoff.add(1, attributes=attrs)

    # PubSub metric recording methods

    def record_pubsub_message(
        self,
        direction: PubSubDirection,
        channel: Optional[str] = None,
        sharded: Optional[bool] = None,
    ) -> None:
        """
        Record a PubSub message (published or received).

        Args:
            direction: Message direction ('publish' or 'receive')
            channel: Pub/Sub channel name
            sharded: True if sharded Pub/Sub channel
        """
        # No-op when the PUBSUB metric group is disabled.
        if not hasattr(self, "pubsub_messages"):
            return

        attrs = self.attr_builder.build_pubsub_message_attributes(
            direction=direction,
            channel=channel,
            sharded=sharded,
        )
        self.pubsub_messages.add(1, attributes=attrs)

    # Streaming metric recording methods

    @deprecated_args(
        args_to_warn=["consumer_name"],
        reason="The consumer_name argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_streaming_lag(
        self,
        lag_seconds: float,
        stream_name: Optional[str] = None,
        consumer_group: Optional[str] = None,
        consumer_name: Optional[str] = None,  # noqa
    ) -> None:
        """
        Record the lag of a streaming message.

        Args:
            lag_seconds: Lag in seconds
            stream_name: Stream name
            consumer_group: Consumer group name
            consumer_name: Deprecated; no longer used
        """
        # No-op when the STREAMING metric group is disabled.
        if not hasattr(self, "stream_lag"):
            return

        attrs = self.attr_builder.build_streaming_attributes(
            stream_name=stream_name,
            consumer_group=consumer_group,
        )
        self.stream_lag.record(lag_seconds, attributes=attrs)

    # CSC metric recording methods

    def record_csc_request(
        self,
        result: Optional[CSCResult] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) request.

        Args:
            result: CSC result ('hit' or 'miss')
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_requests"):
            return

        attrs = self.attr_builder.build_csc_attributes(result=result)
        self.csc_requests.add(1, attributes=attrs)

    def record_csc_eviction(
        self,
        count: int,
        reason: Optional[CSCReason] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) eviction.

        Args:
            count: Number of evictions
            reason: Reason for eviction
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_evictions"):
            return

        attrs = self.attr_builder.build_csc_attributes(reason=reason)
        self.csc_evictions.add(count, attributes=attrs)

    def record_csc_network_saved(
        self,
        bytes_saved: int,
    ) -> None:
        """
        Record the number of bytes saved by using Client Side Caching (CSC).

        Args:
            bytes_saved: Number of bytes saved
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_network_saved"):
            return

        attrs = self.attr_builder.build_csc_attributes()
        self.csc_network_saved.add(bytes_saved, attributes=attrs)

    # Utility methods

    @staticmethod
    def monotonic_time() -> float:
        """
        Get monotonic time for duration measurements.

        Returns:
            Current monotonic time in seconds
        """
        return time.monotonic()

    def __repr__(self) -> str:
        return f"RedisMetricsCollector(meter={self.meter}, config={self.config})"