Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

816 statements  

1import copy 

2import logging 

3import re 

4import threading 

5import time 

6from itertools import chain 

7from typing import ( 

8 TYPE_CHECKING, 

9 Any, 

10 Callable, 

11 Dict, 

12 List, 

13 Literal, 

14 Mapping, 

15 Optional, 

16 Set, 

17 Type, 

18 Union, 

19) 

20 

21from redis._parsers.encoders import Encoder 

22from redis._parsers.helpers import ( 

23 _RedisCallbacks, 

24 _RedisCallbacksRESP2, 

25 _RedisCallbacksRESP3, 

26 bool_ok, 

27) 

28from redis._parsers.socket import SENTINEL 

29from redis.backoff import ExponentialWithJitterBackoff 

30from redis.cache import CacheConfig, CacheInterface 

31from redis.commands import ( 

32 CoreCommands, 

33 RedisModuleCommands, 

34 SentinelCommands, 

35 list_or_args, 

36) 

37from redis.commands.core import Script 

38from redis.connection import ( 

39 AbstractConnection, 

40 Connection, 

41 ConnectionPool, 

42 SSLConnection, 

43 UnixDomainSocketConnection, 

44) 

45from redis.credentials import CredentialProvider 

46from redis.driver_info import DriverInfo, resolve_driver_info 

47from redis.event import ( 

48 AfterPooledConnectionsInstantiationEvent, 

49 AfterPubSubConnectionInstantiationEvent, 

50 AfterSingleConnectionInstantiationEvent, 

51 ClientType, 

52 EventDispatcher, 

53) 

54from redis.exceptions import ( 

55 ConnectionError, 

56 ExecAbortError, 

57 PubSubError, 

58 RedisError, 

59 ResponseError, 

60 WatchError, 

61) 

62from redis.lock import Lock 

63from redis.maint_notifications import ( 

64 MaintNotificationsConfig, 

65 OSSMaintNotificationsHandler, 

66) 

67from redis.observability.attributes import PubSubDirection 

68from redis.observability.recorder import ( 

69 record_error_count, 

70 record_operation_duration, 

71 record_pubsub_message, 

72) 

73from redis.retry import Retry 

74from redis.utils import ( 

75 _set_info_logger, 

76 check_protocol_version, 

77 deprecated_args, 

78 safe_str, 

79 str_if_bytes, 

80 truncate_text, 

81) 

82 

83if TYPE_CHECKING: 

84 import ssl 

85 

86 import OpenSSL 

87 

88 from redis.keyspace_notifications import KeyspaceNotifications 

89 

# Empty byte-string constant.
SYM_EMPTY = b""
# Option key: when present in command options, parse_response returns the
# associated value instead of raising when the server replies with a
# ResponseError.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
# (parse_response reads them with disable_decoding=True when this option
# key is present).
NEVER_DECODE = "NEVER_DECODE"


# Module-level logger for this client module.
logger = logging.getLogger(__name__)

98 

99 

def is_debug_log_enabled():
    """Return True when this module's logger would emit DEBUG records."""
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled

102 

103 

def add_debug_log_for_operation_failure(connection: "AbstractConnection") -> None:
    """Emit a DEBUG record describing a failed operation on ``connection``.

    The record includes the connection itself plus its extracted details,
    or the text 'no connection' when ``connection`` is falsy.
    """
    # Guard first: the original built the message (including a call to
    # extract_connection_details) eagerly via f-strings even when DEBUG
    # logging was disabled. Callers usually wrap this in
    # is_debug_log_enabled(), but the function should be cheap either way.
    if not logger.isEnabledFor(logging.DEBUG):
        return
    details = (
        connection.extract_connection_details() if connection else "no connection"
    )
    # Lazy %-style arguments: logging formats only when the record is emitted.
    logger.debug(
        "Operation failed, with connection: %s, details: %s", connection, details
    )

109 

110 

class CaseInsensitiveDict(dict):
    """Case insensitive dict implementation. Assumes string keys only."""

    def __init__(self, data: Dict[str, str]) -> None:
        # Normalise every key to upper case on the way in so all later
        # lookups can upper-case their argument and hit the stored key.
        for key, value in data.items():
            self[key.upper()] = value

    def __contains__(self, k):
        return dict.__contains__(self, k.upper())

    def __delitem__(self, k):
        dict.__delitem__(self, k.upper())

    def __getitem__(self, k):
        return dict.__getitem__(self, k.upper())

    def get(self, k, default=None):
        return dict.get(self, k.upper(), default)

    def __setitem__(self, k, v):
        dict.__setitem__(self, k.upper(), v)

    def update(self, data):
        # Re-wrap so the incoming mapping's keys are upper-cased before merging.
        normalized = CaseInsensitiveDict(data)
        dict.update(self, normalized)

137 

class AbstractRedis:
    # Empty marker base class for Redis client implementations; it carries
    # no behavior of its own.
    pass

140 

141 

class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how
    the commands are sent and received to the Redis server. Based on
    configuration, an instance will either use a ConnectionPool, or
    Connection object to talk to redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    # Type discrimination marker for @overload self-type pattern
    _is_async_client: Literal[False] = False

160 @classmethod 

161 def from_url(cls, url: str, **kwargs) -> "Redis": 

162 """ 

163 Return a Redis client object configured from the given URL 

164 

165 For example:: 

166 

167 redis://[[username]:[password]]@localhost:6379/0 

168 rediss://[[username]:[password]]@localhost:6379/0 

169 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

170 

171 Three URL schemes are supported: 

172 

173 - `redis://` creates a TCP socket connection. See more at: 

174 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

175 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

176 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

177 - ``unix://``: creates a Unix Domain Socket connection. 

178 

179 The username, password, hostname, path and all querystring values 

180 are passed through urllib.parse.unquote in order to replace any 

181 percent-encoded values with their corresponding characters. 

182 

183 There are several ways to specify a database number. The first value 

184 found will be used: 

185 

186 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

187 2. If using the redis:// or rediss:// schemes, the path argument 

188 of the url, e.g. redis://localhost/0 

189 3. A ``db`` keyword argument to this function. 

190 

191 If none of these options are specified, the default db=0 is used. 

192 

193 All querystring options are cast to their appropriate Python types. 

194 Boolean arguments can be specified with string values "True"/"False" 

195 or "Yes"/"No". Values that cannot be properly cast cause a 

196 ``ValueError`` to be raised. Once parsed, the querystring arguments 

197 and keyword arguments are passed to the ``ConnectionPool``'s 

198 class initializer. In the case of conflicting arguments, querystring 

199 arguments always win. 

200 

201 """ 

202 single_connection_client = kwargs.pop("single_connection_client", False) 

203 connection_pool = ConnectionPool.from_url(url, **kwargs) 

204 client = cls( 

205 connection_pool=connection_pool, 

206 single_connection_client=single_connection_client, 

207 ) 

