Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/observability/attributes.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

159 statements  

1""" 

2OpenTelemetry semantic convention attributes for Redis. 

3 

4This module provides constants and helper functions for building OTel attributes 

5according to the semantic conventions for database clients. 

6 

7Reference: https://opentelemetry.io/docs/specs/semconv/database/redis/ 

8""" 

9 

10from enum import Enum 

11from typing import TYPE_CHECKING, Any, Dict, Optional, Union 

12 

13import redis 

14 

15if TYPE_CHECKING: 

16 from redis.asyncio.connection import ConnectionPool 

17 from redis.asyncio.multidb.database import AsyncDatabase 

18 from redis.connection import ConnectionPoolInterface 

19 from redis.multidb.database import SyncDatabase 

20 

# --- Attribute keys: generic database semantic conventions ---
DB_SYSTEM = "db.system"
DB_NAMESPACE = "db.namespace"
DB_OPERATION_NAME = "db.operation.name"
DB_RESPONSE_STATUS_CODE = "db.response.status_code"
DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"

# --- Attribute keys: errors ---
ERROR_TYPE = "error.type"

# --- Attribute keys: network peer ---
NETWORK_PEER_ADDRESS = "network.peer.address"
NETWORK_PEER_PORT = "network.peer.port"

# --- Attribute keys: server ---
SERVER_ADDRESS = "server.address"
SERVER_PORT = "server.port"

# --- Attribute keys: connection pool ---
DB_CLIENT_CONNECTION_POOL_NAME = "db.client.connection.pool.name"
DB_CLIENT_CONNECTION_STATE = "db.client.connection.state"
DB_CLIENT_CONNECTION_NAME = "db.client.connection.name"

# --- Attribute keys: geo failover ---
DB_CLIENT_GEOFAILOVER_FAIL_FROM = "db.client.geofailover.fail_from"
DB_CLIENT_GEOFAILOVER_FAIL_TO = "db.client.geofailover.fail_to"
DB_CLIENT_GEOFAILOVER_REASON = "db.client.geofailover.reason"

# --- Attribute keys: Redis client specific ---
REDIS_CLIENT_LIBRARY = "redis.client.library"
REDIS_CLIENT_CONNECTION_PUBSUB = "redis.client.connection.pubsub"
REDIS_CLIENT_CONNECTION_CLOSE_REASON = "redis.client.connection.close.reason"
REDIS_CLIENT_CONNECTION_NOTIFICATION = "redis.client.connection.notification"
REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS = "redis.client.operation.retry_attempts"
REDIS_CLIENT_OPERATION_BLOCKING = "redis.client.operation.blocking"
REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION = "redis.client.pubsub.message.direction"
REDIS_CLIENT_PUBSUB_CHANNEL = "redis.client.pubsub.channel"
REDIS_CLIENT_PUBSUB_SHARDED = "redis.client.pubsub.sharded"
# NOTE: the key strings below use the plural "errors" while the constant
# names use the singular; the strings are what gets emitted.
REDIS_CLIENT_ERROR_INTERNAL = "redis.client.errors.internal"
REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"

65 

66 

class ConnectionState(Enum):
    """
    State of a pooled connection.

    The member value is what ``AttributeBuilder.build_connection_attributes``
    writes under the ``db.client.connection.state`` key.
    """

    # Connection is not currently in use.
    IDLE = "idle"
    # Connection is currently in use.
    USED = "used"

70 

71 

class PubSubDirection(Enum):
    """
    Direction of a Pub/Sub message.

    The member value is what
    ``AttributeBuilder.build_pubsub_message_attributes`` writes under the
    ``redis.client.pubsub.message.direction`` key.
    """

    # Message published.
    PUBLISH = "publish"
    # Message received.
    RECEIVE = "receive"

75 

76 

class CSCResult(Enum):
    """
    Outcome of a Client Side Caching (CSC) lookup.

    The member value is what ``AttributeBuilder.build_csc_attributes`` writes
    under the ``redis.client.csc.result`` key.
    """

    # Cache hit.
    HIT = "hit"
    # Cache miss.
    MISS = "miss"

