1"""
2OpenTelemetry metrics collector for redis-py.
3
4This module defines and manages all metric instruments according to
5OTel semantic conventions for database clients.
6"""
7
8import logging
9import time
10from enum import Enum
11from typing import TYPE_CHECKING, Callable, Optional, Union
12
13if TYPE_CHECKING:
14 from redis.asyncio.connection import ConnectionPool
15 from redis.asyncio.multidb.database import AsyncDatabase
16 from redis.connection import ConnectionPoolInterface
17 from redis.multidb.database import SyncDatabase
18
19from redis.observability.attributes import (
20 REDIS_CLIENT_CONNECTION_CLOSE_REASON,
21 REDIS_CLIENT_CONNECTION_NOTIFICATION,
22 AttributeBuilder,
23 CSCReason,
24 CSCResult,
25 GeoFailoverReason,
26 PubSubDirection,
27 get_pool_name,
28)
29from redis.observability.config import MetricGroup, OTelConfig
30from redis.utils import deprecated_args
31
32logger = logging.getLogger(__name__)
33
# Optional imports - OTel SDK may not be installed
try:
    from opentelemetry.metrics import Meter

    OTEL_AVAILABLE = True
except ImportError:
    # Fall back to None sentinels so module-level references to these names
    # still resolve when the OTel API is absent; RedisMetricsCollector raises
    # a descriptive ImportError at construction time instead.
    # NOTE(review): only Meter is imported in the try-branch above, yet
    # Counter/Histogram/UpDownCounter are also nulled here — presumably kept
    # for backward compatibility with external importers of this module;
    # confirm before removing.
    OTEL_AVAILABLE = False
    Counter = None
    Histogram = None
    Meter = None
    UpDownCounter = None
45
46
class CloseReason(Enum):
    """Why a Redis client connection was closed.

    Members:
        APPLICATION_CLOSE: Closed deliberately by the application, for
            example during normal shutdown or explicit cleanup.
        ERROR: Closed because of an unexpected error, such as a network
            failure or a protocol error.
        HEALTHCHECK_FAILED: Closed after a health or liveness check for
            the connection did not pass.
    """

    APPLICATION_CLOSE = "application_close"
    ERROR = "error"
    HEALTHCHECK_FAILED = "healthcheck_failed"