208 client.auto_close_connection_pool = True 

209 return client 

210 

211 @classmethod 

212 def from_pool( 

213 cls: Type["Redis"], 

214 connection_pool: ConnectionPool, 

215 ) -> "Redis": 

216 """ 

217 Return a Redis client from the given connection pool. 

218 The Redis client will take ownership of the connection pool and 

219 close it when the Redis client is closed. 

220 """ 

221 client = cls( 

222 connection_pool=connection_pool, 

223 ) 

224 client.auto_close_connection_pool = True 

225 return client 

226 

    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    @deprecated_args(
        args_to_warn=["lib_name", "lib_version"],
        reason="Use 'driver_info' parameter instead. "
        "lib_name and lib_version will be removed in a future version.",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        driver_info: Optional["DriverInfo"] = None,
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        oss_cluster_maint_notifications_handler: Optional[
            OSSMaintNotificationsHandler
        ] = None,
    ) -> None:
        """
        Initialize a new Redis client.

        To specify a retry policy for specific errors, you have two options:

        1. Set the `retry_on_error` to a list of the error/s to retry on, and
        you can also set `retry` to a valid `Retry` object(in case the default
        one is not appropriate) - with this approach the retries will be triggered
        on the default errors specified in the Retry object enriched with the
        errors specified in `retry_on_error`.

        2. Define a `Retry` object with configured 'supported_errors' and set
        it to the `retry` parameter - with this approach you completely redefine
        the errors on which retries will happen.

        `retry_on_timeout` is deprecated - please include the TimeoutError
        either in the Retry object or in the `retry_on_error` list.

        When 'connection_pool' is provided - the retry configuration of the
        provided pool will be used.

        Args:

        single_connection_client:
            if `True`, connection pool is not used. In that case `Redis`
            instance use is not thread safe.
        decode_responses:
            if `True`, the response will be decoded to utf-8.
            Argument is ignored when connection_pool is provided.
        driver_info:
            Optional DriverInfo object to identify upstream libraries.
            If provided, lib_name and lib_version are ignored.
            If not provided, a DriverInfo will be created from lib_name and lib_version.
            Argument is ignored when connection_pool is provided.
        lib_name:
            **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
        lib_version:
            **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
        maint_notifications_config:
            configures the pool to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the connection pool
            initialization).
            Argument is ignored when connection_pool is provided.
        oss_cluster_maint_notifications_handler:
            handler for OSS cluster notifications - see
            `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
            Only supported with RESP3
            Argument is ignored when connection_pool is provided.
        """
        # Each client gets its own dispatcher unless the caller shares one.
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            # No pool supplied: build one from the individual keyword
            # arguments. Everything below this branch only assembles the
            # kwargs for ConnectionPool(**kwargs).
            if not retry_on_error:
                retry_on_error = []

            # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version
            computed_driver_info = resolve_driver_info(
                driver_info, lib_name, lib_version
            )

            # Options common to TCP and Unix-socket connections. The retry
            # object is deep-copied so this client's retry state is not
            # shared with the caller's object (it is a mutable default).
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "driver_info": computed_driver_info,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, setup appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                # SSL only applies to TCP connections; selecting
                # SSLConnection overrides the default connection class.
                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_include_verify_flags": ssl_include_verify_flags,
                            "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            # Client-side caching is only forwarded to the pool when the
            # protocol check passes (RESP3).
            if (cache_config or cache) and check_protocol_version(protocol, 3):
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            # Maintenance notifications require RESP3; fail fast otherwise.
            if maint_notifications_enabled and protocol not in [
                3,
                "3",
            ]:
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            if oss_cluster_maint_notifications_handler:
                kwargs.update(
                    {
                        "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            # We created the pool, so we own it and close it with the client.
            self.auto_close_connection_pool = True
        else:
            # Caller-supplied pool: the caller keeps ownership.
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        # Re-check caching against the pool's actual protocol; this also
        # covers the caller-supplied-pool path above.
        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            # Pin one connection for the client's lifetime; access to it is
            # serialized through single_connection_lock.
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        # Response callbacks are looked up case-insensitively by command name.
        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)

487 

488 def __repr__(self) -> str: 

489 return ( 

490 f"<{type(self).__module__}.{type(self).__name__}" 

491 f"({repr(self.connection_pool)})>" 

492 ) 

493 

494 def get_encoder(self) -> "Encoder": 

495 """Get the connection pool's encoder""" 

496 return self.connection_pool.get_encoder() 

497 

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's key-word arguments"""
        # These are the kwargs the pool uses when creating new connections;
        # mutating the returned dict affects future connections.
        return self.connection_pool.connection_kwargs

501 

502 def get_retry(self) -> Optional[Retry]: 

503 return self.get_connection_kwargs().get("retry") 

504 

505 def set_retry(self, retry: Retry) -> None: 

506 self.get_connection_kwargs().update({"retry": retry}) 

507 self.connection_pool.set_retry(retry) 

508 

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom Response Callback"""
        # response_callbacks is a CaseInsensitiveDict, so the command name is
        # stored (and later looked up) case-insensitively.
        self.response_callbacks[command] = callback

512 

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load these functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        # Attach the callable directly on this instance under the given name.
        setattr(self, funcname, func)

535 

536 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

537 """ 

538 Return a new pipeline object that can queue multiple commands for 

539 later execution. ``transaction`` indicates whether all commands 

540 should be executed atomically. Apart from making a group of operations 

541 atomic, pipelines are useful for reducing the back-and-forth overhead 

542 between the client and server. 

543 """ 

544 return Pipeline( 

545 self.connection_pool, self.response_callbacks, transaction, shard_hint 

546 ) 

547 

548 def transaction( 

549 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

550 ) -> Union[List[Any], Any, None]: 

551 """ 

552 Convenience method for executing the callable `func` as a transaction 

553 while watching all keys specified in `watches`. The 'func' callable 

554 should expect a single argument which is a Pipeline object. 

555 """ 

556 shard_hint = kwargs.pop("shard_hint", None) 

557 value_from_callable = kwargs.pop("value_from_callable", False) 

558 watch_delay = kwargs.pop("watch_delay", None) 

559 with self.pipeline(True, shard_hint) as pipe: 

560 while True: 

561 try: 

562 if watches: 

563 pipe.watch(*watches) 

564 func_value = func(pipe) 

565 exec_value = pipe.execute() 

566 return func_value if value_from_callable else exec_value 

567 except WatchError: 

568 if watch_delay is not None and watch_delay > 0: 

569 time.sleep(watch_delay) 

570 continue 

571 

572 def lock( 

573 self, 

574 name: str, 

575 timeout: Optional[float] = None, 

576 sleep: float = 0.1, 

577 blocking: bool = True, 

578 blocking_timeout: Optional[float] = None, 

579 lock_class: Union[None, Any] = None, 

580 thread_local: bool = True, 

581 raise_on_release_error: bool = True, 

582 ): 

583 """ 

584 Return a new Lock object using key ``name`` that mimics 

585 the behavior of threading.Lock. 

586 

587 If specified, ``timeout`` indicates a maximum life for the lock. 

588 By default, it will remain locked until release() is called. 

589 

590 ``sleep`` indicates the amount of time to sleep per loop iteration 

591 when the lock is in blocking mode and another client is currently 

592 holding the lock. 

593 

594 ``blocking`` indicates whether calling ``acquire`` should block until 

595 the lock has been acquired or to fail immediately, causing ``acquire`` 

596 to return False and the lock not being acquired. Defaults to True. 

597 Note this value can be overridden by passing a ``blocking`` 

598 argument to ``acquire``. 

599 

600 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

601 spend trying to acquire the lock. A value of ``None`` indicates 

602 continue trying forever. ``blocking_timeout`` can be specified as a 

603 float or integer, both representing the number of seconds to wait. 

604 

605 ``lock_class`` forces the specified lock implementation. Note that as 

606 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

607 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

608 you have created your own custom lock class. 

609 

610 ``thread_local`` indicates whether the lock token is placed in 

611 thread-local storage. By default, the token is placed in thread local 

612 storage so that a thread only sees its token, not a token set by 

613 another thread. Consider the following timeline: 

614 

615 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

616 thread-1 sets the token to "abc" 

617 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

618 Lock instance. 

619 time: 5, thread-1 has not yet completed. redis expires the lock 

620 key. 

621 time: 5, thread-2 acquired `my-lock` now that it's available. 

622 thread-2 sets the token to "xyz" 

623 time: 6, thread-1 finishes its work and calls release(). if the 

624 token is *not* stored in thread local storage, then 

625 thread-1 would see the token value as "xyz" and would be 

626 able to successfully release the thread-2's lock. 

627 

628 ``raise_on_release_error`` indicates whether to raise an exception when 

629 the lock is no longer owned when exiting the context manager. By default, 

630 this is True, meaning an exception will be raised. If False, the warning 

631 will be logged and the exception will be suppressed. 

632 

633 In some use cases it's necessary to disable thread local storage. For 

634 example, if you have code where one thread acquires a lock and passes 

635 that lock instance to a worker thread to release later. If thread 

636 local storage isn't disabled in this case, the worker thread won't see 

637 the token set by the thread that acquired the lock. Our assumption 

638 is that these cases aren't common and as such default to using 

639 thread local storage.""" 

640 if lock_class is None: 

641 lock_class = Lock 

642 return lock_class( 

643 self, 

644 name, 

645 timeout=timeout, 

646 sleep=sleep, 

647 blocking=blocking, 

648 blocking_timeout=blocking_timeout, 

649 thread_local=thread_local, 

650 raise_on_release_error=raise_on_release_error, 

651 ) 

652 

653 def pubsub(self, **kwargs): 

654 """ 

655 Return a Publish/Subscribe object. With this object, you can 

656 subscribe to channels and listen for messages that get published to 

657 them. 

658 """ 

659 return PubSub( 

660 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

661 ) 

662 

663 def keyspace_notifications( 

664 self, 

665 key_prefix: Union[str, bytes, None] = None, 

666 ignore_subscribe_messages: bool = True, 

667 ) -> "KeyspaceNotifications": 

668 """ 

669 Return a :class:`~redis.keyspace_notifications.KeyspaceNotifications` 

670 object for subscribing to keyspace and keyevent notifications. 

671 

672 Note: Keyspace notifications must be enabled on the Redis server via 

673 the ``notify-keyspace-events`` configuration option. 

674 

675 Args: 

676 key_prefix: Optional prefix to filter and strip from keys in 

677 notifications. 

678 ignore_subscribe_messages: If True, subscribe/unsubscribe 

679 confirmations are not returned by 

680 get_message/listen. 

681 """ 

682 from redis.keyspace_notifications import KeyspaceNotifications 

683 

684 return KeyspaceNotifications( 

685 self, 

686 key_prefix=key_prefix, 

687 ignore_subscribe_messages=ignore_subscribe_messages, 

688 ) 

689 

    def monitor(self):
        """Return a Monitor object bound to this client's connection pool."""
        return Monitor(self.connection_pool)

692 

693 def client(self): 

694 return self.__class__( 

695 connection_pool=self.connection_pool, 

696 single_connection_client=True, 

697 ) 

698 

    def __enter__(self):
        """Enter a ``with`` block; the client itself is the context value."""
        return self

701 

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        # Best-effort cleanup at garbage-collection time; __del__ must never
        # raise (e.g. during interpreter shutdown), so swallow everything.
        try:
            self.close()
        except Exception:
            pass

710 

711 def close(self) -> None: 

712 # In case a connection property does not yet exist 

713 # (due to a crash earlier in the Redis() constructor), return 

714 # immediately as there is nothing to clean-up. 

715 if not hasattr(self, "connection"): 

716 return 

717 

718 conn = self.connection 

719 if conn: 

720 self.connection = None 

721 self.connection_pool.release(conn) 

722 

723 if self.auto_close_connection_pool: 

724 self.connection_pool.disconnect() 

725 

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        # args carries the full command (name + arguments); options is also
        # forwarded to parse_response for flags like NEVER_DECODE.
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

732 

733 def _close_connection( 

734 self, 

735 conn, 

736 error: Optional[Exception] = None, 

737 failure_count: Optional[int] = None, 

738 start_time: Optional[float] = None, 

739 command_name: Optional[str] = None, 

740 ) -> None: 

741 """ 

742 Close the connection before retrying. 

743 

744 The supported exceptions are already checked in the 

745 retry object so we don't need to do it here. 

746 

747 After we disconnect the connection, it will try to reconnect and 

748 do a health check as part of the send_command logic(on connection level). 

749 """ 

750 if error and failure_count <= conn.retry.get_retries(): 

751 record_operation_duration( 

752 command_name=command_name, 

753 duration_seconds=time.monotonic() - start_time, 

754 server_address=getattr(conn, "host", None), 

755 server_port=getattr(conn, "port", None), 

756 db_namespace=str(conn.db), 

757 error=error, 

758 retry_attempts=failure_count, 

759 ) 

760 

761 conn.disconnect() 

762 

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        """Execute a command and return a parsed response."""
        return self._execute_command(*args, **options)

766 

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        # Use the pinned connection (single-connection mode) or borrow one
        # from the pool for the duration of this command.
        conn = self.connection or pool.get_connection()

        # Start timing for observability
        start_time = time.monotonic()
        # Track actual retry attempts for error reporting
        # (a one-element list so the closure below can mutate it).
        actual_retry_attempts = [0]

        def failure_callback(error, failure_count):
            # Invoked by the retry machinery on each failed attempt: log,
            # record the attempt count, then close the connection so the
            # next attempt reconnects cleanly.
            if is_debug_log_enabled():
                add_debug_log_for_operation_failure(conn)
            actual_retry_attempts[0] = failure_count
            self._close_connection(conn, error, failure_count, start_time, command_name)

        # In single-connection mode the shared connection must be serialized
        # across threads; the lock is released in the finally block below.
        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            result = conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                failure_callback,
                with_failure_count=True,
            )

            # Success path: record how long the operation took.
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
            )
            return result
        except Exception as e:
            # All retries exhausted (or a non-retryable error): record it
            # with the number of attempts actually made, then re-raise.
            record_error_count(
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                network_peer_address=getattr(conn, "host", None),
                network_peer_port=getattr(conn, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
                is_internal=False,
            )
            raise

        finally:
            # Reconnect connections that flagged themselves for reconnect,
            # release the single-connection lock, and return borrowed
            # connections to the pool.
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

823 

824 def parse_response(self, connection, command_name, **options): 

825 """Parses a response from the Redis server""" 

826 try: 

827 if NEVER_DECODE in options: 

828 response = connection.read_response(disable_decoding=True) 

829 options.pop(NEVER_DECODE) 

830 else: 

831 response = connection.read_response() 

832 except ResponseError: 

833 if EMPTY_RESPONSE in options: 

834 return options[EMPTY_RESPONSE] 

835 raise 

836 

837 if EMPTY_RESPONSE in options: 

838 options.pop(EMPTY_RESPONSE) 

839 

840 # Remove keys entry, it needs only for cache. 

841 options.pop("keys", None) 

842 

843 if command_name in self.response_callbacks: 

844 return self.response_callbacks[command_name](response, **options) 

845 return response 

846 

    def get_cache(self) -> Optional[CacheInterface]:
        """Return the client-side cache held by the connection pool, if any."""
        return self.connection_pool.cache

849 

850 

# Alias kept for backwards compatibility with code that imports StrictRedis.
StrictRedis = Redis

852 

853 

class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.
    """

    # Splits a monitor line of the form "[<db> <client info>] <command>".
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # Extracts each double-quoted piece of the command, skipping escaped quotes.
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        # The monitor holds one dedicated connection from the pool.
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        # Entering the context issues MONITOR on the dedicated connection.
        self._start_monitor()
        return self

    def __exit__(self, *args):
        # Disconnect before releasing: the connection was switched into
        # MONITOR mode and is disconnected rather than reused as-is.
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        # Monitor lines look like: "<timestamp> [<db> <client info>] <command>"
        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        # Classify the client by its info field: lua scripts, unix sockets,
        # empty (unknown), or host:port TCP clients.
        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            if client_info == "":
                client_address = ""
                client_port = ""
                client_type = "unknown"
            else:
                # use rsplit as ipv6 addresses contain colons
                client_address, client_port = client_info.rsplit(":", 1)
                client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        # Issue MONITOR and verify the server acknowledged it.
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")

933 

934 

class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    # Message types that carry actual published payloads (regular, pattern
    # and sharded publishes) -- these may dispatch to registered handlers.
    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    # Confirmation types that shrink the local subscription bookkeeping.
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    # Payload sent with PING so our own health-check replies can be recognized
    # and filtered out of the message stream.
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

947 

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        """
        Args:
            connection_pool: pool used to obtain the dedicated pubsub connection.
            shard_hint: opaque hint stored on the instance.
            ignore_subscribe_messages: when True, subscribe/unsubscribe
                confirmations are suppressed by ``handle_message``.
            encoder: encoder for channel/pattern names; taken from the pool
                when not provided.
            push_handler_func: optional callback installed on the connection's
                parser to receive push messages directly.
            event_dispatcher: dispatcher for connection lifecycle events; a
                private one is created when not supplied.
        """
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        # Precompute both reply shapes so health checks can be recognized
        # regardless of decode_responses.
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()

982 

    def __enter__(self) -> "PubSub":
        """Support ``with`` usage; leaving the block calls ``reset()``."""
        return self

985 

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # tear down the connection and subscription state on context exit
        self.reset()

988 

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            # never let teardown raise during garbage collection
            pass

997 

998 def reset(self) -> None: 

999 if self.connection: 

1000 self.connection.disconnect() 

1001 self.connection.deregister_connect_callback(self.on_connect) 

1002 self.connection_pool.release(self.connection) 

1003 self.connection = None 

1004 self.health_check_response_counter = 0 

1005 self.channels = {} 

1006 self.pending_unsubscribe_channels = set() 

1007 self.shard_channels = {} 

1008 self.pending_unsubscribe_shard_channels = set() 

1009 self.patterns = {} 

1010 self.pending_unsubscribe_patterns = set() 

1011 self.subscribed_event.clear() 

1012 

    def close(self) -> None:
        """Public teardown entry point; identical to ``reset()``."""
        self.reset()

1015 

    def on_connect(self, connection) -> None:
        """
        Re-subscribe to any channels and patterns previously subscribed to.

        Registered as a connect callback so subscriptions survive reconnects.
        """
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        #
        # However, channels subscribed without a callback (positional args) may
        # have binary names that are not valid in the current encoding (e.g.
        # arbitrary bytes that are not valid UTF-8). These channels are stored
        # with a ``None`` handler. We re-subscribe them as positional args so
        # that no decoding is required.
        # A fresh connection can have no in-flight unsubscribes pending.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels_with_handlers = {}
            channels_without_handlers = []
            for k, v in self.channels.items():
                if v is not None:
                    channels_with_handlers[self.encoder.decode(k, force=True)] = v
                else:
                    channels_without_handlers.append(k)
            if channels_with_handlers or channels_without_handlers:
                self.subscribe(*channels_without_handlers, **channels_with_handlers)
        if self.patterns:
            patterns_with_handlers = {}
            patterns_without_handlers = []
            for k, v in self.patterns.items():
                if v is not None:
                    patterns_with_handlers[self.encoder.decode(k, force=True)] = v
                else:
                    patterns_without_handlers.append(k)
            if patterns_with_handlers or patterns_without_handlers:
                self.psubscribe(*patterns_without_handlers, **patterns_with_handlers)
        if self.shard_channels:
            shard_with_handlers = {}
            shard_without_handlers = []
            for k, v in self.shard_channels.items():
                if v is not None:
                    shard_with_handlers[self.encoder.decode(k, force=True)] = v
                else:
                    shard_without_handlers.append(k)
            if shard_with_handlers or shard_without_handlers:
                self.ssubscribe(*shard_without_handlers, **shard_with_handlers)

1060 

1061 @property 

1062 def subscribed(self) -> bool: 

1063 """Indicates if there are subscriptions to any channels or patterns""" 

1064 return self.subscribed_event.is_set() 

1065 

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            # lazily acquire a dedicated connection on first use
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        # only run the connection health check while nothing is subscribed;
        # once subscribed, a health check reply could be mistaken for a message
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

1091 

1092 def clean_health_check_responses(self) -> None: 

1093 """ 

1094 If any health check responses are present, clean them 

1095 """ 

1096 ttl = 10 

1097 conn = self.connection 

1098 while conn and self.health_check_response_counter > 0 and ttl > 0: 

1099 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

1100 response = self._execute(conn, conn.read_response) 

1101 if self.is_health_check_response(response): 

1102 self.health_check_response_counter -= 1 

1103 else: 

1104 raise PubSubError( 

1105 "A non health check response was cleaned by " 

1106 "execute_command: {}".format(response) 

1107 ) 

1108 ttl -= 1 

1109 

    def _reconnect(
        self,
        conn,
        error: Optional[Exception] = None,
        failure_count: Optional[int] = None,
        start_time: Optional[float] = None,
        command_name: Optional[str] = None,
    ) -> None:
        """
        The supported exceptions are already checked in the
        retry object so we don't need to do it here.

        In this error handler we are trying to reconnect to the server.
        """
        # Record the failed attempt for observability, but only while the
        # retry budget is not exhausted and the failing command is known.
        if error and failure_count <= conn.retry.get_retries():
            if command_name:
                record_operation_duration(
                    command_name=command_name,
                    duration_seconds=time.monotonic() - start_time,
                    server_address=getattr(conn, "host", None),
                    server_port=getattr(conn, "port", None),
                    db_namespace=str(conn.db),
                    error=error,
                    retry_attempts=failure_count,
                )
        # Drop the (possibly broken) socket and re-establish it; the
        # ``on_connect`` callback re-subscribes us after the reconnect.
        conn.disconnect()
        conn.connect()

1137 

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the # connection to resubscribe us to any channels and
        patterns we were previously listening to
        """

        if conn.should_reconnect():
            self._reconnect(conn)

        # The first positional arg (when present) is treated as the command
        # name, used purely for observability reporting below.
        if not len(args) == 0:
            command_name = args[0]
        else:
            command_name = None

        # Start timing for observability
        start_time = time.monotonic()
        # Track actual retry attempts for error reporting
        actual_retry_attempts = [0]

        def failure_callback(error, failure_count):
            # single-element list lets the closure expose the final count
            actual_retry_attempts[0] = failure_count
            self._reconnect(conn, error, failure_count, start_time, command_name)

        try:
            response = conn.retry.call_with_retry(
                lambda: command(*args, **kwargs),
                failure_callback,
                with_failure_count=True,
            )

            if command_name:
                record_operation_duration(
                    command_name=command_name,
                    duration_seconds=time.monotonic() - start_time,
                    server_address=getattr(conn, "host", None),
                    server_port=getattr(conn, "port", None),
                    db_namespace=str(conn.db),
                )

            return response
        except Exception as e:
            # retries exhausted or a non-retryable error: record and re-raise
            record_error_count(
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                network_peer_address=getattr(conn, "host", None),
                network_peer_port=getattr(conn, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
                is_internal=False,
            )
            raise

1192 

1193 def parse_response(self, block=True, timeout=0): 

1194 """ 

1195 Parse the response from a publish/subscribe command. 

1196 

1197 Args: 

1198 block: If True, block indefinitely until a message is available. 

1199 If False, return immediately if no message is available. 

1200 Default: True 

1201 timeout: The timeout in seconds for reading a response when block=False. 

1202 This parameter is ignored when block=True. 

1203 Default: 0 (return immediately if no data available) 

1204 

1205 Returns: 

1206 The parsed response from the server, or None if no message is available 

1207 within the timeout period (when block=False). 

1208 

1209 Important: 

1210 The block and timeout parameters work together: 

1211 - When block=True: timeout is IGNORED, method blocks indefinitely 

1212 - When block=False: timeout is USED, method returns after timeout expires 

1213 

1214 Typically, you should use get_message(timeout=X) instead of calling 

1215 parse_response() directly. The get_message() method automatically sets 

1216 block=False when a timeout is provided, and block=True when timeout=None. 

1217 

1218 Example: 

1219 # Block indefinitely (timeout is ignored) 

1220 response = pubsub.parse_response(block=True, timeout=0.1) 

1221 

1222 # Non-blocking with 0.1 second timeout 

1223 response = pubsub.parse_response(block=False, timeout=0.1) 

1224 

1225 # Non-blocking, return immediately 

1226 response = pubsub.parse_response(block=False, timeout=0) 

1227 

1228 # Recommended: use get_message() instead 

1229 msg = pubsub.get_message(timeout=0.1) # automatically sets block=False 

1230 msg = pubsub.get_message(timeout=None) # automatically sets block=True 

1231 """ 

1232 conn = self.connection 

1233 if conn is None: 

1234 raise RuntimeError( 

1235 "pubsub connection not set: " 

1236 "did you forget to call subscribe() or psubscribe()?" 

1237 ) 

1238 

1239 self.check_health() 

1240 

1241 def try_read(): 

1242 if not block: 

1243 if not conn.can_read(timeout=timeout): 

1244 return None 

1245 read_timeout = timeout 

1246 else: 

1247 conn.connect() 

1248 read_timeout = SENTINEL # Use default socket timeout for blocking 

1249 return conn.read_response( 

1250 disconnect_on_error=False, push_request=True, timeout=read_timeout 

1251 ) 

1252 

1253 response = self._execute(conn, try_read) 

1254 

1255 if self.is_health_check_response(response): 

1256 # ignore the health check message as user might not expect it 

1257 self.health_check_response_counter -= 1 

1258 return None 

1259 return response 

1260 

1261 def is_health_check_response(self, response) -> bool: 

1262 """ 

1263 Check if the response is a health check response. 

1264 If there are no subscriptions redis responds to PING command with a 

1265 bulk response, instead of a multi-bulk with "pong" and the response. 

1266 """ 

1267 if self.encoder.decode_responses: 

1268 return ( 

1269 response 

1270 in [ 

1271 self.health_check_response, # If there is a subscription 

1272 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True 

1273 ] 

1274 ) 

1275 else: 

1276 return ( 

1277 response 

1278 in [ 

1279 self.health_check_response, # If there is a subscription 

1280 self.health_check_response_b, # If there isn't a subscription and decode_responses=False 

1281 ] 

1282 ) 

1283 

1284 def check_health(self) -> None: 

1285 conn = self.connection 

1286 if conn is None: 

1287 raise RuntimeError( 

1288 "pubsub connection not set: " 

1289 "did you forget to call subscribe() or psubscribe()?" 

1290 ) 

1291 

1292 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1293 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1294 self.health_check_response_counter += 1 

1295 

1296 def _normalize_keys(self, data) -> Dict: 

1297 """ 

1298 normalize channel/pattern names to be either bytes or strings 

1299 based on whether responses are automatically decoded. this saves us 

1300 from coercing the value for each message coming in. 

1301 """ 

1302 encode = self.encoder.encode 

1303 decode = self.encoder.decode 

1304 return {decode(encode(k)): v for k, v in data.items()} 

1305 

1306 def psubscribe(self, *args, **kwargs): 

1307 """ 

1308 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1309 expect a pattern name as the key and a callable as the value. A 

1310 pattern's callable will be invoked automatically when a message is 

1311 received on that pattern rather than producing a message via 

1312 ``listen()``. 

1313 """ 

1314 if args: 

1315 args = list_or_args(args[0], args[1:]) 

1316 new_patterns = dict.fromkeys(args) 

1317 new_patterns.update(kwargs) 

1318 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1319 # update the patterns dict AFTER we send the command. we don't want to 

1320 # subscribe twice to these patterns, once for the command and again 

1321 # for the reconnection. 

1322 new_patterns = self._normalize_keys(new_patterns) 

1323 self.patterns.update(new_patterns) 

1324 if not self.subscribed: 

1325 # Set the subscribed_event flag to True 

1326 self.subscribed_event.set() 

1327 # Clear the health check counter 

1328 self.health_check_response_counter = 0 

1329 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1330 return ret_val 

1331 

1332 def punsubscribe(self, *args): 

1333 """ 

1334 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1335 all patterns. 

1336 """ 

1337 if args: 

1338 args = list_or_args(args[0], args[1:]) 

1339 patterns = self._normalize_keys(dict.fromkeys(args)) 

1340 else: 

1341 patterns = self.patterns 

1342 self.pending_unsubscribe_patterns.update(patterns) 

1343 return self.execute_command("PUNSUBSCRIBE", *args) 

1344 

1345 def subscribe(self, *args, **kwargs): 

1346 """ 

1347 Subscribe to channels. Channels supplied as keyword arguments expect 

1348 a channel name as the key and a callable as the value. A channel's 

1349 callable will be invoked automatically when a message is received on 

1350 that channel rather than producing a message via ``listen()`` or 

1351 ``get_message()``. 

1352 """ 

1353 if args: 

1354 args = list_or_args(args[0], args[1:]) 

1355 new_channels = dict.fromkeys(args) 

1356 new_channels.update(kwargs) 

1357 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1358 # update the channels dict AFTER we send the command. we don't want to 

1359 # subscribe twice to these channels, once for the command and again 

1360 # for the reconnection. 

1361 new_channels = self._normalize_keys(new_channels) 

1362 self.channels.update(new_channels) 

1363 if not self.subscribed: 

1364 # Set the subscribed_event flag to True 

1365 self.subscribed_event.set() 

1366 # Clear the health check counter 

1367 self.health_check_response_counter = 0 

1368 self.pending_unsubscribe_channels.difference_update(new_channels) 

1369 return ret_val 

1370 

1371 def unsubscribe(self, *args): 

1372 """ 

1373 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1374 all channels 

1375 """ 

1376 if args: 

1377 args = list_or_args(args[0], args[1:]) 

1378 channels = self._normalize_keys(dict.fromkeys(args)) 

1379 else: 

1380 channels = self.channels 

1381 self.pending_unsubscribe_channels.update(channels) 

1382 return self.execute_command("UNSUBSCRIBE", *args) 

1383 

1384 def ssubscribe(self, *args, target_node=None, **kwargs): 

1385 """ 

1386 Subscribes the client to the specified shard channels. 

1387 Channels supplied as keyword arguments expect a channel name as the key 

1388 and a callable as the value. A channel's callable will be invoked automatically 

1389 when a message is received on that channel rather than producing a message via 

1390 ``listen()`` or ``get_sharded_message()``. 

1391 """ 

1392 if args: 

1393 args = list_or_args(args[0], args[1:]) 

1394 new_s_channels = dict.fromkeys(args) 

1395 new_s_channels.update(kwargs) 

1396 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1397 # update the s_channels dict AFTER we send the command. we don't want to 

1398 # subscribe twice to these channels, once for the command and again 

1399 # for the reconnection. 

1400 new_s_channels = self._normalize_keys(new_s_channels) 

1401 self.shard_channels.update(new_s_channels) 

1402 if not self.subscribed: 

1403 # Set the subscribed_event flag to True 

1404 self.subscribed_event.set() 

1405 # Clear the health check counter 

1406 self.health_check_response_counter = 0 

1407 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1408 return ret_val 

1409 

1410 def sunsubscribe(self, *args, target_node=None): 

1411 """ 

1412 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1413 all shard_channels 

1414 """ 

1415 if args: 

1416 args = list_or_args(args[0], args[1:]) 

1417 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1418 else: 

1419 s_channels = self.shard_channels 

1420 self.pending_unsubscribe_shard_channels.update(s_channels) 

1421 return self.execute_command("SUNSUBSCRIBE", *args) 

1422 

1423 def listen(self): 

1424 "Listen for messages on channels this client has been subscribed to" 

1425 while self.subscribed: 

1426 response = self.handle_message(self.parse_response(block=True)) 

1427 if response is not None: 

1428 yield response 

1429 

1430 def get_message( 

1431 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1432 ): 

1433 """ 

1434 Get the next message if one is available, otherwise None. 

1435 

1436 If timeout is specified, the system will wait for `timeout` seconds 

1437 before returning. Timeout should be specified as a floating point 

1438 number, or None, to wait indefinitely. 

1439 """ 

1440 if not self.subscribed: 

1441 # Wait for subscription 

1442 start_time = time.monotonic() 

1443 if self.subscribed_event.wait(timeout) is True: 

1444 # The connection was subscribed during the timeout time frame. 

1445 # The timeout should be adjusted based on the time spent 

1446 # waiting for the subscription 

1447 time_spent = time.monotonic() - start_time 

1448 timeout = max(0.0, timeout - time_spent) 

1449 else: 

1450 # The connection isn't subscribed to any channels or patterns, 

1451 # so no messages are available 

1452 return None 

1453 

1454 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1455 

1456 if response: 

1457 return self.handle_message(response, ignore_subscribe_messages) 

1458 return None 

1459 

    # In the non-cluster client, sharded messages arrive over the same
    # connection and polling loop, so the sharded accessor is just an alias.
    get_sharded_message = get_message

1461 

1462 def ping(self, message: Union[str, None] = None) -> bool: 

1463 """ 

1464 Ping the Redis server to test connectivity. 

1465 

1466 Sends a PING command to the Redis server and returns True if the server 

1467 responds with "PONG". 

1468 """ 

1469 args = ["PING", message] if message is not None else ["PING"] 

1470 return self.execute_command(*args) 

1471 

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.

        Returns None for handled messages and for suppressed subscribe /
        unsubscribe confirmations; otherwise a dict with ``type``, ``pattern``,
        ``channel`` and ``data`` keys.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            # bulk-string PING reply (no active subscription); normalize it
            # into the ["pong", data] multi-bulk shape
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # observability: count received publishes (sharded ones separately)
        if message_type in ["message", "pmessage"]:
            channel = str_if_bytes(message["channel"])
            record_pubsub_message(
                direction=PubSubDirection.RECEIVE,
                channel=channel,
            )
        elif message_type == "smessage":
            channel = str_if_bytes(message["channel"])
            record_pubsub_message(
                direction=PubSubDirection.RECEIVE,
                channel=channel,
                sharded=True,
            )

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

1560 

1561 def run_in_thread( 

1562 self, 

1563 sleep_time: float = 0.0, 

1564 daemon: bool = False, 

1565 exception_handler: Optional[Callable] = None, 

1566 pubsub=None, 

1567 sharded_pubsub: bool = False, 

1568 ) -> "PubSubWorkerThread": 

1569 for channel, handler in self.channels.items(): 

1570 if handler is None: 

1571 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1572 for pattern, handler in self.patterns.items(): 

1573 if handler is None: 

1574 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1575 for s_channel, handler in self.shard_channels.items(): 

1576 if handler is None: 

1577 raise PubSubError( 

1578 f"Shard Channel: '{s_channel}' has no handler registered" 

1579 ) 

1580 

1581 pubsub = self if pubsub is None else pubsub 

1582 thread = PubSubWorkerThread( 

1583 pubsub, 

1584 sleep_time, 

1585 daemon=daemon, 

1586 exception_handler=exception_handler, 

1587 sharded_pubsub=sharded_pubsub, 

1588 ) 

1589 thread.start() 

1590 return thread 

1591 

1592 

class PubSubWorkerThread(threading.Thread):
    """Background thread that polls a PubSub object and fires its handlers."""

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self.sharded_pubsub = sharded_pubsub
        self._running = threading.Event()

    def run(self) -> None:
        """Poll for messages until ``stop()`` is called, then close pubsub."""
        if self._running.is_set():
            # already running; never enter the loop twice
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                if self.sharded_pubsub:
                    pubsub.get_sharded_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
                else:
                    pubsub.get_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
            except BaseException as e:
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()

1639 

1640 

1641class Pipeline(Redis): 

1642 """ 

1643 Pipelines provide a way to transmit multiple commands to the Redis server 

1644 in one transmission. This is convenient for batch processing, such as 

1645 saving all the values in a list to Redis. 

1646 

1647 All commands executed within a pipeline(when running in transactional mode, 

1648 which is the default behavior) are wrapped with MULTI and EXEC 

1649 calls. This guarantees all commands executed in the pipeline will be 

1650 executed atomically. 

1651 

1652 Any command raising an exception does *not* halt the execution of 

1653 subsequent commands in the pipeline. Instead, the exception is caught 

1654 and its instance is placed into the response list returned by execute(). 

1655 Code iterating over the response list should be able to deal with an 

1656 instance of an exception as a potential value. In general, these will be 

1657 ResponseError exceptions, such as those raised when issuing a command 

1658 on a key of a different datatype. 

1659 """ 

1660 

1661 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1662 

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        """
        Args:
            connection_pool: pool the pipeline borrows its connection from.
            response_callbacks: per-command response parsers shared with the
                owning client.
            transaction: when truthy, queued commands run inside MULTI/EXEC.
            shard_hint: opaque hint stored on the instance.
        """
        self.connection_pool = connection_pool
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        # True after WATCH is issued, until reset()/UNWATCH/EXEC/DISCARD
        self.watching = False
        # commands buffered for a later execute()
        self.command_stack = []
        self.scripts: Set[Script] = set()
        # True once multi() has been called explicitly
        self.explicit_transaction = False

1679 

    def __enter__(self) -> "Pipeline":
        """Support ``with client.pipeline() as pipe:`` usage."""
        return self

1682 

    def __exit__(self, exc_type, exc_value, traceback):
        # always reset so the borrowed connection returns to the pool
        self.reset()

1685 

    def __del__(self):
        try:
            # best-effort cleanup if the pipeline was never reset explicitly
            self.reset()
        except Exception:
            # never raise during garbage collection
            pass

1691 

    def __len__(self) -> int:
        """Number of commands currently buffered on the pipeline."""
        return len(self.command_stack)

1694 

    def __bool__(self) -> bool:
        """Pipeline instances should always evaluate to True"""
        # without this, an empty command_stack would make the pipeline falsy
        # through __len__
        return True

1698 

    def reset(self) -> None:
        """Discard buffered commands, clear WATCH state and release the connection."""
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False

        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

1722 

    def close(self) -> None:
        """Close the pipeline"""
        # identical to reset(): drops buffered commands and releases the
        # connection back to the pool
        self.reset()

1726 

    def multi(self) -> None:
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.

        Raises:
            RedisError: if called while already inside an explicit
                transaction, or after non-WATCH commands were buffered.
        """
        if self.explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self.command_stack:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self.explicit_transaction = True

1739 

1740 def execute_command(self, *args, **kwargs): 

1741 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1742 return self.immediate_execute_command(*args, **kwargs) 

1743 return self.pipeline_execute_command(*args, **kwargs) 

1744 

    def _disconnect_reset_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
        failure_count: Optional[int] = None,
        start_time: Optional[float] = None,
        command_name: Optional[str] = None,
    ) -> None:
        """
        Close the connection reset watching state and
        raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic(on connection level).
        """
        # record the failed attempt while the retry budget is not exhausted
        if error and failure_count <= conn.retry.get_retries():
            record_operation_duration(
                command_name=command_name,
                duration_seconds=time.monotonic() - start_time,
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                db_namespace=str(conn.db),
                error=error,
                retry_attempts=failure_count,
            )
        conn.disconnect()

        # if we were already watching a variable, the watch is no longer
        # valid since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            self.reset()
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

1783 

def immediate_execute_command(self, *args, **options):
    """
    Execute a command immediately, but don't auto-retry on the supported
    errors for retry if we're already WATCHing a variable.
    Used when issuing WATCH or subsequent commands retrieving their values but before
    MULTI is called.
    """
    command_name = args[0]
    conn = self.connection
    # if this is the first call, we need a connection
    if not conn:
        conn = self.connection_pool.get_connection()
        self.connection = conn

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting; a one-element
    # list so the nested callback can write back without `nonlocal`.
    actual_retry_attempts = [0]

    def failure_callback(error, failure_count):
        # Invoked by the retry machinery on each failed attempt: logs,
        # records the attempt count, then disconnects/resets (raising
        # WatchError if we were watching keys).
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        actual_retry_attempts[0] = failure_count
        self._disconnect_reset_raise_on_watching(
            conn, error, failure_count, start_time, command_name
        )

    try:
        response = conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            failure_callback,
            with_failure_count=True,
        )

        # Success path: record the total operation latency.
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )

        return response
    except Exception as e:
        # Record the terminal failure (with the last failure count the
        # callback observed) before propagating it to the caller.
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=actual_retry_attempts[0],
            is_internal=False,
        )
        raise

1840 

def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Queue a command for execution the next time execute() is called.

    Returns this Pipeline instance so calls can be chained, e.g.::

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    Nothing is sent to the server until pipe.execute() drains the queue.
    """
    self.command_stack.append((args, options))
    return self

1855 

def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Run the buffered commands inside a MULTI/EXEC block on *connection*
    and return the per-command results with response callbacks applied.

    Raises WatchError when EXEC returns nil (a WATCHed key changed) and
    re-raises the first queued error when *raise_on_error* is true.
    """
    # Commands whose options carry EMPTY_RESPONSE are never sent; their
    # canned response is spliced back into the result list below.
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        # EXEC aborted: prefer surfacing the queue-time error that
        # caused the abort, if we captured one above.
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    # A nil EXEC reply means a watched key changed and the transaction
    # was not run.
    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response, at the position of the
    # command that produced them
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # Out of sync with the server; drop the connection rather than
        # misattribute replies.
        self.connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove keys entry, it needs only for cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)

    return data

1927 

1928 def _execute_pipeline(self, connection, commands, raise_on_error): 

1929 # build up all commands into a single request to increase network perf 

1930 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1931 connection.send_packed_command(all_cmds) 

1932 

1933 responses = [] 

1934 for args, options in commands: 

1935 try: 

1936 responses.append(self.parse_response(connection, args[0], **options)) 

1937 except ResponseError as e: 

1938 responses.append(e) 

1939 

1940 if raise_on_error: 

1941 self.raise_first_error(commands, responses) 

1942 

1943 return responses 

1944 

def raise_first_error(self, commands, response):
    """
    Scan *response* and re-raise the first ResponseError found,
    annotated with its 1-based position and the command text.
    """
    for idx, item in enumerate(response):
        if isinstance(item, ResponseError):
            self.annotate_exception(item, idx + 1, commands[idx][0])
            raise item

1950 

def annotate_exception(self, exception, number, command):
    """
    Prefix *exception*'s message with the 1-based pipeline position and
    the (truncated) text of the command that produced it.
    """
    cmd_text = " ".join(map(safe_str, command))
    annotated = (
        f"Command # {number} ({truncate_text(cmd_text)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    exception.args = (annotated,) + exception.args[1:]

1958 

def parse_response(self, connection, command_name, **options):
    """
    Read one reply via Redis.parse_response while maintaining the
    pipeline's WATCH bookkeeping: commands in UNWATCH_COMMANDS clear
    the watching flag, and WATCH sets it.
    """
    reply = Redis.parse_response(self, connection, command_name, **options)
    if command_name in self.UNWATCH_COMMANDS:
        self.watching = False
    elif command_name == "WATCH":
        self.watching = True
    return reply

1966 

def load_scripts(self):
    """
    Ensure every script registered on this pipeline is cached on the
    server, loading any that are missing.

    SCRIPT EXISTS / SCRIPT LOAD are issued through
    immediate_execute_command because the normal script_* helpers would
    just be buffered on the pipeline instead of running now.
    """
    scripts = list(self.scripts)
    immediate = self.immediate_execute_command
    hashes = [script.sha for script in scripts]
    exists = immediate("SCRIPT EXISTS", *hashes)
    if not all(exists):
        for script, is_cached in zip(scripts, exists):
            if not is_cached:
                script.sha = immediate("SCRIPT LOAD", script.script)

1979 

def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Drop the failed connection and surface a WatchError if any keys
    were being WATCHed.

    The retry object has already filtered for the supported exception
    types, so no exception-type checking happens here.

    Once disconnected, the connection reconnects and health-checks
    itself as part of the connection-level send_command logic.
    """
    # Record the failed attempt's latency while retries remain.
    if error and failure_count <= conn.retry.get_retries():
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )
    conn.disconnect()
    # A dead connection invalidates any WATCH that was in effect, so
    # signal the caller to retry the whole transaction.
    if self.watching:
        raise WatchError(
            f"A {type(error).__name__} occurred while watching one or more keys"
        )

2015 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """Execute all the commands in the current pipeline"""
    stack = self.command_stack
    if not stack and not self.watching:
        return []
    if self.scripts:
        self.load_scripts()
    # Transactional pipelines wrap the stack in MULTI/EXEC; plain
    # pipelines just pack and send the commands back-to-back.
    if self.transaction or self.explicit_transaction:
        execute = self._execute_transaction
        operation_name = "MULTI"
    else:
        execute = self._execute_pipeline
        operation_name = "PIPELINE"

    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        # assign to self.connection so reset() releases the connection
        # back to the pool after we're done
        self.connection = conn

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting; a one-element
    # list so the nested callback can write back without `nonlocal`.
    actual_retry_attempts = [0]

    def failure_callback(error, failure_count):
        # Invoked by the retry machinery on each failed attempt: logs,
        # records the attempt count, then disconnects (raising
        # WatchError if we were watching keys).
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        actual_retry_attempts[0] = failure_count
        self._disconnect_raise_on_watching(
            conn, error, failure_count, start_time, operation_name
        )

    try:
        response = conn.retry.call_with_retry(
            lambda: execute(conn, stack, raise_on_error),
            failure_callback,
            with_failure_count=True,
        )

        # Success path: record the total operation latency.
        record_operation_duration(
            command_name=operation_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )
        return response
    except Exception as e:
        # Record the terminal failure (with the last failure count the
        # callback observed) before propagating it.
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=actual_retry_attempts[0],
            is_internal=False,
        )
        raise

    finally:
        # in reset() the connection is disconnected before returned to the pool if
        # it is marked for reconnect.
        self.reset()

2081 

def discard(self):
    """
    Flush every command queued so far by issuing DISCARD.

    See: https://redis.io/commands/DISCARD
    """
    self.execute_command("DISCARD")

2088 

def watch(self, *names):
    """
    Mark the keys ``names`` to be WATCHed for the upcoming transaction.

    Raises RedisError when called after MULTI, since WATCH must precede
    the explicit transaction block.
    """
    if self.explicit_transaction:
        raise RedisError("Cannot issue a WATCH after a MULTI")
    return self.execute_command("WATCH", *names)

2094 

def unwatch(self) -> bool:
    """
    Forget all previously WATCHed keys.

    Sends UNWATCH only when keys are actually being watched; returns the
    server reply when it is truthy, otherwise True.
    """
    if self.watching:
        reply = self.execute_command("UNWATCH")
        if reply:
            return reply
    return True