1"""
2OpenTelemetry semantic convention attributes for Redis.
3
4This module provides constants and helper functions for building OTel attributes
5according to the semantic conventions for database clients.
6
7Reference: https://opentelemetry.io/docs/specs/semconv/database/redis/
8"""
9
10from enum import Enum
11from typing import TYPE_CHECKING, Any, Dict, Optional, Union
12
13import redis
14
15if TYPE_CHECKING:
16 from redis.asyncio.connection import ConnectionPool
17 from redis.asyncio.multidb.database import AsyncDatabase
18 from redis.connection import ConnectionPoolInterface
19 from redis.multidb.database import SyncDatabase
20
21# Database semantic convention attributes
22DB_SYSTEM = "db.system"
23DB_NAMESPACE = "db.namespace"
24DB_OPERATION_NAME = "db.operation.name"
25DB_RESPONSE_STATUS_CODE = "db.response.status_code"
26DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"
27
28# Error attributes
29ERROR_TYPE = "error.type"
30
31# Network attributes
32NETWORK_PEER_ADDRESS = "network.peer.address"
33NETWORK_PEER_PORT = "network.peer.port"
34
35# Server attributes
36SERVER_ADDRESS = "server.address"
37SERVER_PORT = "server.port"
38
39# Connection pool attributes
40DB_CLIENT_CONNECTION_POOL_NAME = "db.client.connection.pool.name"
41DB_CLIENT_CONNECTION_STATE = "db.client.connection.state"
42DB_CLIENT_CONNECTION_NAME = "db.client.connection.name"
43
44# Geofailover attributes
45DB_CLIENT_GEOFAILOVER_FAIL_FROM = "db.client.geofailover.fail_from"
46DB_CLIENT_GEOFAILOVER_FAIL_TO = "db.client.geofailover.fail_to"
47DB_CLIENT_GEOFAILOVER_REASON = "db.client.geofailover.reason"
48
49# Redis-specific attributes
50REDIS_CLIENT_LIBRARY = "redis.client.library"
51REDIS_CLIENT_CONNECTION_PUBSUB = "redis.client.connection.pubsub"
52REDIS_CLIENT_CONNECTION_CLOSE_REASON = "redis.client.connection.close.reason"
53REDIS_CLIENT_CONNECTION_NOTIFICATION = "redis.client.connection.notification"
54REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS = "redis.client.operation.retry_attempts"
55REDIS_CLIENT_OPERATION_BLOCKING = "redis.client.operation.blocking"
56REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION = "redis.client.pubsub.message.direction"
57REDIS_CLIENT_PUBSUB_CHANNEL = "redis.client.pubsub.channel"
58REDIS_CLIENT_PUBSUB_SHARDED = "redis.client.pubsub.sharded"
59REDIS_CLIENT_ERROR_INTERNAL = "redis.client.errors.internal"
60REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
61REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
62REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
63REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
64REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"
65
66
67class ConnectionState(Enum):
68 IDLE = "idle"
69 USED = "used"
70
71
72class PubSubDirection(Enum):
73 PUBLISH = "publish"
74 RECEIVE = "receive"
75
76
77class CSCResult(Enum):
78 HIT = "hit"
79 MISS = "miss"
80
81
82class CSCReason(Enum):
83 FULL = "full"
84 INVALIDATION = "invalidation"
85
86
87class GeoFailoverReason(Enum):
88 AUTOMATIC = "automatic"
89 MANUAL = "manual"
90
91
92class AttributeBuilder:
93 """
94 Helper class to build OTel semantic convention attributes for Redis operations.
95 """
96
97 @staticmethod
98 def build_base_attributes(
99 server_address: Optional[str] = None,
100 server_port: Optional[int] = None,
101 db_namespace: Optional[int] = None,
102 ) -> Dict[str, Any]:
103 """
104 Build base attributes common to all Redis operations.
105
106 Args:
107 server_address: Redis server address (FQDN or IP)
108 server_port: Redis server port
109 db_namespace: Redis database index
110
111 Returns:
112 Dictionary of base attributes
113 """
114 attrs: Dict[str, Any] = {
115 DB_SYSTEM: "redis",
116 REDIS_CLIENT_LIBRARY: f"redis-py:v{redis.__version__}",
117 }
118
119 if server_address is not None:
120 attrs[SERVER_ADDRESS] = server_address
121
122 if server_port is not None:
123 attrs[SERVER_PORT] = server_port
124
125 if db_namespace is not None:
126 attrs[DB_NAMESPACE] = str(db_namespace)
127
128 return attrs
129
130 @staticmethod
131 def build_operation_attributes(
132 command_name: Optional[Union[str, bytes]] = None,
133 batch_size: Optional[int] = None, # noqa
134 network_peer_address: Optional[str] = None,
135 network_peer_port: Optional[int] = None,
136 stored_procedure_name: Optional[str] = None,
137 retry_attempts: Optional[int] = None,
138 is_blocking: Optional[bool] = None,
139 ) -> Dict[str, Any]:
140 """
141 Build attributes for a Redis operation (command execution).
142
143 Args:
144 command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI'), can be str or bytes
145 batch_size: Number of commands in batch (for pipelines/transactions)
146 network_peer_address: Resolved peer address
147 network_peer_port: Peer port number
148 stored_procedure_name: Lua script name or SHA1 digest
149 retry_attempts: Number of retry attempts made
150 is_blocking: Whether the operation is a blocking command
151
152 Returns:
153 Dictionary of operation attributes
154 """
155 attrs: Dict[str, Any] = {}
156
157 if command_name is not None:
158 # Ensure command_name is a string (it can be bytes from args[0])
159 if isinstance(command_name, bytes):
160 command_name = command_name.decode("utf-8", errors="replace")
161 attrs[DB_OPERATION_NAME] = command_name.upper()
162
163 if network_peer_address is not None:
164 attrs[NETWORK_PEER_ADDRESS] = network_peer_address
165
166 if network_peer_port is not None:
167 attrs[NETWORK_PEER_PORT] = network_peer_port
168
169 if stored_procedure_name is not None:
170 attrs[DB_STORED_PROCEDURE_NAME] = stored_procedure_name
171
172 if retry_attempts is not None and retry_attempts > 0:
173 attrs[REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS] = retry_attempts
174
175 if is_blocking is not None:
176 attrs[REDIS_CLIENT_OPERATION_BLOCKING] = is_blocking
177
178 return attrs
179
180 @staticmethod
181 def build_connection_attributes(
182 pool_name: Optional[str] = None,
183 connection_state: Optional[ConnectionState] = None,
184 connection_name: Optional[str] = None,
185 is_pubsub: Optional[bool] = None,
186 ) -> Dict[str, Any]:
187 """
188 Build attributes for connection pool metrics.
189
190 Args:
191 pool_name: Unique connection pool name
192 connection_state: Connection state ('idle' or 'used')
193 is_pubsub: Whether this is a PubSub connection
194 connection_name: Unique connection name
195
196 Returns:
197 Dictionary of connection pool attributes
198 """
199 attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
200
201 if pool_name is not None:
202 attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
203
204 if connection_state is not None:
205 attrs[DB_CLIENT_CONNECTION_STATE] = connection_state.value
206
207 if is_pubsub is not None:
208 attrs[REDIS_CLIENT_CONNECTION_PUBSUB] = is_pubsub
209
210 if connection_name is not None:
211 attrs[DB_CLIENT_CONNECTION_NAME] = connection_name
212
213 return attrs
214
215 @staticmethod
216 def build_error_attributes(
217 error_type: Optional[Exception] = None,
218 is_internal: Optional[bool] = None,
219 ) -> Dict[str, Any]:
220 """
221 Build error attributes.
222
223 Args:
224 is_internal: Whether the error is internal (e.g., timeout, network error)
225 error_type: The exception that occurred
226
227 Returns:
228 Dictionary of error attributes
229 """
230 attrs: Dict[str, Any] = {}
231
232 if error_type is not None:
233 attrs[ERROR_TYPE] = error_type.__class__.__name__
234
235 if (
236 hasattr(error_type, "status_code")
237 and error_type.status_code is not None
238 ):
239 attrs[DB_RESPONSE_STATUS_CODE] = error_type.status_code
240 else:
241 attrs[DB_RESPONSE_STATUS_CODE] = "error"
242
243 if hasattr(error_type, "error_type") and error_type.error_type is not None:
244 attrs[REDIS_CLIENT_ERROR_CATEGORY] = error_type.error_type.value
245 else:
246 attrs[REDIS_CLIENT_ERROR_CATEGORY] = "other"
247
248 if is_internal is not None:
249 attrs[REDIS_CLIENT_ERROR_INTERNAL] = is_internal
250
251 return attrs
252
253 @staticmethod
254 def build_pubsub_message_attributes(
255 direction: PubSubDirection,
256 channel: Optional[str] = None,
257 sharded: Optional[bool] = None,
258 ) -> Dict[str, Any]:
259 """
260 Build attributes for a PubSub message.
261
262 Args:
263 direction: Message direction ('publish' or 'receive')
264 channel: Pub/Sub channel name
265 sharded: True if sharded Pub/Sub channel
266
267 Returns:
268 Dictionary of PubSub message attributes
269 """
270 attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
271 attrs[REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION] = direction.value
272
273 if channel is not None:
274 attrs[REDIS_CLIENT_PUBSUB_CHANNEL] = channel
275
276 if sharded is not None:
277 attrs[REDIS_CLIENT_PUBSUB_SHARDED] = sharded
278
279 return attrs
280
281 @staticmethod
282 def build_streaming_attributes(
283 stream_name: Optional[str] = None,
284 consumer_group: Optional[str] = None,
285 consumer_name: Optional[str] = None, # noqa
286 ) -> Dict[str, Any]:
287 """
288 Build attributes for a streaming operation.
289
290 Args:
291 stream_name: Name of the stream
292 consumer_group: Name of the consumer group
293 consumer_name: Name of the consumer
294
295 Returns:
296 Dictionary of streaming attributes
297 """
298 attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
299
300 if stream_name is not None:
301 attrs[REDIS_CLIENT_STREAM_NAME] = stream_name
302
303 if consumer_group is not None:
304 attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group
305
306 return attrs
307
308 @staticmethod
309 def build_csc_attributes(
310 pool_name: Optional[str] = None,
311 result: Optional[CSCResult] = None,
312 reason: Optional[CSCReason] = None,
313 ) -> Dict[str, Any]:
314 """
315 Build attributes for a Client Side Caching (CSC) operation.
316
317 Args:
318 pool_name: Connection pool name (used only for csc_items metric)
319 result: CSC result ('hit' or 'miss')
320 reason: Reason for CSC eviction ('full' or 'invalidation')
321
322 Returns:
323 Dictionary of CSC attributes
324 """
325 attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
326
327 if pool_name is not None:
328 attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
329
330 if result is not None:
331 attrs[REDIS_CLIENT_CSC_RESULT] = result.value
332
333 if reason is not None:
334 attrs[REDIS_CLIENT_CSC_REASON] = reason.value
335
336 return attrs
337
338 @staticmethod
339 def build_geo_failover_attributes(
340 fail_from: Union["SyncDatabase", "AsyncDatabase"],
341 fail_to: Union["SyncDatabase", "AsyncDatabase"],
342 reason: GeoFailoverReason,
343 ) -> Dict[str, Any]:
344 """
345 Build attributes for a geo failover.
346
347 Args:
348 fail_from: Database failed from
349 fail_to: Database failed to
350 reason: Reason for the failover
351
352 Returns:
353 Dictionary of geo failover attributes
354 """
355 attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
356
357 attrs[DB_CLIENT_GEOFAILOVER_FAIL_FROM] = get_db_name(fail_from)
358 attrs[DB_CLIENT_GEOFAILOVER_FAIL_TO] = get_db_name(fail_to)
359 attrs[DB_CLIENT_GEOFAILOVER_REASON] = reason.value
360
361 return attrs
362
363 @staticmethod
364 def build_pool_name(
365 server_address: str,
366 server_port: int,
367 db_namespace: int = 0,
368 ) -> str:
369 """
370 Build a unique connection pool name.
371
372 Args:
373 server_address: Redis server address
374 server_port: Redis server port
375 db_namespace: Redis database index
376
377 Returns:
378 Unique pool name in format "address:port/db"
379 """
380 return f"{server_address}:{server_port}/{db_namespace}"
381
382
383def get_pool_name(pool: Union["ConnectionPoolInterface", "ConnectionPool"]) -> str:
384 """
385 Get a short string representation of a connection pool for observability.
386
387 This provides a concise pool identifier suitable for use as a metric attribute,
388 in the format: host:port_uniqueID (matching go-redis format)
389
390 Args:
391 pool: Connection pool instance
392
393 Returns:
394 Short pool name in format "host:port_uniqueID"
395
396 Example:
397 >>> pool = ConnectionPool(host='localhost', port=6379, db=0)
398 >>> get_pool_name(pool)
399 'localhost:6379_a1b2c3d4'
400 """
401 host = pool.connection_kwargs.get("host", "unknown")
402 port = pool.connection_kwargs.get("port", 6379)
403
404 # Get unique pool ID if available (added for observability)
405 pool_id = getattr(pool, "_pool_id", "")
406
407 if pool_id:
408 return f"{host}:{port}_{pool_id}"
409 else:
410 return f"{host}:{port}"
411
412
413def get_db_name(database: Union["SyncDatabase", "AsyncDatabase"]):
414 """
415 Get a short string representation of a database for observability.
416
417 Args:
418 database: Database instance
419
420 Returns:
421 Short database name in format "{host}:{port}/{weight}"
422 """
423
424 host = database.client.get_connection_kwargs()["host"]
425 port = database.client.get_connection_kwargs()["port"]
426 weight = database.weight
427
428 return f"{host}:{port}/{weight}"