1"""
2Simple, clean API for recording observability metrics.
3
4This module provides a straightforward interface for Redis core code to record
5metrics without needing to know about OpenTelemetry internals.
6
7Usage in Redis core code:
8 from redis.observability.recorder import record_operation_duration
9
10 start_time = time.monotonic()
11 # ... execute Redis command ...
12 record_operation_duration(
13 command_name='SET',
14 duration_seconds=time.monotonic() - start_time,
15 server_address='localhost',
16 server_port=6379,
17 db_namespace='0',
18 error=None
19 )
20"""
21
22from datetime import datetime
23from typing import TYPE_CHECKING, Callable, List, Optional
24
25from redis.observability.attributes import (
26 AttributeBuilder,
27 ConnectionState,
28 CSCReason,
29 CSCResult,
30 GeoFailoverReason,
31 PubSubDirection,
32)
33from redis.observability.metrics import CloseReason, RedisMetricsCollector
34from redis.observability.providers import get_observability_instance
35from redis.observability.registry import get_observables_registry_instance
36from redis.utils import deprecated_args, deprecated_function, str_if_bytes
37
38if TYPE_CHECKING:
39 from redis.connection import ConnectionPoolInterface
40 from redis.multidb.database import SyncDatabase
41 from redis.observability.config import OTelConfig
42
43# Global metrics collector instance (lazy-initialized)
44_metrics_collector: Optional[RedisMetricsCollector] = None
45
46CSC_ITEMS_REGISTRY_KEY = "csc_items"
47CONNECTION_COUNT_REGISTRY_KEY = "connection_count"
48
49
@deprecated_args(
    args_to_warn=["batch_size"],
    reason="The batch_size argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_operation_duration(
    command_name: str,
    duration_seconds: float,
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    db_namespace: Optional[str] = None,
    error: Optional[Exception] = None,
    is_blocking: Optional[bool] = None,
    batch_size: Optional[int] = None,  # noqa
    retry_attempts: Optional[int] = None,
) -> None:
    """
    Record the duration of a single Redis command execution.

    Safe to call unconditionally: when observability is disabled this
    returns immediately with near-zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Deprecated and unused
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    """
    global _metrics_collector

    if _metrics_collector is None:
        # Lazy initialization; stays None while observability is disabled.
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability not enabled.

    try:
        # The server address/port double as the network peer identity here.
        _metrics_collector.record_operation_duration(
            command_name=command_name,
            duration_seconds=duration_seconds,
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
            error_type=error,
            network_peer_address=server_address,
            network_peer_port=server_port,
            is_blocking=is_blocking,
            retry_attempts=retry_attempts,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
114
115
def record_connection_create_time(
    connection_pool: "ConnectionPoolInterface",
    duration_seconds: float,
) -> None:
    """
    Record connection creation time.

    No-op when observability is disabled.

    Args:
        connection_pool: Connection pool the new connection belongs to
        duration_seconds: Time taken to create connection in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create connection ...
        >>> record_connection_create_time(pool, time.monotonic() - start)
    """
    global _metrics_collector

    # Fast path: if collector not initialized, observability is disabled
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_create_time(
            connection_pool=connection_pool,
            duration_seconds=duration_seconds,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
147
148
def record_connection_count(
    pool_name: str,
    connection_state: ConnectionState,
    counter: int = 1,
) -> None:
    """
    Record a connection count change for a single state.

    No-op when observability is disabled.

    Args:
        pool_name: Connection pool identifier
        connection_state: State to update (IDLE or USED)
        counter: Number to add (positive) or subtract (negative)

    Example:
        # New connection created (goes to IDLE first)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Acquire from pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, 1)

        # Release to pool (transition)
        >>> record_connection_count('pool_abc123', ConnectionState.USED, -1)
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, 1)

        # Pool disconnect 5 idle connections
        >>> record_connection_count('pool_abc123', ConnectionState.IDLE, -5)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_connection_count(
            pool_name=pool_name,
            connection_state=connection_state,
            counter=counter,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
192
193
@deprecated_function(
    reason="Connection count is now tracked via record_connection_count(). "
    "This functionality will be removed in the next major version",
    version="7.4.0",
)
def init_connection_count() -> None:
    """
    Initialize observable gauge for connection count metric.

    The gauge pulls its observations, at collection time, from every
    callback registered under CONNECTION_COUNT_REGISTRY_KEY.
    """
    collector = _get_or_create_collector()
    if collector is None:
        return

    def observable_callback(__):
        # Aggregate observations from every registered pool callback.
        registry = get_observables_registry_instance()
        observations = []
        for registered_callback in registry.get(CONNECTION_COUNT_REGISTRY_KEY):
            observations.extend(registered_callback())
        return observations

    try:
        collector.init_connection_count(callback=observable_callback)
    except Exception:
        # Metric setup must never break Redis operations.
        pass
223
224
@deprecated_function(
    reason="Connection count is now tracked via record_connection_count(). "
    "This functionality will be removed in the next major version",
    version="7.4.0",
)
def register_pools_connection_count(
    connection_pools: List["ConnectionPoolInterface"],
) -> None:
    """
    Add connection pools to connection count observable registry.
    """
    collector = _get_or_create_collector()
    if collector is None:
        return

    try:
        # Lazy import so opentelemetry is only required when enabled.
        from opentelemetry.metrics import Observation

        def connection_count_callback():
            # One observation per (count, attributes) pair of each pool.
            return [
                Observation(count, attributes=attributes)
                for pool in connection_pools
                for count, attributes in pool.get_connection_count()
            ]

        get_observables_registry_instance().register(
            CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
        )
    except Exception:
        # Registration failures must never break Redis operations.
        pass
257
258
def record_connection_timeout(
    pool_name: str,
) -> None:
    """
    Record a connection timeout event.

    No-op when observability is disabled.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_connection_timeout(pool_name=pool_name)
    except Exception:
        # Metric recording must never break Redis operations.
        pass
284
285
def record_connection_wait_time(
    pool_name: str,
    duration_seconds: float,
) -> None:
    """
    Record time taken to obtain a connection from the pool.

    No-op when observability is disabled.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_connection_wait_time(
            pool_name=pool_name,
            duration_seconds=duration_seconds,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
316
317
def record_connection_closed(
    close_reason: Optional[CloseReason] = None,
    error_type: Optional[Exception] = None,
) -> None:
    """
    Record a connection closed event.

    No-op when observability is disabled.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> record_connection_closed(close_reason=reason, error_type=err)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_closed(
            close_reason=close_reason,
            error_type=error_type,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
346
347
def record_connection_relaxed_timeout(
    connection_name: str,
    maint_notification: str,
    relaxed: bool,
) -> None:
    """
    Record a connection timeout relaxation event.

    No-op when observability is disabled.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('localhost:6379_a1b2c3d4', 'MOVING', True)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_connection_relaxed_timeout(
            connection_name=connection_name,
            maint_notification=maint_notification,
            relaxed=relaxed,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
379
380
def record_connection_handoff(
    pool_name: str,
) -> None:
    """
    Record a connection handoff event (e.g., after MOVING notification).

    No-op when observability is disabled.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_connection_handoff(pool_name=pool_name)
    except Exception:
        # Metric recording must never break Redis operations.
        pass
406
407
def record_error_count(
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    network_peer_address: Optional[str] = None,
    network_peer_port: Optional[int] = None,
    error_type: Optional[Exception] = None,
    retry_attempts: Optional[int] = None,
    is_internal: bool = True,
) -> None:
    """
    Record an error occurrence.

    No-op when observability is disabled.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_error_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            error_type=error_type,
            retry_attempts=retry_attempts,
            is_internal=is_internal,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
451
452
def record_pubsub_message(
    direction: PubSubDirection,
    channel: Optional[str] = None,
    sharded: Optional[bool] = None,
) -> None:
    """
    Record a PubSub message (published or received).

    No-op when observability is disabled. The channel name is dropped from
    the metric attributes when the configuration asks to hide it.

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    # Honor the privacy setting for channel names.
    if channel is None:
        effective_channel = None
    else:
        cfg = _get_config()
        hide = cfg is not None and cfg.hide_pubsub_channel_names
        effective_channel = None if hide else channel

    try:
        _metrics_collector.record_pubsub_message(
            direction=direction,
            channel=effective_channel,
            sharded=sharded,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
491
492
@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag(
    lag_seconds: float,
    stream_name: Optional[str] = None,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record the lag of a streaming message.

    No-op when observability is disabled. The stream name is dropped from
    the metric attributes when the configuration asks to hide it.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Deprecated, unused
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    # Honor the privacy setting for stream names.
    if stream_name is None:
        effective_stream_name = None
    else:
        cfg = _get_config()
        hide = cfg is not None and cfg.hide_stream_names
        effective_stream_name = None if hide else stream_name

    try:
        _metrics_collector.record_streaming_lag(
            lag_seconds=lag_seconds,
            stream_name=effective_stream_name,
            consumer_group=consumer_group,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
535
536
@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag_from_response(
    response,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on the
    milliseconds timestamp embedded in the message ID ("<ms>-<seq>").

    Args:
        response: Response from XREAD/XREADGROUP command. Either a dict
            (RESP3) mapping stream name to batches of messages, or a list
            (RESP2) of (stream_name, messages) entries.
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Deprecated, unused
    """

    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    if not response:
        return

    try:
        now = datetime.now().timestamp()

        # Check if stream names should be hidden
        config = _get_config()
        hide_stream_names = config is not None and config.hide_stream_names

        def _record_message_lag(message, effective_stream_name):
            # A message is an (id, fields) pair; the ID's first component is
            # the server-assigned Unix timestamp in milliseconds.
            message_id, _ = message
            message_id = str_if_bytes(message_id)
            timestamp, _ = message_id.split("-")
            # Ensure lag is non-negative (clock skew can cause negative values)
            lag_seconds = max(0.0, now - int(timestamp) / 1000)

            _metrics_collector.record_streaming_lag(
                lag_seconds=lag_seconds,
                stream_name=effective_stream_name,
                consumer_group=consumer_group,
            )

        # RESP3 format: dict of stream name -> list of message batches
        if isinstance(response, dict):
            for stream_name, stream_messages in response.items():
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_name)
                )
                for messages in stream_messages:
                    for message in messages:
                        _record_message_lag(message, effective_stream_name)
        else:
            # RESP2 format: list of (stream_name, messages) entries
            for stream_entry in response:
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_entry[0])
                )
                for message in stream_entry[1]:
                    _record_message_lag(message, effective_stream_name)
    except Exception:
        # Metric recording must never break Redis operations.
        pass
614
615
def record_maint_notification_count(
    server_address: str,
    server_port: int,
    network_peer_address: str,
    network_peer_port: int,
    maint_notification: str,
) -> None:
    """
    Record a maintenance notification count.

    No-op when observability is disabled.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_maint_notification_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            maint_notification=maint_notification,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
653
654
def record_csc_request(
    result: Optional[CSCResult] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) request.

    No-op when observability is disabled.

    Args:
        result: CSC result ('hit' or 'miss')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_csc_request(
            result=result,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
677
678
def init_csc_items() -> None:
    """
    Initialize observable gauge for CSC items metric.

    The gauge pulls its observations, at collection time, from every
    callback registered under CSC_ITEMS_REGISTRY_KEY.
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    def observable_callback(__):
        # Aggregate observations from every registered cache callback.
        registry = get_observables_registry_instance()
        observations = []
        for registered_callback in registry.get(CSC_ITEMS_REGISTRY_KEY):
            observations.extend(registered_callback())
        return observations

    try:
        _metrics_collector.init_csc_items(callback=observable_callback)
    except Exception:
        # Metric setup must never break Redis operations.
        pass
706
707
def register_csc_items_callback(
    callback: Callable,
    pool_name: Optional[str] = None,
) -> None:
    """
    Adds given callback to CSC items observable registry.

    No-op when observability is disabled or opentelemetry is unavailable.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        # Lazy import inside the guard so a missing opentelemetry package
        # cannot raise into calling Redis code (consistent with
        # register_pools_connection_count).
        from opentelemetry.metrics import Observation

        def csc_items_callback():
            return [
                Observation(
                    callback(),
                    attributes=AttributeBuilder.build_csc_attributes(
                        pool_name=pool_name
                    ),
                )
            ]

        observables_registry = get_observables_registry_instance()
        observables_registry.register(CSC_ITEMS_REGISTRY_KEY, csc_items_callback)
    except Exception:
        # Registration failures must never break Redis operations.
        pass
742
743
def record_csc_eviction(
    count: int,
    reason: Optional[CSCReason] = None,
) -> None:
    """
    Record a Client Side Caching (CSC) eviction.

    No-op when observability is disabled.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_csc_eviction(count=count, reason=reason)
    except Exception:
        # Metric recording must never break Redis operations.
        pass
769
770
def record_csc_network_saved(
    bytes_saved: int,
) -> None:
    """
    Record the number of bytes saved by using Client Side Caching (CSC).

    No-op when observability is disabled.

    Args:
        bytes_saved: Number of bytes saved
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_csc_network_saved(bytes_saved=bytes_saved)
    except Exception:
        # Metric recording must never break Redis operations.
        pass
793
794
def record_geo_failover(
    fail_from: "SyncDatabase",
    fail_to: "SyncDatabase",
    reason: GeoFailoverReason,
) -> None:
    """
    Record a geo failover.

    No-op when observability is disabled.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    if _metrics_collector is None:
        return  # Observability disabled.

    collector = _metrics_collector
    try:
        collector.record_geo_failover(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
823
824
825def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
826 """
827 Get or create the global metrics collector.
828
829 Returns:
830 RedisMetricsCollector instance if observability is enabled, None otherwise
831 """
832 try:
833 manager = get_observability_instance().get_provider_manager()
834 if manager is None or not manager.config.enabled_telemetry:
835 return None
836
837 # Get meter from the global MeterProvider
838 meter = manager.get_meter_provider().get_meter(
839 RedisMetricsCollector.METER_NAME, RedisMetricsCollector.METER_VERSION
840 )
841
842 return RedisMetricsCollector(meter, manager.config)
843
844 except ImportError:
845 # Observability module not available
846 return None
847 except Exception:
848 # Any other error - don't break Redis operations
849 return None
850
851
852def _get_config() -> Optional["OTelConfig"]:
853 """
854 Get the OTel configuration from the observability manager.
855
856 Returns:
857 OTelConfig instance if observability is enabled, None otherwise
858 """
859 try:
860 manager = get_observability_instance().get_provider_manager()
861 if manager is None:
862 return None
863 return manager.config
864 except Exception:
865 return None
866
867
def reset_collector() -> None:
    """
    Drop the cached global collector so it is rebuilt on next use.

    Intended for testing and re-initialization.
    """
    global _metrics_collector
    _metrics_collector = None
874
875
def is_enabled() -> bool:
    """
    Report whether observability metrics are being collected.

    Attempts lazy initialization of the collector as a side effect.

    Returns:
        True if metrics are being collected, False otherwise
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    return _metrics_collector is not None