63
64
class RedisMetricsCollector:
    """
    Collects and records OpenTelemetry metrics for Redis operations.

    This class manages all metric instruments and provides methods to record
    various Redis operations including connection pool events, command execution,
    and cluster-specific operations.

    Instruments are created only for the metric groups enabled in ``config``;
    every ``record_*`` method is a no-op when its instrument was not created.

    Args:
        meter: OpenTelemetry Meter instance
        config: OTel configuration object
    """

    METER_NAME = "redis-py"
    METER_VERSION = "1.0.0"

    def __init__(self, meter: Meter, config: OTelConfig):
        if not OTEL_AVAILABLE:
            raise ImportError(
                "OpenTelemetry API is not installed. "
                "Install it with: pip install opentelemetry-api"
            )

        self.meter = meter
        self.config = config
        self.attr_builder = AttributeBuilder()
        # Observable gauges are registered lazily via init_connection_count()
        # and init_csc_items(); pre-seed both so the lazy initializers can
        # check "already created" without risking AttributeError.
        self.connection_count = None
        self.csc_items = None

        # Initialize enabled metric instruments

        if MetricGroup.RESILIENCY in self.config.metric_groups:
            self._init_resiliency_metrics()

        if MetricGroup.COMMAND in self.config.metric_groups:
            self._init_command_metrics()

        if MetricGroup.CONNECTION_BASIC in self.config.metric_groups:
            self._init_connection_basic_metrics()

        if MetricGroup.CONNECTION_ADVANCED in self.config.metric_groups:
            self._init_connection_advanced_metrics()

        if MetricGroup.PUBSUB in self.config.metric_groups:
            self._init_pubsub_metrics()

        if MetricGroup.STREAMING in self.config.metric_groups:
            self._init_streaming_metrics()

        if MetricGroup.CSC in self.config.metric_groups:
            self._init_csc_metrics()

        logger.info("RedisMetricsCollector initialized")

    def _init_resiliency_metrics(self) -> None:
        """Initialize resiliency metrics."""
        self.client_errors = self.meter.create_counter(
            name="redis.client.errors",
            unit="{error}",
            description="A counter of all errors (both returned to the user and handled internally in the client library)",
        )

        self.maintenance_notifications = self.meter.create_counter(
            name="redis.client.maintenance.notifications",
            unit="{notification}",
            description="Tracks server-side maintenance notifications",
        )

        self.geo_failovers = self.meter.create_counter(
            name="redis.client.geofailover.failovers",
            unit="{geofailover}",
            description="Total count of failovers happened using MultiDbClient.",
        )

    def _init_connection_basic_metrics(self) -> None:
        """Initialize basic connection metrics."""
        self.connection_create_time = self.meter.create_histogram(
            name="db.client.connection.create_time",
            unit="s",
            description="Time to create a new connection",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
        )

        self.connection_relaxed_timeout = self.meter.create_up_down_counter(
            name="redis.client.connection.relaxed_timeout",
            unit="{relaxation}",
            description="Counts up for relaxed timeout, counts down for unrelaxed timeout",
        )

        self.connection_handoff = self.meter.create_counter(
            name="redis.client.connection.handoff",
            unit="{handoff}",
            description="Connections that have been handed off (e.g., after a MOVING notification)",
        )

    def _init_connection_advanced_metrics(self) -> None:
        """Initialize advanced connection metrics."""
        self.connection_timeouts = self.meter.create_counter(
            name="db.client.connection.timeouts",
            unit="{timeout}",
            description="The number of connection timeouts that have occurred trying to obtain a connection from the pool.",
        )

        self.connection_wait_time = self.meter.create_histogram(
            name="db.client.connection.wait_time",
            unit="s",
            description="Time to obtain an open connection from the pool",
            explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
        )

        self.connection_closed = self.meter.create_counter(
            name="redis.client.connection.closed",
            unit="{connection}",
            description="Total number of closed connections",
        )

    def _init_command_metrics(self) -> None:
        """Initialize command execution metric instruments."""
        self.operation_duration = self.meter.create_histogram(
            name="db.client.operation.duration",
            unit="s",
            description="Command execution duration",
            explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
        )

    def _init_pubsub_metrics(self) -> None:
        """Initialize PubSub metric instruments."""
        self.pubsub_messages = self.meter.create_counter(
            name="redis.client.pubsub.messages",
            unit="{message}",
            description="Tracks published and received messages",
        )

    def _init_streaming_metrics(self) -> None:
        """Initialize Streaming metric instruments."""
        self.stream_lag = self.meter.create_histogram(
            name="redis.client.stream.lag",
            unit="s",
            description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
            explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
        )

    def _init_csc_metrics(self) -> None:
        """Initialize Client Side Caching (CSC) metric instruments."""
        self.csc_requests = self.meter.create_counter(
            name="redis.client.csc.requests",
            unit="{request}",
            description="The total number of requests to the cache",
        )

        self.csc_evictions = self.meter.create_counter(
            name="redis.client.csc.evictions",
            unit="{eviction}",
            description="The total number of cache evictions",
        )

        self.csc_network_saved = self.meter.create_counter(
            name="redis.client.csc.network_saved",
            unit="By",
            description="The total number of bytes saved by using CSC",
        )

    # Resiliency metric recording methods

    def record_error_count(
        self,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        error_type: Optional[Exception] = None,
        retry_attempts: Optional[int] = None,
        is_internal: Optional[bool] = None,
    ):
        """
        Record error count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            error_type: Error type
            retry_attempts: Retry attempts
            is_internal: Whether the error is internal (e.g., timeout, network error)
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "client_errors"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )
        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
                is_internal=is_internal,
            )
        )

        self.client_errors.add(1, attributes=attrs)

    def record_maint_notification_count(
        self,
        server_address: str,
        server_port: int,
        network_peer_address: str,
        network_peer_port: int,
        maint_notification: str,
    ):
        """
        Record maintenance notification count

        Args:
            server_address: Server address
            server_port: Server port
            network_peer_address: Network peer address
            network_peer_port: Network peer port
            maint_notification: Maintenance notification
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "maintenance_notifications"):
            return

        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
            )
        )

        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.maintenance_notifications.add(1, attributes=attrs)

    def record_geo_failover(
        self,
        fail_from: Union["SyncDatabase", "AsyncDatabase"],
        fail_to: Union["SyncDatabase", "AsyncDatabase"],
        reason: GeoFailoverReason,
    ) -> None:
        """
        Record geo failover

        Args:
            fail_from: Database failed from
            fail_to: Database failed to
            reason: Reason for the failover
        """
        # No-op when the RESILIENCY metric group is disabled.
        if not hasattr(self, "geo_failovers"):
            return

        attrs = self.attr_builder.build_geo_failover_attributes(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )

        # Do not return the result of add(); kept consistent with every
        # other record_* method (Counter.add() returns None anyway).
        self.geo_failovers.add(1, attributes=attrs)

    def init_connection_count(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for connection count metric.

        Safe to call multiple times: the gauge is registered at most once,
        and only when the CONNECTION_BASIC metric group is enabled.

        Args:
            callback: Callback function to retrieve connection count
        """
        # Skip when the metric group is disabled OR the gauge already exists.
        # (The previous "and not" guard re-registered the gauge on repeated
        # calls and could create it even with the group disabled.)
        if (
            MetricGroup.CONNECTION_BASIC not in self.config.metric_groups
            or self.connection_count is not None
        ):
            return

        self.connection_count = self.meter.create_observable_gauge(
            name="db.client.connection.count",
            unit="{connection}",
            description="Number of connections in the pool",
            callbacks=[callback],
        )

    def init_csc_items(
        self,
        callback: Callable,
    ) -> None:
        """
        Initialize observable gauge for CSC items metric.

        Safe to call multiple times: the gauge is registered at most once,
        and only when the CSC metric group is enabled.

        Args:
            callback: Callback function to retrieve CSC items count
        """
        # Skip when the metric group is disabled OR the gauge already exists.
        # (Previously this read self.csc_items before it was ever assigned,
        # raising AttributeError whenever the CSC group was disabled; the
        # attribute is now pre-seeded to None in __init__.)
        if (
            MetricGroup.CSC not in self.config.metric_groups
            or self.csc_items is not None
        ):
            return

        self.csc_items = self.meter.create_observable_gauge(
            name="redis.client.csc.items",
            unit="{item}",
            description="The total number of cached responses currently stored",
            callbacks=[callback],
        )

    def record_connection_timeout(self, pool_name: str) -> None:
        """
        Record a connection timeout event.

        Args:
            pool_name: Connection pool name
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_timeouts"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_timeouts.add(1, attributes=attrs)

    def record_connection_create_time(
        self,
        connection_pool: Union["ConnectionPoolInterface", "ConnectionPool"],
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to create a new connection.

        Args:
            connection_pool: Connection pool implementation
            duration_seconds: Creation time in seconds
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_create_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            pool_name=get_pool_name(connection_pool)
        )
        self.connection_create_time.record(duration_seconds, attributes=attrs)

    def record_connection_wait_time(
        self,
        pool_name: str,
        duration_seconds: float,
    ) -> None:
        """
        Record time taken to obtain a connection from the pool.

        Args:
            pool_name: Connection pool name
            duration_seconds: Wait time in seconds
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_wait_time"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_wait_time.record(duration_seconds, attributes=attrs)

    # Command execution metric recording methods

    @deprecated_args(
        args_to_warn=["batch_size"],
        reason="The batch_size argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_operation_duration(
        self,
        command_name: str,
        duration_seconds: float,
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        db_namespace: Optional[int] = None,
        batch_size: Optional[int] = None,  # noqa
        error_type: Optional[Exception] = None,
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        retry_attempts: Optional[int] = None,
        is_blocking: Optional[bool] = None,
    ) -> None:
        """
        Record command execution duration.

        Args:
            command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
            duration_seconds: Execution time in seconds
            server_address: Redis server address
            server_port: Redis server port
            db_namespace: Redis database index
            batch_size: Number of commands in batch (for pipelines/transactions)
            error_type: Error type if operation failed
            network_peer_address: Resolved peer address
            network_peer_port: Peer port number
            retry_attempts: Number of retry attempts made
            is_blocking: Whether the operation is a blocking command
        """
        # No-op when the COMMAND metric group is disabled.
        if not hasattr(self, "operation_duration"):
            return

        # Check if this command should be tracked
        if not self.config.should_track_command(command_name):
            return

        # Build attributes
        attrs = self.attr_builder.build_base_attributes(
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
        )

        attrs.update(
            self.attr_builder.build_operation_attributes(
                command_name=command_name,
                network_peer_address=network_peer_address,
                network_peer_port=network_peer_port,
                retry_attempts=retry_attempts,
                is_blocking=is_blocking,
            )
        )

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )
        self.operation_duration.record(duration_seconds, attributes=attrs)

    def record_connection_closed(
        self,
        close_reason: Optional[CloseReason] = None,
        error_type: Optional[Exception] = None,
    ) -> None:
        """
        Record a connection closed event.

        Args:
            close_reason: Reason for closing (e.g. 'error', 'application_close')
            error_type: Error type if closed due to error
        """
        # No-op when the CONNECTION_ADVANCED metric group is disabled.
        if not hasattr(self, "connection_closed"):
            return

        attrs = self.attr_builder.build_connection_attributes()
        if close_reason:
            attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value

        attrs.update(
            self.attr_builder.build_error_attributes(
                error_type=error_type,
            )
        )

        self.connection_closed.add(1, attributes=attrs)

    def record_connection_relaxed_timeout(
        self,
        connection_name: str,
        maint_notification: str,
        relaxed: bool,
    ) -> None:
        """
        Record a connection timeout relaxation event.

        Args:
            connection_name: Connection name
            maint_notification: Maintenance notification type
            relaxed: True to count up (relaxed), False to count down (unrelaxed)
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_relaxed_timeout"):
            return

        attrs = self.attr_builder.build_connection_attributes(
            connection_name=connection_name
        )
        attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
        self.connection_relaxed_timeout.add(1 if relaxed else -1, attributes=attrs)

    def record_connection_handoff(
        self,
        pool_name: str,
    ) -> None:
        """
        Record a connection handoff event (e.g., after MOVING notification).

        Args:
            pool_name: Connection pool name
        """
        # No-op when the CONNECTION_BASIC metric group is disabled.
        if not hasattr(self, "connection_handoff"):
            return

        attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
        self.connection_handoff.add(1, attributes=attrs)

    # PubSub metric recording methods

    def record_pubsub_message(
        self,
        direction: PubSubDirection,
        channel: Optional[str] = None,
        sharded: Optional[bool] = None,
    ) -> None:
        """
        Record a PubSub message (published or received).

        Args:
            direction: Message direction ('publish' or 'receive')
            channel: Pub/Sub channel name
            sharded: True if sharded Pub/Sub channel
        """
        # No-op when the PUBSUB metric group is disabled.
        if not hasattr(self, "pubsub_messages"):
            return

        attrs = self.attr_builder.build_pubsub_message_attributes(
            direction=direction,
            channel=channel,
            sharded=sharded,
        )
        self.pubsub_messages.add(1, attributes=attrs)

    # Streaming metric recording methods

    @deprecated_args(
        args_to_warn=["consumer_name"],
        reason="The consumer_name argument is no longer used and will be removed in the next major version.",
        version="7.2.1",
    )
    def record_streaming_lag(
        self,
        lag_seconds: float,
        stream_name: Optional[str] = None,
        consumer_group: Optional[str] = None,
        consumer_name: Optional[str] = None,  # noqa
    ) -> None:
        """
        Record the lag of a streaming message.

        Args:
            lag_seconds: Lag in seconds
            stream_name: Stream name
            consumer_group: Consumer group name
            consumer_name: Consumer name
        """
        # No-op when the STREAMING metric group is disabled.
        if not hasattr(self, "stream_lag"):
            return

        attrs = self.attr_builder.build_streaming_attributes(
            stream_name=stream_name,
            consumer_group=consumer_group,
        )
        self.stream_lag.record(lag_seconds, attributes=attrs)

    # CSC metric recording methods

    def record_csc_request(
        self,
        result: Optional[CSCResult] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) request.

        Args:
            result: CSC result ('hit' or 'miss')
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_requests"):
            return

        attrs = self.attr_builder.build_csc_attributes(result=result)
        self.csc_requests.add(1, attributes=attrs)

    def record_csc_eviction(
        self,
        count: int,
        reason: Optional[CSCReason] = None,
    ) -> None:
        """
        Record a Client Side Caching (CSC) eviction.

        Args:
            count: Number of evictions
            reason: Reason for eviction
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_evictions"):
            return

        attrs = self.attr_builder.build_csc_attributes(reason=reason)
        self.csc_evictions.add(count, attributes=attrs)

    def record_csc_network_saved(
        self,
        bytes_saved: int,
    ) -> None:
        """
        Record the number of bytes saved by using Client Side Caching (CSC).

        Args:
            bytes_saved: Number of bytes saved
        """
        # No-op when the CSC metric group is disabled.
        if not hasattr(self, "csc_network_saved"):
            return

        attrs = self.attr_builder.build_csc_attributes()
        self.csc_network_saved.add(bytes_saved, attributes=attrs)

    # Utility methods

    @staticmethod
    def monotonic_time() -> float:
        """
        Get monotonic time for duration measurements.

        Returns:
            Current monotonic time in seconds
        """
        return time.monotonic()

    def __repr__(self) -> str:
        return f"RedisMetricsCollector(meter={self.meter}, config={self.config})"