80 

81 

class CSCReason(Enum):
    """
    Reason for a Client Side Caching (CSC) eviction.

    The member value is what ``AttributeBuilder.build_csc_attributes`` writes
    under the ``redis.client.csc.reason`` key.
    """

    FULL = "full"
    INVALIDATION = "invalidation"

85 

86 

class GeoFailoverReason(Enum):
    """
    Reason for a geo failover.

    The member value is what
    ``AttributeBuilder.build_geo_failover_attributes`` writes under the
    ``db.client.geofailover.reason`` key.
    """

    AUTOMATIC = "automatic"
    MANUAL = "manual"

90 

91 

class AttributeBuilder:
    """
    Factory for OTel semantic convention attribute dictionaries.

    All builders are static methods that return a new ``dict``. Optional
    inputs left as ``None`` are omitted from the result rather than being
    stored as ``None``.
    """

    @staticmethod
    def build_base_attributes(
        server_address: Optional[str] = None,
        server_port: Optional[int] = None,
        db_namespace: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Build the attributes shared by every Redis operation.

        The result always carries ``db.system`` ("redis") and the client
        library identifier built from ``redis.__version__``.

        Args:
            server_address: Redis server address (FQDN or IP)
            server_port: Redis server port
            db_namespace: Redis database index; stored as a string

        Returns:
            Dictionary of base attributes
        """
        base: Dict[str, Any] = {
            DB_SYSTEM: "redis",
            REDIS_CLIENT_LIBRARY: f"redis-py:v{redis.__version__}",
        }

        # Listed in the order the keys must appear in the result.
        optional_entries = (
            (SERVER_ADDRESS, server_address),
            (SERVER_PORT, server_port),
            (DB_NAMESPACE, None if db_namespace is None else str(db_namespace)),
        )
        for key, value in optional_entries:
            if value is not None:
                base[key] = value

        return base

    @staticmethod
    def build_operation_attributes(
        command_name: Optional[Union[str, bytes]] = None,
        batch_size: Optional[int] = None,  # noqa
        network_peer_address: Optional[str] = None,
        network_peer_port: Optional[int] = None,
        stored_procedure_name: Optional[str] = None,
        retry_attempts: Optional[int] = None,
        is_blocking: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Build attributes describing a single command execution.

        Unlike the other builders, the result does not include the base
        attributes.

        Args:
            command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI');
                bytes are decoded as UTF-8 and the name is upper-cased
            batch_size: Number of commands in batch (for pipelines/transactions);
                accepted but not written to the result
            network_peer_address: Resolved peer address
            network_peer_port: Peer port number
            stored_procedure_name: Lua script name or SHA1 digest
            retry_attempts: Number of retry attempts made; only recorded
                when greater than zero
            is_blocking: Whether the operation is a blocking command

        Returns:
            Dictionary of operation attributes
        """
        operation: Dict[str, Any] = {}

        if command_name is not None:
            # The command may arrive as raw bytes (e.g. taken from args[0]).
            name = (
                command_name.decode("utf-8", errors="replace")
                if isinstance(command_name, bytes)
                else command_name
            )
            operation[DB_OPERATION_NAME] = name.upper()

        for key, value in (
            (NETWORK_PEER_ADDRESS, network_peer_address),
            (NETWORK_PEER_PORT, network_peer_port),
            (DB_STORED_PROCEDURE_NAME, stored_procedure_name),
        ):
            if value is not None:
                operation[key] = value

        # Zero (or negative) retries are not reported.
        if retry_attempts is not None and retry_attempts > 0:
            operation[REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS] = retry_attempts

        if is_blocking is not None:
            operation[REDIS_CLIENT_OPERATION_BLOCKING] = is_blocking

        return operation

    @staticmethod
    def build_connection_attributes(
        pool_name: Optional[str] = None,
        connection_state: Optional[ConnectionState] = None,
        connection_name: Optional[str] = None,
        is_pubsub: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Build attributes for connection pool metrics.

        Args:
            pool_name: Unique connection pool name
            connection_state: Connection state ('idle' or 'used')
            connection_name: Unique connection name
            is_pubsub: Whether this is a PubSub connection

        Returns:
            Dictionary of connection pool attributes (base attributes included)
        """
        connection: Dict[str, Any] = AttributeBuilder.build_base_attributes()

        if pool_name is not None:
            connection[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name

        if connection_state is not None:
            connection[DB_CLIENT_CONNECTION_STATE] = connection_state.value

        # The pubsub flag is written before the connection name.
        for key, value in (
            (REDIS_CLIENT_CONNECTION_PUBSUB, is_pubsub),
            (DB_CLIENT_CONNECTION_NAME, connection_name),
        ):
            if value is not None:
                connection[key] = value

        return connection

    @staticmethod
    def build_error_attributes(
        error_type: Optional[Exception] = None,
        is_internal: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Build attributes describing an error.

        When an exception is given, the result records its class name, a
        response status code (the exception's ``status_code`` if it has a
        non-None one, otherwise "error") and an error category (the
        ``.value`` of the exception's ``error_type`` if it has a non-None
        one, otherwise "other").

        Args:
            error_type: The exception that occurred
            is_internal: Whether the error is internal (e.g., timeout, network error)

        Returns:
            Dictionary of error attributes
        """
        errors: Dict[str, Any] = {}

        if error_type is not None:
            errors[ERROR_TYPE] = error_type.__class__.__name__

            status_code = getattr(error_type, "status_code", None)
            errors[DB_RESPONSE_STATUS_CODE] = (
                "error" if status_code is None else status_code
            )

            category = getattr(error_type, "error_type", None)
            errors[REDIS_CLIENT_ERROR_CATEGORY] = (
                "other" if category is None else category.value
            )

        if is_internal is not None:
            errors[REDIS_CLIENT_ERROR_INTERNAL] = is_internal

        return errors

    @staticmethod
    def build_pubsub_message_attributes(
        direction: PubSubDirection,
        channel: Optional[str] = None,
        sharded: Optional[bool] = None,
    ) -> Dict[str, Any]:
        """
        Build attributes for a PubSub message.

        Args:
            direction: Message direction ('publish' or 'receive')
            channel: Pub/Sub channel name
            sharded: True if sharded Pub/Sub channel

        Returns:
            Dictionary of PubSub message attributes (base attributes included)
        """
        message: Dict[str, Any] = AttributeBuilder.build_base_attributes()
        message[REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION] = direction.value

        for key, value in (
            (REDIS_CLIENT_PUBSUB_CHANNEL, channel),
            (REDIS_CLIENT_PUBSUB_SHARDED, sharded),
        ):
            if value is not None:
                message[key] = value

        return message

    @staticmethod
    def build_streaming_attributes(
        stream_name: Optional[str] = None,
        consumer_group: Optional[str] = None,
        consumer_name: Optional[str] = None,  # noqa
    ) -> Dict[str, Any]:
        """
        Build attributes for a streaming operation.

        Args:
            stream_name: Name of the stream
            consumer_group: Name of the consumer group
            consumer_name: Name of the consumer; accepted but not written
                to the result

        Returns:
            Dictionary of streaming attributes (base attributes included)
        """
        streaming: Dict[str, Any] = AttributeBuilder.build_base_attributes()

        for key, value in (
            (REDIS_CLIENT_STREAM_NAME, stream_name),
            (REDIS_CLIENT_CONSUMER_GROUP, consumer_group),
        ):
            if value is not None:
                streaming[key] = value

        return streaming

    @staticmethod
    def build_csc_attributes(
        pool_name: Optional[str] = None,
        result: Optional[CSCResult] = None,
        reason: Optional[CSCReason] = None,
    ) -> Dict[str, Any]:
        """
        Build attributes for a Client Side Caching (CSC) operation.

        Args:
            pool_name: Connection pool name (used only for csc_items metric)
            result: CSC result ('hit' or 'miss')
            reason: Reason for CSC eviction ('full' or 'invalidation')

        Returns:
            Dictionary of CSC attributes (base attributes included)
        """
        csc: Dict[str, Any] = AttributeBuilder.build_base_attributes()

        if pool_name is not None:
            csc[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name

        if result is not None:
            csc[REDIS_CLIENT_CSC_RESULT] = result.value

        if reason is not None:
            csc[REDIS_CLIENT_CSC_REASON] = reason.value

        return csc

    @staticmethod
    def build_geo_failover_attributes(
        fail_from: Union["SyncDatabase", "AsyncDatabase"],
        fail_to: Union["SyncDatabase", "AsyncDatabase"],
        reason: GeoFailoverReason,
    ) -> Dict[str, Any]:
        """
        Build attributes for a geo failover.

        Both databases are rendered with ``get_db_name``.

        Args:
            fail_from: Database failed from
            fail_to: Database failed to
            reason: Reason for the failover

        Returns:
            Dictionary of geo failover attributes (base attributes included)
        """
        failover: Dict[str, Any] = AttributeBuilder.build_base_attributes()
        failover.update(
            {
                DB_CLIENT_GEOFAILOVER_FAIL_FROM: get_db_name(fail_from),
                DB_CLIENT_GEOFAILOVER_FAIL_TO: get_db_name(fail_to),
                DB_CLIENT_GEOFAILOVER_REASON: reason.value,
            }
        )
        return failover

    @staticmethod
    def build_pool_name(
        server_address: str,
        server_port: int,
        db_namespace: int = 0,
    ) -> str:
        """
        Build a connection pool name from its endpoint and database index.

        Args:
            server_address: Redis server address
            server_port: Redis server port
            db_namespace: Redis database index

        Returns:
            Pool name in format "address:port/db"
        """
        endpoint = f"{server_address}:{server_port}"
        return f"{endpoint}/{db_namespace}"

381 

382 

def get_pool_name(pool: Union["ConnectionPoolInterface", "ConnectionPool"]) -> str:
    """
    Return a concise identifier for a connection pool, for observability.

    The identifier is suitable for use as a metric attribute and has the
    form host:port_uniqueID (matching go-redis format). Host and port are
    read from ``pool.connection_kwargs``, falling back to "unknown" and 6379
    when absent. The ``_uniqueID`` suffix is only appended when the pool has
    a non-empty ``_pool_id`` attribute.

    Args:
        pool: Connection pool instance

    Returns:
        Short pool name in format "host:port_uniqueID", or "host:port" when
        no pool id is available

    Example:
        >>> pool = ConnectionPool(host='localhost', port=6379, db=0)
        >>> get_pool_name(pool)
        'localhost:6379_a1b2c3d4'
    """
    kwargs = pool.connection_kwargs
    host = kwargs.get("host", "unknown")
    port = kwargs.get("port", 6379)
    endpoint = f"{host}:{port}"

    # Unique pool ID, if the pool carries one (added for observability).
    unique_id = getattr(pool, "_pool_id", "")

    return f"{endpoint}_{unique_id}" if unique_id else endpoint

411 

412 

def get_db_name(database: Union["SyncDatabase", "AsyncDatabase"]) -> str:
    """
    Get a short string representation of a database for observability.

    Host and port are read from the database client's connection kwargs.
    Missing entries fall back to "unknown" and 6379, the same defaults
    ``get_pool_name`` uses, so building a metric attribute does not raise
    ``KeyError`` for a client configured without an explicit host or port.

    Args:
        database: Database instance exposing ``client.get_connection_kwargs()``
            and ``weight``

    Returns:
        Short database name in format "{host}:{port}/{weight}"
    """
    # Fetch the kwargs once so host and port come from the same snapshot.
    connection_kwargs = database.client.get_connection_kwargs()
    host = connection_kwargs.get("host", "unknown")
    port = connection_kwargs.get("port", 6379)

    return f"{host}:{port}/{database.weight}"