1"""
2OpenTelemetry metrics collector for redis-py.
3
4This module defines and manages all metric instruments according to
5OTel semantic conventions for database clients.
6"""
7
8import logging
9import time
10from enum import Enum
11from typing import TYPE_CHECKING, Callable, Optional, Union
12
13if TYPE_CHECKING:
14 from redis.asyncio.connection import ConnectionPool
15 from redis.asyncio.multidb.database import AsyncDatabase
16 from redis.connection import ConnectionPoolInterface
17 from redis.multidb.database import SyncDatabase
18
19from redis.observability.attributes import (
20 REDIS_CLIENT_CONNECTION_CLOSE_REASON,
21 REDIS_CLIENT_CONNECTION_NOTIFICATION,
22 AttributeBuilder,
23 ConnectionState,
24 CSCReason,
25 CSCResult,
26 GeoFailoverReason,
27 PubSubDirection,
28 get_pool_name,
29)
30from redis.observability.config import MetricGroup, OTelConfig
31from redis.utils import deprecated_args, deprecated_function
32
33logger = logging.getLogger(__name__)
34
# Optional imports - OTel SDK may not be installed.
# Both branches must bind the same set of names: previously only `Meter` was
# imported on success, leaving Counter/Histogram/UpDownCounter undefined when
# the SDK *is* installed, while the fallback branch defined all four.
try:
    from opentelemetry.metrics import Counter, Histogram, Meter, UpDownCounter

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    Counter = None
    Histogram = None
    Meter = None
    UpDownCounter = None
47
class CloseReason(Enum):
    """Why a Redis client connection was closed.

    Members:
        APPLICATION_CLOSE: Deliberate close by the application, e.g. during
            normal shutdown or explicit cleanup.
        ERROR: Close caused by an unexpected failure such as a network or
            protocol error.
        HEALTHCHECK_FAILED: Close triggered by a failed health or liveness
            check on the connection.
    """

    APPLICATION_CLOSE = "application_close"
    ERROR = "error"
    HEALTHCHECK_FAILED = "healthcheck_failed"
