1"""
2Simple, clean API for recording observability metrics.
3
4This module provides a straightforward interface for Redis core code to record
5metrics without needing to know about OpenTelemetry internals.
6
7Usage in Redis core code:
8 from redis.observability.recorder import record_operation_duration
9
10 start_time = time.monotonic()
11 # ... execute Redis command ...
12 record_operation_duration(
13 command_name='SET',
14 duration_seconds=time.monotonic() - start_time,
15 server_address='localhost',
16 server_port=6379,
17 db_namespace='0',
18 error=None
19 )
20"""
21
22from datetime import datetime
23from typing import TYPE_CHECKING, Callable, List, Optional
24
25from redis.observability.attributes import (
26 AttributeBuilder,
27 CSCReason,
28 CSCResult,
29 GeoFailoverReason,
30 PubSubDirection,
31)
32from redis.observability.metrics import CloseReason, RedisMetricsCollector
33from redis.observability.providers import get_observability_instance
34from redis.observability.registry import get_observables_registry_instance
35from redis.utils import deprecated_args, str_if_bytes
36
37if TYPE_CHECKING:
38 from redis.connection import ConnectionPoolInterface
39 from redis.multidb.database import SyncDatabase
40 from redis.observability.config import OTelConfig
41
# Global metrics collector instance (lazy-initialized)
# Stays None until the first record_* call finds observability enabled;
# reset via reset_collector().
_metrics_collector: Optional[RedisMetricsCollector] = None