65
class RedisMetricsCollector:
    """
    Collects and records OpenTelemetry metrics for Redis operations.

    This class manages all metric instruments and provides methods to record
    various Redis operations including connection pool events, command execution,
    and cluster-specific operations.

    Only the instruments for metric groups enabled in ``config.metric_groups``
    are created; every ``record_*`` method is a no-op when its instrument was
    not initialized (checked via ``hasattr``).

    Args:
        meter: OpenTelemetry Meter instance
        config: OTel configuration object
    """

    METER_NAME = "redis-py"
    METER_VERSION = "1.0.0"

    def __init__(self, meter: Meter, config: OTelConfig):
        if not OTEL_AVAILABLE:
            raise ImportError(
                "OpenTelemetry API is not installed. "
                "Install it with: pip install opentelemetry-api"
            )

        self.meter = meter
        self.config = config
        self.attr_builder = AttributeBuilder()

        # Initialize enabled metric instruments

        if MetricGroup.RESILIENCY in self.config.metric_groups:
            self._init_resiliency_metrics()

        if MetricGroup.COMMAND in self.config.metric_groups:
            self._init_command_metrics()

        if MetricGroup.CONNECTION_BASIC in self.config.metric_groups:
            self._init_connection_basic_metrics()

        if MetricGroup.CONNECTION_ADVANCED in self.config.metric_groups:
            self._init_connection_advanced_metrics()

        if MetricGroup.PUBSUB in self.config.metric_groups:
            self._init_pubsub_metrics()

        if MetricGroup.STREAMING in self.config.metric_groups:
            self._init_streaming_metrics()

        if MetricGroup.CSC in self.config.metric_groups:
            self._init_csc_metrics()

        logger.info("RedisMetricsCollector initialized")

    def _init_resiliency_metrics(self) -> None:
        """Initialize resiliency metrics."""
        self.client_errors = self.meter.create_counter(
            name="redis.client.errors",
            unit="{error}",
            description="A counter of all errors (both returned to the user and handled internally in the client library)",
        )

        self.maintenance_notifications = self.meter.create_counter(
            name="redis.client.maintenance.notifications",
            unit="{notification}",
            description="Tracks server-side maintenance notifications",
        )

        self.geo_failovers = self.meter.create_counter(
            name="redis.client.geofailover.failovers",
            unit="{geofailover}",
            description="Total count of failovers happened using MultiDbClient.",
        )

    def _init_connection_basic_metrics(self) -> None:
        """Initialize basic connection metrics."""
        self.connection_create_time = self.meter.create_histogram(
            name="db.client.connection.create_time",
            unit="s",
            description="Time to create a new connection",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
        )

        self.connection_relaxed_timeout = self.meter.create_up_down_counter(
            name="redis.client.connection.relaxed_timeout",
            unit="{relaxation}",
            description="Counts up for relaxed timeout, counts down for unrelaxed timeout",
        )

        self.connection_handoff = self.meter.create_counter(
            name="redis.client.connection.handoff",
            unit="{handoff}",
            description="Connections that have been handed off (e.g., after a MOVING notification)",
        )

        # DEPRECATED: This attribute is kept for backward compatibility.
        # It requires manual initialization via init_connection_count() with a callback.
        # Use connection_count_updown instead for push-based tracking.
        # Will be removed in the next major version.
        self.connection_count = None

        # New push-based connection count tracking via UpDownCounter
        self.connection_count_updown = self.meter.create_up_down_counter(
            name="db.client.connection.count",
            unit="{connection}",
            description="Number of connections currently in the pool by state",
        )

    def _init_connection_advanced_metrics(self) -> None:
        """Initialize advanced connection metrics."""
        self.connection_timeouts = self.meter.create_counter(
            name="db.client.connection.timeouts",
            unit="{timeout}",
            description="The number of connection timeouts that have occurred trying to obtain a connection from the pool.",
        )

        self.connection_wait_time = self.meter.create_histogram(
            name="db.client.connection.wait_time",
            unit="s",
            description="Time to obtain an open connection from the pool",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
        )

        self.connection_closed = self.meter.create_counter(
            name="redis.client.connection.closed",
            unit="{connection}",
            description="Total number of closed connections",
        )

    def _init_command_metrics(self) -> None:
        """Initialize command execution metric instruments."""
        self.operation_duration = self.meter.create_histogram(
            name="db.client.operation.duration",
            unit="s",
            description="Command execution duration",
            explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
        )

    def _init_pubsub_metrics(self) -> None:
        """Initialize PubSub metric instruments."""
        self.pubsub_messages = self.meter.create_counter(
            name="redis.client.pubsub.messages",
            unit="{message}",
            description="Tracks published and received messages",
        )

    def _init_streaming_metrics(self) -> None:
        """Initialize Streaming metric instruments."""
        self.stream_lag = self.meter.create_histogram(
            name="redis.client.stream.lag",
            unit="s",
            description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
            explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
        )

    def _init_csc_metrics(self) -> None:
        """Initialize Client Side Caching (CSC) metric instruments."""
        self.csc_requests = self.meter.create_counter(
            name="redis.client.csc.requests",
            unit="{request}",
            description="The total number of requests to the cache",
        )

        self.csc_evictions = self.meter.create_counter(
            name="redis.client.csc.evictions",
            unit="{eviction}",
            description="The total number of cache evictions",
        )

        self.csc_network_saved = self.meter.create_counter(
            name="redis.client.csc.network_saved",
            unit="By",
            description="The total number of bytes saved by using CSC",
        )

        # Observable gauge for cached-item count; created lazily by
        # init_csc_items() because it needs a caller-supplied callback.
        self.csc_items = None

    # Resiliency metric recording methods

    def record_error_count(
        self,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        error_type: Optional[Exception] = None,
        retry_attempts: Optional[int] = None,
        is_internal: Optional[bool] = None,
    ) -> None:
        """
        Record error count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            error_type: Error type
            retry_attempts: Retry attempts
            is_internal: Whether the error is internal (e.g., timeout, network error)
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "client_errors"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )
        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
                is_internal=is_internal,
            )
        )

        self.client_errors.add(1, attributes=attrs)

    def record_maint_notification_count(
        self,
        server_address: str,
        server_port: int,
        network_peer_address: str,
        network_peer_port: int,
        maint_notification: str,
    ) -> None:
        """
        Record maintenance notification count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            maint_notification: Maintenance notification
        """
        if not hasattr(self, "maintenance_notifications"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
            )
        )

        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.maintenance_notifications.add(1, attributes=attrs)

    def record_geo_failover(
        self,
        fail_from: Union["SyncDatabase", "AsyncDatabase"],
        fail_to: Union["SyncDatabase", "AsyncDatabase"],
        reason: GeoFailoverReason,
    ) -> None:
        """
        Record geo failover

        Args:
            fail_from: Database failed from
            fail_to: Database failed to
            reason: Reason for the failover
        """

        if not hasattr(self, "geo_failovers"):
            return

        attrs = self.attr_builder.build_geo_failover_attributes(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )

        # Counter.add() returns None; no value to propagate (previously this
        # method inconsistently returned the call's result).
        self.geo_failovers.add(1, attributes=attrs)

    def record_connection_count(
        self,
        pool_name: str,
        connection_state: ConnectionState,
        counter: int = 1,
    ) -> None:
        """
        Record a connection count change for a single state.

        Args:
            pool_name: Connection pool name
            connection_state: State to update (IDLE or USED)
            counter: Number to add (positive) or subtract (negative)
        """
        if not hasattr(self, "connection_count_updown"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=pool_name,
            connection_state=connection_state,
        )
        self.connection_count_updown.add(counter, attributes=attrs)

    @deprecated_function(
        reason="Connection count is now tracked via record_connection_count(). "
        "This functionality will be removed in the next major version",
        version="7.4.0",
    )
    def init_connection_count(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for connection count metric.

        Args:
            callback: Callback function to retrieve connection counts
        """
        if MetricGroup.CONNECTION_BASIC not in self.config.metric_groups:
            return

        # DEPRECATED: Create observable gauge for backward compatibility
        # This gauge uses a different metric name to avoid conflicts with
        # the new push-based connection_count_updown counter
        self.connection_count = self.meter.create_observable_gauge(
            name="db.client.connection.count.deprecated",
            unit="{connection}",
            description="The number of connections that are currently in state "
            "described by the state attribute (deprecated - use db.client.connection.count instead)",
            callbacks=[callback],
        )

    def init_csc_items(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for CSC items metric.

        Args:
            callback: Callback function to retrieve CSC items count
        """
        # Skip when the CSC metric group is disabled, or when the gauge has
        # already been created (avoids registering duplicate callbacks).
        # NOTE: the previous guard (`... and not self.csc_items`) raised
        # AttributeError when CSC was disabled, because `csc_items` is only
        # assigned by _init_csc_metrics, and it never prevented re-init.
        if (
            MetricGroup.CSC not in self.config.metric_groups
            or getattr(self, "csc_items", None) is not None
        ):
            return

        self.csc_items = self.meter.create_observable_gauge(
            name="redis.client.csc.items",
            unit="{item}",
            description="The total number of cached responses currently stored",
            callbacks=[callback],
        )

    def record_connection_timeout(self, pool_name: str) -> None:
        """
        Record a connection timeout event.

        Args:
            pool_name: Connection pool name
        """
        if not hasattr(self, "connection_timeouts"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_timeouts.add(1, attributes=attrs)

    def record_connection_create_time(
        self,
        connection_pool: Union["ConnectionPoolInterface", "ConnectionPool"],
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to create a new connection.

        Args:
            connection_pool: Connection pool implementation
            duration_seconds: Creation time in seconds
        """
        if not hasattr(self, "connection_create_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=get_pool_name(connection_pool)
        )
        self.connection_create_time.record(duration_seconds, attributes=attrs)

    def record_connection_wait_time(
        self,
        pool_name: str,
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to obtain a connection from the pool.

        Args:
            pool_name: Connection pool name
            duration_seconds: Wait time in seconds
        """
        if not hasattr(self, "connection_wait_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_wait_time.record(duration_seconds, attributes=attrs)

    # Command execution metric recording methods

    @deprecated_args(
        args_to_warn=["batch_size"],
        reason="The batch_size argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_operation_duration(
        self,
        command_name: str,
        duration_seconds: float,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        db_namespace: Optional[int] = None,
        batch_size: Optional[int] = None,  # noqa
        error_type: Optional[Exception] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        retry_attempts: Optional[int] = None,
        is_blocking: Optional[bool] = None,
    ) -> None:
        """
        Record command execution duration.

        Args:
            command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
            duration_seconds: Execution time in seconds
            server_address: Redis server address
            server_port: Redis server port
            db_namespace: Redis database index
            batch_size: Deprecated and unused
            error_type: Error type if operation failed
            network_peer_address: Resolved peer address
            network_peer_port: Peer port number
            retry_attempts: Number of retry attempts made
            is_blocking: Whether the operation is a blocking command
        """
        if not hasattr(self, "operation_duration"):
            return

        # Check if this command should be tracked
        if not self.config.should_track_command(command_name):
            return

        # Build attributes
        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                command_name=command_name,
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
                is_blocking=is_blocking,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )
        self.operation_duration.record(duration_seconds, attributes=attrs)

    def record_connection_closed(
        self,
        close_reason: Optional[CloseReason] = None,
        error_type: Optional[Exception] = None,
    ) -> None:
        """
        Record a connection closed event.

        Args:
            close_reason: Reason for closing (e.g. 'error', 'application_close')
            error_type: Error type if closed due to error
        """
        if not hasattr(self, "connection_closed"):
            return

        attrs = self.attr_builder.build_connection_attributes()
        if close_reason:
            attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )

        self.connection_closed.add(1, attributes=attrs)

    def record_connection_relaxed_timeout(
        self,
        connection_name: str,
        maint_notification: str,
        relaxed: bool,
    ) -> None:
        """
        Record a connection timeout relaxation event.

        Args:
            connection_name: Connection name
            maint_notification: Maintenance notification type
            relaxed: True to count up (relaxed), False to count down (unrelaxed)
        """
        if not hasattr(self, "connection_relaxed_timeout"):
            return

        # The connection name is reported under the pool-name attribute.
        attrs = self.attr_builder.build_connection_attributes(pool_name=connection_name)
        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.connection_relaxed_timeout.add(1 if relaxed else -1, attributes=attrs)

    def record_connection_handoff(
        self,
        pool_name: str,
    ) -> None:
        """
        Record a connection handoff event (e.g., after MOVING notification).

        Args:
            pool_name: Connection pool name
        """
        if not hasattr(self, "connection_handoff"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_handoff.add(1, attributes=attrs)

    # PubSub metric recording methods

    def record_pubsub_message(
        self,
        direction: PubSubDirection,
        channel: Optional[str] = None,
        sharded: Optional[bool] = None,
    ) -> None:
        """
        Record a PubSub message (published or received).

        Args:
            direction: Message direction ('publish' or 'receive')
            channel: Pub/Sub channel name
            sharded: True if sharded Pub/Sub channel
        """
        if not hasattr(self, "pubsub_messages"):
            return

        attrs = self.attr_builder.build_pubsub_message_attributes(
            direction=direction,
            channel=channel,
            sharded=sharded,
        )
        self.pubsub_messages.add(1, attributes=attrs)

    # Streaming metric recording methods

    @deprecated_args(
        args_to_warn=["consumer_name"],
        reason="The consumer_name argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_streaming_lag(
        self,
        lag_seconds: float,
        stream_name: Optional[str] = None,
        consumer_group: Optional[str] = None,
        consumer_name: Optional[str] = None,  # noqa
    ) -> None:
        """
        Record the lag of a streaming message.

        Args:
            lag_seconds: Lag in seconds
            stream_name: Stream name
            consumer_group: Consumer group name
            consumer_name: Deprecated and unused
        """
        if not hasattr(self, "stream_lag"):
            return

        attrs = self.attr_builder.build_streaming_attributes(
            stream_name=stream_name,
            consumer_group=consumer_group,
        )
        self.stream_lag.record(lag_seconds, attributes=attrs)

    # CSC metric recording methods

    def record_csc_request(
        self,
        result: Optional[CSCResult] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) request.

        Args:
            result: CSC result ('hit' or 'miss')
        """
        if not hasattr(self, "csc_requests"):
            return

        attrs = self.attr_builder.build_csc_attributes(result=result)
        self.csc_requests.add(1, attributes=attrs)

    def record_csc_eviction(
        self,
        count: int,
        reason: Optional[CSCReason] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) eviction.

        Args:
            count: Number of evictions
            reason: Reason for eviction
        """
        if not hasattr(self, "csc_evictions"):
            return

        attrs = self.attr_builder.build_csc_attributes(reason=reason)
        self.csc_evictions.add(count, attributes=attrs)

    def record_csc_network_saved(
        self,
        bytes_saved: int,
    ) -> None:
        """
        Record the number of bytes saved by using Client Side Caching (CSC).

        Args:
            bytes_saved: Number of bytes saved
        """
        if not hasattr(self, "csc_network_saved"):
            return

        attrs = self.attr_builder.build_csc_attributes()
        self.csc_network_saved.add(bytes_saved, attributes=attrs)

    # Utility methods

    @staticmethod
    def monotonic_time() -> float:
        """
        Get monotonic time for duration measurements.

        Returns:
            Current monotonic time in seconds
        """
        return time.monotonic()

    def __repr__(self) -> str:
        return f"RedisMetricsCollector(meter={self.meter}, config={self.config})"