# Keys grouping observable-gauge callbacks in the observables registry
# (see init_connection_count / register_pools_connection_count and
# init_csc_items / register_csc_items_callback).
CONNECTION_COUNT_REGISTRY_KEY = "connection_count"
CSC_ITEMS_REGISTRY_KEY = "csc_items"
47
48
@deprecated_args(
    args_to_warn=["batch_size"],
    reason="The batch_size argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_operation_duration(
    command_name: str,
    duration_seconds: float,
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    db_namespace: Optional[str] = None,
    error: Optional[Exception] = None,
    is_blocking: Optional[bool] = None,
    batch_size: Optional[int] = None,  # noqa
    retry_attempts: Optional[int] = None,
) -> None:
    """
    Record the duration of a single Redis command execution.

    Safe to call unconditionally from Redis core code: when observability
    is disabled this returns almost immediately, and recording errors are
    swallowed so they can never break the command itself.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Deprecated; no longer used
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    """
    global _metrics_collector

    # Lazily build the collector; a None result means telemetry is off.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        # The server address/port double as the network peer attributes.
        collector.record_operation_duration(
            command_name=command_name,
            duration_seconds=duration_seconds,
            server_address=server_address,
            server_port=server_port,
            db_namespace=db_namespace,
            error_type=error,
            network_peer_address=server_address,
            network_peer_port=server_port,
            is_blocking=is_blocking,
            retry_attempts=retry_attempts,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
113
114
def record_connection_create_time(
    connection_pool: "ConnectionPoolInterface",
    duration_seconds: float,
) -> None:
    """
    Record how long it took to establish a new connection.

    No-op when observability is disabled; recording errors are swallowed.

    Args:
        connection_pool: The pool the connection was created for
        duration_seconds: Time taken to create the connection, in seconds
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_connection_create_time(
            connection_pool=connection_pool,
            duration_seconds=duration_seconds,
        )
    except Exception:
        # Never let telemetry failures surface to callers.
        pass
146
147
def init_connection_count() -> None:
    """
    Initialize the observable gauge backing the connection count metric.

    The gauge's callback fans out to every callback registered under
    CONNECTION_COUNT_REGISTRY_KEY and concatenates their observations.
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    def observable_callback(_options):
        # Aggregate observations from every registered pool callback.
        registry = get_observables_registry_instance()
        return [
            observation
            for registered in registry.get(CONNECTION_COUNT_REGISTRY_KEY)
            for observation in registered()
        ]

    try:
        collector.init_connection_count(callback=observable_callback)
    except Exception:
        pass
175
176
def register_pools_connection_count(
    connection_pools: List["ConnectionPoolInterface"],
) -> None:
    """
    Register the given pools with the connection count observable registry.

    Each observation cycle asks every pool for its per-attribute connection
    counts. Silently does nothing if opentelemetry is unavailable.
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        # Lazy import: opentelemetry is an optional dependency.
        from opentelemetry.metrics import Observation

        def connection_count_callback():
            return [
                Observation(count, attributes=attributes)
                for pool in connection_pools
                for count, attributes in pool.get_connection_count()
            ]

        registry = get_observables_registry_instance()
        registry.register(CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback)
    except Exception:
        pass
207
208
def record_connection_timeout(
    pool_name: str,
) -> None:
    """
    Record a single connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_connection_timeout(pool_name=pool_name)
    except Exception:
        # Best-effort only; never raise out of telemetry.
        pass
234
235
def record_connection_wait_time(
    pool_name: str,
    duration_seconds: float,
) -> None:
    """
    Record how long a caller waited to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_connection_wait_time(
            pool_name=pool_name,
            duration_seconds=duration_seconds,
        )
    except Exception:
        pass
266
267
def record_connection_closed(
    close_reason: Optional[CloseReason] = None,
    error_type: Optional[Exception] = None,
) -> None:
    """
    Record a connection closed event.

    No-op when observability is disabled; recording errors are swallowed
    so they can never break the close path.

    Args:
        close_reason: Reason the connection was closed (a CloseReason value,
            e.g. for an error or an application-initiated close)
        error_type: The exception that caused the close, when closed due
            to an error

    Example:
        >>> record_connection_closed(close_reason, ConnectionError())
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        _metrics_collector.record_connection_closed(
            close_reason=close_reason,
            error_type=error_type,
        )
    except Exception:
        # Metric recording must never break Redis operations.
        pass
296
297
def record_connection_relaxed_timeout(
    connection_name: str,
    maint_notification: str,
    relaxed: bool,
) -> None:
    """
    Record that a connection's timeout was relaxed or restored.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True counts the gauge up (relaxed), False counts it
            down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('Connection<localhost:6379>', 'MOVING', True)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_connection_relaxed_timeout(
            connection_name=connection_name,
            maint_notification=maint_notification,
            relaxed=relaxed,
        )
    except Exception:
        pass
329
330
def record_connection_handoff(
    pool_name: str,
) -> None:
    """
    Record a connection handoff event (e.g., after a MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_connection_handoff(pool_name=pool_name)
    except Exception:
        pass
356
357
def record_error_count(
    server_address: Optional[str] = None,
    server_port: Optional[int] = None,
    network_peer_address: Optional[str] = None,
    network_peer_port: Optional[int] = None,
    error_type: Optional[Exception] = None,
    retry_attempts: Optional[int] = None,
    is_internal: bool = True,
) -> None:
    """
    Increment the error counter.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: The exception that occurred
        retry_attempts: Number of retry attempts made
        is_internal: Whether the error is internal (e.g., timeout,
            network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_error_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            error_type=error_type,
            retry_attempts=retry_attempts,
            is_internal=is_internal,
        )
    except Exception:
        pass
401
402
def record_pubsub_message(
    direction: PubSubDirection,
    channel: Optional[str] = None,
    sharded: Optional[bool] = None,
) -> None:
    """
    Record a Pub/Sub message flowing in either direction.

    Honors the config flag hiding channel names: when set, the channel
    attribute is dropped before recording.

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    # Only consult the config when there is a channel name to hide.
    if channel is None:
        effective_channel = None
    else:
        config = _get_config()
        hide = config is not None and config.hide_pubsub_channel_names
        effective_channel = None if hide else channel

    try:
        collector.record_pubsub_message(
            direction=direction,
            channel=effective_channel,
            sharded=sharded,
        )
    except Exception:
        pass
441
442
@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag(
    lag_seconds: float,
    stream_name: Optional[str] = None,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record the consumption lag of a stream message.

    Honors the config flag hiding stream names: when set, the stream name
    attribute is dropped before recording.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Deprecated; no longer used
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    # Only consult the config when there is a stream name to hide.
    if stream_name is None:
        effective_stream_name = None
    else:
        config = _get_config()
        hide = config is not None and config.hide_stream_names
        effective_stream_name = None if hide else stream_name

    try:
        collector.record_streaming_lag(
            lag_seconds=lag_seconds,
            stream_name=effective_stream_name,
            consumer_group=consumer_group,
        )
    except Exception:
        pass
485
486
@deprecated_args(
    args_to_warn=["consumer_name"],
    reason="The consumer_name argument is no longer used and will be removed in the next major version.",
    version="7.2.1",
)
def record_streaming_lag_from_response(
    response,
    consumer_group: Optional[str] = None,
    consumer_name: Optional[str] = None,  # noqa
) -> None:
    """
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on message ID timestamp.
    The lag is the difference between the local wall clock and the
    millisecond timestamp embedded in each entry ID ("ms-seq"), clamped
    to zero. Any parsing or recording error is swallowed silently.

    Args:
        response: Response from XREAD/XREADGROUP command. A dict for RESP3
            replies, a list of (stream, messages) pairs for RESP2.
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Consumer name (for XREADGROUP); deprecated, unused
    """

    global _metrics_collector

    # Lazy init: a None collector means observability is disabled.
    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    # Empty/None response: nothing to record.
    if not response:
        return

    try:
        # Current wall-clock time as a POSIX timestamp (seconds).
        now = datetime.now().timestamp()

        # Check if stream names should be hidden
        config = _get_config()
        hide_stream_names = config is not None and config.hide_stream_names

        # RESP3 format: dict
        if isinstance(response, dict):
            for stream_name, stream_messages in response.items():
                effective_stream_name = (
                    None if hide_stream_names else str_if_bytes(stream_name)
                )
                # NOTE(review): assumes each dict value is a list of message
                # batches, each batch a list of (id, fields) pairs — one
                # nesting level deeper than the RESP2 branch. TODO confirm
                # against the RESP3 XREAD reply shape.
                for messages in stream_messages:
                    for message in messages:
                        message_id, _ = message
                        message_id = str_if_bytes(message_id)
                        # Entry IDs are "<ms-timestamp>-<sequence>".
                        timestamp, _ = message_id.split("-")
                        # Ensure lag is non-negative (clock skew can cause negative values)
                        lag_seconds = max(0.0, now - int(timestamp) / 1000)

                        _metrics_collector.record_streaming_lag(
                            lag_seconds=lag_seconds,
                            stream_name=effective_stream_name,
                            consumer_group=consumer_group,
                        )
        else:
            # RESP2 format: list
            for stream_entry in response:
                # Each entry is (stream_name, [(id, fields), ...]).
                stream_name = str_if_bytes(stream_entry[0])
                effective_stream_name = None if hide_stream_names else stream_name

                for message in stream_entry[1]:
                    message_id, _ = message
                    message_id = str_if_bytes(message_id)
                    # Entry IDs are "<ms-timestamp>-<sequence>".
                    timestamp, _ = message_id.split("-")
                    # Ensure lag is non-negative (clock skew can cause negative values)
                    lag_seconds = max(0.0, now - int(timestamp) / 1000)

                    _metrics_collector.record_streaming_lag(
                        lag_seconds=lag_seconds,
                        stream_name=effective_stream_name,
                        consumer_group=consumer_group,
                    )
    except Exception:
        # Best-effort: never let lag parsing break the XREAD caller.
        pass
564
565
def record_maint_notification_count(
    server_address: str,
    server_port: int,
    network_peer_address: str,
    network_peer_port: int,
    maint_notification: str,
) -> None:
    """
    Increment the maintenance notification counter.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_maint_notification_count(
            server_address=server_address,
            server_port=server_port,
            network_peer_address=network_peer_address,
            network_peer_port=network_peer_port,
            maint_notification=maint_notification,
        )
    except Exception:
        pass
603
604
def record_csc_request(
    result: Optional[CSCResult] = None,
):
    """
    Record a single Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_csc_request(result=result)
    except Exception:
        pass
627
628
def init_csc_items() -> None:
    """
    Initialize the observable gauge backing the CSC items metric.

    The gauge's callback fans out to every callback registered under
    CSC_ITEMS_REGISTRY_KEY and concatenates their observations.
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    def observable_callback(_options):
        # Aggregate observations from every registered cache callback.
        registry = get_observables_registry_instance()
        return [
            observation
            for registered in registry.get(CSC_ITEMS_REGISTRY_KEY)
            for observation in registered()
        ]

    try:
        collector.init_csc_items(callback=observable_callback)
    except Exception:
        pass
656
657
def register_csc_items_callback(
    callback: Callable,
    pool_name: Optional[str] = None,
) -> None:
    """
    Adds given callback to CSC items observable registry.

    No-op when observability is disabled; any failure (including a missing
    opentelemetry package) is swallowed so it cannot break the caller.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
        if _metrics_collector is None:
            return

    try:
        # Lazy import inside the try block: if opentelemetry is not
        # installed, the ImportError must not propagate to the caller
        # (matches register_pools_connection_count).
        from opentelemetry.metrics import Observation

        def csc_items_callback():
            return [
                Observation(
                    callback(),
                    attributes=AttributeBuilder.build_csc_attributes(pool_name=pool_name),
                )
            ]

        observables_registry = get_observables_registry_instance()
        observables_registry.register(CSC_ITEMS_REGISTRY_KEY, csc_items_callback)
    except Exception:
        pass
692
693
def record_csc_eviction(
    count: int,
    reason: Optional[CSCReason] = None,
) -> None:
    """
    Record Client Side Caching (CSC) evictions.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_csc_eviction(count=count, reason=reason)
    except Exception:
        pass
719
720
def record_csc_network_saved(
    bytes_saved: int,
) -> None:
    """
    Record network traffic avoided thanks to Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_csc_network_saved(bytes_saved=bytes_saved)
    except Exception:
        pass
743
744
def record_geo_failover(
    fail_from: "SyncDatabase",
    fail_to: "SyncDatabase",
    reason: GeoFailoverReason,
) -> None:
    """
    Record a geo failover between databases.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    """
    global _metrics_collector

    if _metrics_collector is None:
        _metrics_collector = _get_or_create_collector()
    collector = _metrics_collector
    if collector is None:
        return

    try:
        collector.record_geo_failover(
            fail_from=fail_from,
            fail_to=fail_to,
            reason=reason,
        )
    except Exception:
        pass
773
774
def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
    """
    Build the global metrics collector, if observability is enabled.

    Returns:
        A RedisMetricsCollector when telemetry is enabled; None when it is
        disabled, the observability extras are not installed, or any error
        occurs during setup.
    """
    try:
        manager = get_observability_instance().get_provider_manager()
        if manager is None or not manager.config.enabled_telemetry:
            return None

        # Obtain this library's meter from the configured MeterProvider.
        meter_provider = manager.get_meter_provider()
        meter = meter_provider.get_meter(
            RedisMetricsCollector.METER_NAME, RedisMetricsCollector.METER_VERSION
        )
        return RedisMetricsCollector(meter, manager.config)
    except ImportError:
        # Observability module not available.
        return None
    except Exception:
        # Any other error — don't break Redis operations.
        return None
800
801
802def _get_config() -> Optional["OTelConfig"]:
803 """
804 Get the OTel configuration from the observability manager.
805
806 Returns:
807 OTelConfig instance if observability is enabled, None otherwise
808 """
809 try:
810 manager = get_observability_instance().get_provider_manager()
811 if manager is None:
812 return None
813 return manager.config
814 except Exception:
815 return None
816
817
def reset_collector() -> None:
    """
    Drop the cached global collector so it is rebuilt on next use.

    Intended for testing and re-initialization.
    """
    global _metrics_collector
    _metrics_collector = None
824
825
def is_enabled() -> bool:
    """
    Report whether observability metrics are currently being collected.

    Returns:
        True if a metrics collector exists (or can be created), else False.
    """
    global _metrics_collector

    collector = _metrics_collector
    if collector is None:
        # Attempt lazy initialization; stays None if telemetry is off.
        collector = _get_or_create_collector()
        _metrics_collector = collector

    return collector is not None