Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

802 statements  

1import copy 

2import logging 

3import re 

4import threading 

5import time 

6from itertools import chain 

7from typing import ( 

8 TYPE_CHECKING, 

9 Any, 

10 Callable, 

11 Dict, 

12 List, 

13 Literal, 

14 Mapping, 

15 Optional, 

16 Set, 

17 Type, 

18 Union, 

19) 

20 

21from redis._parsers.encoders import Encoder 

22from redis._parsers.helpers import ( 

23 _RedisCallbacks, 

24 _RedisCallbacksRESP2, 

25 _RedisCallbacksRESP3, 

26 bool_ok, 

27) 

28from redis._parsers.socket import SENTINEL 

29from redis.backoff import ExponentialWithJitterBackoff 

30from redis.cache import CacheConfig, CacheInterface 

31from redis.commands import ( 

32 CoreCommands, 

33 RedisModuleCommands, 

34 SentinelCommands, 

35 list_or_args, 

36) 

37from redis.commands.core import Script 

38from redis.commands.helpers import partition_pubsub_subscriptions_by_handler 

39from redis.connection import ( 

40 AbstractConnection, 

41 Connection, 

42 ConnectionPool, 

43 SSLConnection, 

44 UnixDomainSocketConnection, 

45) 

46from redis.credentials import CredentialProvider 

47from redis.driver_info import DriverInfo, resolve_driver_info 

48from redis.event import ( 

49 AfterPooledConnectionsInstantiationEvent, 

50 AfterPubSubConnectionInstantiationEvent, 

51 AfterSingleConnectionInstantiationEvent, 

52 ClientType, 

53 EventDispatcher, 

54) 

55from redis.exceptions import ( 

56 ConnectionError, 

57 ExecAbortError, 

58 PubSubError, 

59 RedisError, 

60 ResponseError, 

61 WatchError, 

62) 

63from redis.lock import Lock 

64from redis.maint_notifications import ( 

65 MaintNotificationsConfig, 

66 OSSMaintNotificationsHandler, 

67) 

68from redis.observability.attributes import PubSubDirection 

69from redis.observability.recorder import ( 

70 record_error_count, 

71 record_operation_duration, 

72 record_pubsub_message, 

73) 

74from redis.retry import Retry 

75from redis.utils import ( 

76 DEFAULT_RESP_VERSION, 

77 _set_info_logger, 

78 check_protocol_version, 

79 deprecated_args, 

80 safe_str, 

81 str_if_bytes, 

82 truncate_text, 

83) 

84 

85if TYPE_CHECKING: 

86 import ssl 

87 

88 import OpenSSL 

89 

90 from redis.keyspace_notifications import KeyspaceNotifications 

91 

92SYM_EMPTY = b"" 

93EMPTY_RESPONSE = "EMPTY_RESPONSE" 

94 

95# some responses (ie. dump) are binary, and just meant to never be decoded 

96NEVER_DECODE = "NEVER_DECODE" 

97 

98 

99logger = logging.getLogger(__name__) 

100 

101 

102def is_debug_log_enabled(): 

103 return logger.isEnabledFor(logging.DEBUG) 

104 

105 

106def add_debug_log_for_operation_failure(connection: "AbstractConnection"): 

107 logger.debug( 

108 f"Operation failed, " 

109 f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}", 

110 ) 

111 

112 

113class CaseInsensitiveDict(dict): 

114 "Case insensitive dict implementation. Assumes string keys only." 

115 

116 def __init__(self, data: Dict[str, str]) -> None: 

117 for k, v in data.items(): 

118 self[k.upper()] = v 

119 

120 def __contains__(self, k): 

121 return super().__contains__(k.upper()) 

122 

123 def __delitem__(self, k): 

124 super().__delitem__(k.upper()) 

125 

126 def __getitem__(self, k): 

127 return super().__getitem__(k.upper()) 

128 

129 def get(self, k, default=None): 

130 return super().get(k.upper(), default) 

131 

132 def __setitem__(self, k, v): 

133 super().__setitem__(k.upper(), v) 

134 

135 def update(self, data): 

136 data = CaseInsensitiveDict(data) 

137 super().update(data) 

138 

139 

140class AbstractRedis: 

141 pass 

142 

143 

144class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

145 """ 

146 Implementation of the Redis protocol. 

147 

148 This abstract class provides a Python interface to all Redis commands 

149 and an implementation of the Redis protocol. 

150 

151 Pipelines derive from this, implementing how 

152 the commands are sent and received to the Redis server. Based on 

153 configuration, an instance will either use a ConnectionPool, or 

154 Connection object to talk to redis. 

155 

156 It is not safe to pass PubSub or Pipeline objects between threads. 

157 """ 

158 

159 # Type discrimination marker for @overload self-type pattern 

160 _is_async_client: Literal[False] = False 

161 

162 @classmethod 

163 def from_url(cls, url: str, **kwargs) -> "Redis": 

164 """ 

165 Return a Redis client object configured from the given URL 

166 

167 For example:: 

168 

169 redis://[[username]:[password]]@localhost:6379/0 

170 rediss://[[username]:[password]]@localhost:6379/0 

171 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

172 

173 Three URL schemes are supported: 

174 

175 - `redis://` creates a TCP socket connection. See more at: 

176 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

177 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

178 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

179 - ``unix://``: creates a Unix Domain Socket connection. 

180 

181 The username, password, hostname, path and all querystring values 

182 are passed through urllib.parse.unquote in order to replace any 

183 percent-encoded values with their corresponding characters. 

184 

185 There are several ways to specify a database number. The first value 

186 found will be used: 

187 

188 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

189 2. If using the redis:// or rediss:// schemes, the path argument 

190 of the url, e.g. redis://localhost/0 

191 3. A ``db`` keyword argument to this function. 

192 

193 If none of these options are specified, the default db=0 is used. 

194 

195 All querystring options are cast to their appropriate Python types. 

196 Boolean arguments can be specified with string values "True"/"False" 

197 or "Yes"/"No". Values that cannot be properly cast cause a 

198 ``ValueError`` to be raised. Once parsed, the querystring arguments 

199 and keyword arguments are passed to the ``ConnectionPool``'s 

200 class initializer. In the case of conflicting arguments, querystring 

201 arguments always win. 

202 

203 """ 

204 single_connection_client = kwargs.pop("single_connection_client", False) 

205 connection_pool = ConnectionPool.from_url(url, **kwargs) 

206 client = cls( 

207 connection_pool=connection_pool, 

208 single_connection_client=single_connection_client, 

209 ) 

210 client.auto_close_connection_pool = True 

211 return client 

212 

213 @classmethod 

214 def from_pool( 

215 cls: Type["Redis"], 

216 connection_pool: ConnectionPool, 

217 ) -> "Redis": 

218 """ 

219 Return a Redis client from the given connection pool. 

220 The Redis client will take ownership of the connection pool and 

221 close it when the Redis client is closed. 

222 """ 

223 client = cls( 

224 connection_pool=connection_pool, 

225 ) 

226 client.auto_close_connection_pool = True 

227 return client 

228 

229 @deprecated_args( 

230 args_to_warn=["retry_on_timeout"], 

231 reason="TimeoutError is included by default.", 

232 version="6.0.0", 

233 ) 

234 @deprecated_args( 

235 args_to_warn=["lib_name", "lib_version"], 

236 reason="Use 'driver_info' parameter instead. " 

237 "lib_name and lib_version will be removed in a future version.", 

238 ) 

239 def __init__( 

240 self, 

241 host: str = "localhost", 

242 port: int = 6379, 

243 db: int = 0, 

244 password: Optional[str] = None, 

245 socket_timeout: Optional[float] = None, 

246 socket_connect_timeout: Optional[float] = None, 

247 socket_keepalive: Optional[bool] = None, 

248 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

249 connection_pool: Optional[ConnectionPool] = None, 

250 unix_socket_path: Optional[str] = None, 

251 encoding: str = "utf-8", 

252 encoding_errors: str = "strict", 

253 decode_responses: bool = False, 

254 retry_on_timeout: bool = False, 

255 retry: Retry = Retry( 

256 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 

257 ), 

258 retry_on_error: Optional[List[Type[Exception]]] = None, 

259 ssl: bool = False, 

260 ssl_keyfile: Optional[str] = None, 

261 ssl_certfile: Optional[str] = None, 

262 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required", 

263 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

264 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

265 ssl_ca_certs: Optional[str] = None, 

266 ssl_ca_path: Optional[str] = None, 

267 ssl_ca_data: Optional[str] = None, 

268 ssl_check_hostname: bool = True, 

269 ssl_password: Optional[str] = None, 

270 ssl_validate_ocsp: bool = False, 

271 ssl_validate_ocsp_stapled: bool = False, 

272 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None, 

273 ssl_ocsp_expected_cert: Optional[str] = None, 

274 ssl_min_version: Optional["ssl.TLSVersion"] = None, 

275 ssl_ciphers: Optional[str] = None, 

276 max_connections: Optional[int] = None, 

277 single_connection_client: bool = False, 

278 health_check_interval: int = 0, 

279 client_name: Optional[str] = None, 

280 lib_name: Optional[str] = None, 

281 lib_version: Optional[str] = None, 

282 driver_info: Optional["DriverInfo"] = None, 

283 username: Optional[str] = None, 

284 redis_connect_func: Optional[Callable[[], None]] = None, 

285 credential_provider: Optional[CredentialProvider] = None, 

286 protocol: Optional[int] = 3, 

287 cache: Optional[CacheInterface] = None, 

288 cache_config: Optional[CacheConfig] = None, 

289 event_dispatcher: Optional[EventDispatcher] = None, 

290 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

291 oss_cluster_maint_notifications_handler: Optional[ 

292 OSSMaintNotificationsHandler 

293 ] = None, 

294 ) -> None: 

295 """ 

296 Initialize a new Redis client. 

297 

298 To specify a retry policy for specific errors, you have two options: 

299 

300 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

301 you can also set `retry` to a valid `Retry` object(in case the default 

302 one is not appropriate) - with this approach the retries will be triggered 

303 on the default errors specified in the Retry object enriched with the 

304 errors specified in `retry_on_error`. 

305 

306 2. Define a `Retry` object with configured 'supported_errors' and set 

307 it to the `retry` parameter - with this approach you completely redefine 

308 the errors on which retries will happen. 

309 

310 `retry_on_timeout` is deprecated - please include the TimeoutError 

311 either in the Retry object or in the `retry_on_error` list. 

312 

313 When 'connection_pool' is provided - the retry configuration of the 

314 provided pool will be used. 

315 

316 Args: 

317 

318 single_connection_client: 

319 if `True`, connection pool is not used. In that case `Redis` 

320 instance use is not thread safe. 

321 decode_responses: 

322 if `True`, the response will be decoded to utf-8. 

323 Argument is ignored when connection_pool is provided. 

324 driver_info: 

325 Optional DriverInfo object to identify upstream libraries. 

326 If provided, lib_name and lib_version are ignored. 

327 If not provided, a DriverInfo will be created from lib_name and lib_version. 

328 Argument is ignored when connection_pool is provided. 

329 lib_name: 

330 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO. 

331 lib_version: 

332 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO. 

333 maint_notifications_config: 

334 configures the pool to support maintenance notifications - see 

335 `redis.maint_notifications.MaintNotificationsConfig` for details. 

336 Only supported with RESP3 

337 If not provided and protocol is RESP3, the maintenance notifications 

338 will be enabled by default (logic is included in the connection pool 

339 initialization). 

340 Argument is ignored when connection_pool is provided. 

341 oss_cluster_maint_notifications_handler: 

342 handler for OSS cluster notifications - see 

343 `redis.maint_notifications.OSSMaintNotificationsHandler` for details. 

344 Only supported with RESP3 

345 Argument is ignored when connection_pool is provided. 

346 """ 

347 if event_dispatcher is None: 

348 self._event_dispatcher = EventDispatcher() 

349 else: 

350 self._event_dispatcher = event_dispatcher 

351 if not connection_pool: 

352 if not retry_on_error: 

353 retry_on_error = [] 

354 

355 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version 

356 computed_driver_info = resolve_driver_info( 

357 driver_info, lib_name, lib_version 

358 ) 

359 

360 kwargs = { 

361 "db": db, 

362 "username": username, 

363 "password": password, 

364 "socket_timeout": socket_timeout, 

365 "encoding": encoding, 

366 "encoding_errors": encoding_errors, 

367 "decode_responses": decode_responses, 

368 "retry_on_error": retry_on_error, 

369 "retry": copy.deepcopy(retry), 

370 "max_connections": max_connections, 

371 "health_check_interval": health_check_interval, 

372 "client_name": client_name, 

373 "driver_info": computed_driver_info, 

374 "redis_connect_func": redis_connect_func, 

375 "credential_provider": credential_provider, 

376 "protocol": protocol, 

377 } 

378 # based on input, setup appropriate connection args 

379 if unix_socket_path is not None: 

380 kwargs.update( 

381 { 

382 "path": unix_socket_path, 

383 "connection_class": UnixDomainSocketConnection, 

384 } 

385 ) 

386 else: 

387 # TCP specific options 

388 kwargs.update( 

389 { 

390 "host": host, 

391 "port": port, 

392 "socket_connect_timeout": socket_connect_timeout, 

393 "socket_keepalive": socket_keepalive, 

394 "socket_keepalive_options": socket_keepalive_options, 

395 } 

396 ) 

397 

398 if ssl: 

399 kwargs.update( 

400 { 

401 "connection_class": SSLConnection, 

402 "ssl_keyfile": ssl_keyfile, 

403 "ssl_certfile": ssl_certfile, 

404 "ssl_cert_reqs": ssl_cert_reqs, 

405 "ssl_include_verify_flags": ssl_include_verify_flags, 

406 "ssl_exclude_verify_flags": ssl_exclude_verify_flags, 

407 "ssl_ca_certs": ssl_ca_certs, 

408 "ssl_ca_data": ssl_ca_data, 

409 "ssl_check_hostname": ssl_check_hostname, 

410 "ssl_password": ssl_password, 

411 "ssl_ca_path": ssl_ca_path, 

412 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

413 "ssl_validate_ocsp": ssl_validate_ocsp, 

414 "ssl_ocsp_context": ssl_ocsp_context, 

415 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

416 "ssl_min_version": ssl_min_version, 

417 "ssl_ciphers": ssl_ciphers, 

418 } 

419 ) 

420 if (cache_config or cache) and check_protocol_version(protocol, 3): 

421 kwargs.update( 

422 { 

423 "cache": cache, 

424 "cache_config": cache_config, 

425 } 

426 ) 

427 maint_notifications_enabled = ( 

428 maint_notifications_config and maint_notifications_config.enabled 

429 ) 

430 if maint_notifications_enabled and protocol not in [ 

431 3, 

432 "3", 

433 ]: 

434 raise RedisError( 

435 "Maintenance notifications handlers on connection are only supported with RESP version 3" 

436 ) 

437 if maint_notifications_config: 

438 kwargs.update( 

439 { 

440 "maint_notifications_config": maint_notifications_config, 

441 } 

442 ) 

443 if oss_cluster_maint_notifications_handler: 

444 kwargs.update( 

445 { 

446 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler, 

447 } 

448 ) 

449 connection_pool = ConnectionPool(**kwargs) 

450 self._event_dispatcher.dispatch( 

451 AfterPooledConnectionsInstantiationEvent( 

452 [connection_pool], ClientType.SYNC, credential_provider 

453 ) 

454 ) 

455 self.auto_close_connection_pool = True 

456 else: 

457 self.auto_close_connection_pool = False 

458 self._event_dispatcher.dispatch( 

459 AfterPooledConnectionsInstantiationEvent( 

460 [connection_pool], ClientType.SYNC, credential_provider 

461 ) 

462 ) 

463 

464 self.connection_pool = connection_pool 

465 

466 if (cache_config or cache) and self.connection_pool.get_protocol() not in [ 

467 3, 

468 "3", 

469 ]: 

470 raise RedisError("Client caching is only supported with RESP version 3") 

471 

472 self.single_connection_lock = threading.RLock() 

473 self.connection = None 

474 self._single_connection_client = single_connection_client 

475 if self._single_connection_client: 

476 self.connection = self.connection_pool.get_connection() 

477 self._event_dispatcher.dispatch( 

478 AfterSingleConnectionInstantiationEvent( 

479 self.connection, ClientType.SYNC, self.single_connection_lock 

480 ) 

481 ) 

482 

483 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks) 

484 

485 if check_protocol_version( 

486 self.connection_pool.connection_kwargs.get( 

487 "protocol", DEFAULT_RESP_VERSION 

488 ), 

489 3, 

490 ): 

491 self.response_callbacks.update(_RedisCallbacksRESP3) 

492 else: 

493 self.response_callbacks.update(_RedisCallbacksRESP2) 

494 

495 def __repr__(self) -> str: 

496 return ( 

497 f"<{type(self).__module__}.{type(self).__name__}" 

498 f"({repr(self.connection_pool)})>" 

499 ) 

500 

501 def get_encoder(self) -> "Encoder": 

502 """Get the connection pool's encoder""" 

503 return self.connection_pool.get_encoder() 

504 

505 def get_connection_kwargs(self) -> Dict: 

506 """Get the connection's key-word arguments""" 

507 return self.connection_pool.connection_kwargs 

508 

509 def get_retry(self) -> Optional[Retry]: 

510 return self.get_connection_kwargs().get("retry") 

511 

512 def set_retry(self, retry: Retry) -> None: 

513 self.get_connection_kwargs().update({"retry": retry}) 

514 self.connection_pool.set_retry(retry) 

515 

516 def set_response_callback(self, command: str, callback: Callable) -> None: 

517 """Set a custom Response Callback""" 

518 self.response_callbacks[command] = callback 

519 

520 def load_external_module(self, funcname, func) -> None: 

521 """ 

522 This function can be used to add externally defined redis modules, 

523 and their namespaces to the redis client. 

524 

525 funcname - A string containing the name of the function to create 

526 func - The function, being added to this class. 

527 

528 ex: Assume that one has a custom redis module named foomod that 

529 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

530 To load function functions into this namespace: 

531 

532 from redis import Redis 

533 from foomodule import F 

534 r = Redis() 

535 r.load_external_module("foo", F) 

536 r.foo().dothing('your', 'arguments') 

537 

538 For a concrete example see the reimport of the redisjson module in 

539 tests/test_connection.py::test_loading_external_modules 

540 """ 

541 setattr(self, funcname, func) 

542 

543 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

544 """ 

545 Return a new pipeline object that can queue multiple commands for 

546 later execution. ``transaction`` indicates whether all commands 

547 should be executed atomically. Apart from making a group of operations 

548 atomic, pipelines are useful for reducing the back-and-forth overhead 

549 between the client and server. 

550 """ 

551 return Pipeline( 

552 self.connection_pool, self.response_callbacks, transaction, shard_hint 

553 ) 

554 

555 def transaction( 

556 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

557 ) -> Union[List[Any], Any, None]: 

558 """ 

559 Convenience method for executing the callable `func` as a transaction 

560 while watching all keys specified in `watches`. The 'func' callable 

561 should expect a single argument which is a Pipeline object. 

562 """ 

563 shard_hint = kwargs.pop("shard_hint", None) 

564 value_from_callable = kwargs.pop("value_from_callable", False) 

565 watch_delay = kwargs.pop("watch_delay", None) 

566 with self.pipeline(True, shard_hint) as pipe: 

567 while True: 

568 try: 

569 if watches: 

570 pipe.watch(*watches) 

571 func_value = func(pipe) 

572 exec_value = pipe.execute() 

573 return func_value if value_from_callable else exec_value 

574 except WatchError: 

575 if watch_delay is not None and watch_delay > 0: 

576 time.sleep(watch_delay) 

577 continue 

578 

579 def lock( 

580 self, 

581 name: str, 

582 timeout: Optional[float] = None, 

583 sleep: float = 0.1, 

584 blocking: bool = True, 

585 blocking_timeout: Optional[float] = None, 

586 lock_class: Union[None, Any] = None, 

587 thread_local: bool = True, 

588 raise_on_release_error: bool = True, 

589 ): 

590 """ 

591 Return a new Lock object using key ``name`` that mimics 

592 the behavior of threading.Lock. 

593 

594 If specified, ``timeout`` indicates a maximum life for the lock. 

595 By default, it will remain locked until release() is called. 

596 

597 ``sleep`` indicates the amount of time to sleep per loop iteration 

598 when the lock is in blocking mode and another client is currently 

599 holding the lock. 

600 

601 ``blocking`` indicates whether calling ``acquire`` should block until 

602 the lock has been acquired or to fail immediately, causing ``acquire`` 

603 to return False and the lock not being acquired. Defaults to True. 

604 Note this value can be overridden by passing a ``blocking`` 

605 argument to ``acquire``. 

606 

607 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

608 spend trying to acquire the lock. A value of ``None`` indicates 

609 continue trying forever. ``blocking_timeout`` can be specified as a 

610 float or integer, both representing the number of seconds to wait. 

611 

612 ``lock_class`` forces the specified lock implementation. Note that as 

613 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

614 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

615 you have created your own custom lock class. 

616 

617 ``thread_local`` indicates whether the lock token is placed in 

618 thread-local storage. By default, the token is placed in thread local 

619 storage so that a thread only sees its token, not a token set by 

620 another thread. Consider the following timeline: 

621 

622 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

623 thread-1 sets the token to "abc" 

624 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

625 Lock instance. 

626 time: 5, thread-1 has not yet completed. redis expires the lock 

627 key. 

628 time: 5, thread-2 acquired `my-lock` now that it's available. 

629 thread-2 sets the token to "xyz" 

630 time: 6, thread-1 finishes its work and calls release(). if the 

631 token is *not* stored in thread local storage, then 

632 thread-1 would see the token value as "xyz" and would be 

633 able to successfully release the thread-2's lock. 

634 

635 ``raise_on_release_error`` indicates whether to raise an exception when 

636 the lock is no longer owned when exiting the context manager. By default, 

637 this is True, meaning an exception will be raised. If False, the warning 

638 will be logged and the exception will be suppressed. 

639 

640 In some use cases it's necessary to disable thread local storage. For 

641 example, if you have code where one thread acquires a lock and passes 

642 that lock instance to a worker thread to release later. If thread 

643 local storage isn't disabled in this case, the worker thread won't see 

644 the token set by the thread that acquired the lock. Our assumption 

645 is that these cases aren't common and as such default to using 

646 thread local storage.""" 

647 if lock_class is None: 

648 lock_class = Lock 

649 return lock_class( 

650 self, 

651 name, 

652 timeout=timeout, 

653 sleep=sleep, 

654 blocking=blocking, 

655 blocking_timeout=blocking_timeout, 

656 thread_local=thread_local, 

657 raise_on_release_error=raise_on_release_error, 

658 ) 

659 

660 def pubsub(self, **kwargs): 

661 """ 

662 Return a Publish/Subscribe object. With this object, you can 

663 subscribe to channels and listen for messages that get published to 

664 them. 

665 """ 

666 return PubSub( 

667 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

668 ) 

669 

670 def keyspace_notifications( 

671 self, 

672 key_prefix: Union[str, bytes, None] = None, 

673 ignore_subscribe_messages: bool = True, 

674 ) -> "KeyspaceNotifications": 

675 """ 

676 Return a :class:`~redis.keyspace_notifications.KeyspaceNotifications` 

677 object for subscribing to keyspace and keyevent notifications. 

678 

679 Note: Keyspace notifications must be enabled on the Redis server via 

680 the ``notify-keyspace-events`` configuration option. 

681 

682 Args: 

683 key_prefix: Optional prefix to filter and strip from keys in 

684 notifications. 

685 ignore_subscribe_messages: If True, subscribe/unsubscribe 

686 confirmations are not returned by 

687 get_message/listen. 

688 """ 

689 from redis.keyspace_notifications import KeyspaceNotifications 

690 

691 return KeyspaceNotifications( 

692 self, 

693 key_prefix=key_prefix, 

694 ignore_subscribe_messages=ignore_subscribe_messages, 

695 ) 

696 

697 def monitor(self): 

698 return Monitor(self.connection_pool) 

699 

700 def client(self): 

701 return self.__class__( 

702 connection_pool=self.connection_pool, 

703 single_connection_client=True, 

704 ) 

705 

706 def __enter__(self): 

707 return self 

708 

709 def __exit__(self, exc_type, exc_value, traceback): 

710 self.close() 

711 

712 def __del__(self): 

713 try: 

714 self.close() 

715 except Exception: 

716 pass 

717 

718 def close(self) -> None: 

719 # In case a connection property does not yet exist 

720 # (due to a crash earlier in the Redis() constructor), return 

721 # immediately as there is nothing to clean-up. 

722 if not hasattr(self, "connection"): 

723 return 

724 

725 conn = self.connection 

726 if conn: 

727 self.connection = None 

728 self.connection_pool.release(conn) 

729 

730 if self.auto_close_connection_pool: 

731 self.connection_pool.disconnect() 

732 

733 def _send_command_parse_response(self, conn, command_name, *args, **options): 

734 """ 

735 Send a command and parse the response 

736 """ 

737 conn.send_command(*args, **options) 

738 return self.parse_response(conn, command_name, **options) 

739 

740 def _close_connection( 

741 self, 

742 conn, 

743 error: Optional[Exception] = None, 

744 failure_count: Optional[int] = None, 

745 start_time: Optional[float] = None, 

746 command_name: Optional[str] = None, 

747 ) -> None: 

748 """ 

749 Close the connection before retrying. 

750 

751 The supported exceptions are already checked in the 

752 retry object so we don't need to do it here. 

753 

754 After we disconnect the connection, it will try to reconnect and 

755 do a health check as part of the send_command logic(on connection level). 

756 """ 

757 if error and failure_count <= conn.retry.get_retries(): 

758 record_operation_duration( 

759 command_name=command_name, 

760 duration_seconds=time.monotonic() - start_time, 

761 server_address=getattr(conn, "host", None), 

762 server_port=getattr(conn, "port", None), 

763 db_namespace=str(conn.db), 

764 error=error, 

765 retry_attempts=failure_count, 

766 ) 

767 

768 conn.disconnect() 

769 

770 # COMMAND EXECUTION AND PROTOCOL PARSING 

771 def execute_command(self, *args, **options): 

772 return self._execute_command(*args, **options) 

773 

774 def _execute_command(self, *args, **options): 

775 """Execute a command and return a parsed response""" 

776 pool = self.connection_pool 

777 command_name = args[0] 

778 conn = self.connection or pool.get_connection() 

779 

780 # Start timing for observability 

781 start_time = time.monotonic() 

782 # Track actual retry attempts for error reporting 

783 actual_retry_attempts = [0] 

784 

785 def failure_callback(error, failure_count): 

786 if is_debug_log_enabled(): 

787 add_debug_log_for_operation_failure(conn) 

788 actual_retry_attempts[0] = failure_count 

789 self._close_connection(conn, error, failure_count, start_time, command_name) 

790 

791 if self._single_connection_client: 

792 self.single_connection_lock.acquire() 

793 try: 

794 result = conn.retry.call_with_retry( 

795 lambda: self._send_command_parse_response( 

796 conn, command_name, *args, **options 

797 ), 

798 failure_callback, 

799 with_failure_count=True, 

800 ) 

801 

802 record_operation_duration( 

803 command_name=command_name, 

804 duration_seconds=time.monotonic() - start_time, 

805 server_address=getattr(conn, "host", None), 

806 server_port=getattr(conn, "port", None), 

807 db_namespace=str(conn.db), 

808 ) 

809 return result 

810 except Exception as e: 

811 record_error_count( 

812 server_address=getattr(conn, "host", None), 

813 server_port=getattr(conn, "port", None), 

814 network_peer_address=getattr(conn, "host", None), 

815 network_peer_port=getattr(conn, "port", None), 

816 error_type=e, 

817 retry_attempts=actual_retry_attempts[0], 

818 is_internal=False, 

819 ) 

820 raise 

821 

822 finally: 

823 if conn and conn.should_reconnect(): 

824 self._close_connection(conn) 

825 conn.connect() 

826 if self._single_connection_client: 

827 self.single_connection_lock.release() 

828 if not self.connection: 

829 pool.release(conn) 

830 

831 def parse_response(self, connection, command_name, **options): 

832 """Parses a response from the Redis server""" 

833 try: 

834 if NEVER_DECODE in options: 

835 response = connection.read_response(disable_decoding=True) 

836 options.pop(NEVER_DECODE) 

837 else: 

838 response = connection.read_response() 

839 except ResponseError: 

840 if EMPTY_RESPONSE in options: 

841 return options[EMPTY_RESPONSE] 

842 raise 

843 

844 if EMPTY_RESPONSE in options: 

845 options.pop(EMPTY_RESPONSE) 

846 

847 # Remove keys entry, it needs only for cache. 

848 options.pop("keys", None) 

849 

850 if command_name in self.response_callbacks: 

851 return self.response_callbacks[command_name](response, **options) 

852 return response 

853 

854 def get_cache(self) -> Optional[CacheInterface]: 

855 return self.connection_pool.cache 

856 

857 

858StrictRedis = Redis 

859 

860 

861class Monitor: 

862 """ 

863 Monitor is useful for handling the MONITOR command to the redis server. 

864 next_command() method returns one command from monitor 

865 listen() method yields commands from monitor. 

866 """ 

867 

868 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)") 

869 command_re = re.compile(r'"(.*?)(?<!\\)"') 

870 

871 def __init__(self, connection_pool): 

872 self.connection_pool = connection_pool 

873 self.connection = self.connection_pool.get_connection() 

874 

875 def __enter__(self): 

876 self._start_monitor() 

877 return self 

878 

879 def __exit__(self, *args): 

880 self.connection.disconnect() 

881 self.connection_pool.release(self.connection) 

882 

883 def next_command(self): 

884 """Parse the response from a monitor command""" 

885 response = self.connection.read_response() 

886 

887 if response is None: 

888 return None 

889 

890 if isinstance(response, bytes): 

891 response = self.connection.encoder.decode(response, force=True) 

892 

893 command_time, command_data = response.split(" ", 1) 

894 m = self.monitor_re.match(command_data) 

895 db_id, client_info, command = m.groups() 

896 command = " ".join(self.command_re.findall(command)) 

897 # Redis escapes double quotes because each piece of the command 

898 # string is surrounded by double quotes. We don't have that 

899 # requirement so remove the escaping and leave the quote. 

900 command = command.replace('\\"', '"') 

901 

902 if client_info == "lua": 

903 client_address = "lua" 

904 client_port = "" 

905 client_type = "lua" 

906 elif client_info.startswith("unix"): 

907 client_address = "unix" 

908 client_port = client_info[5:] 

909 client_type = "unix" 

910 else: 

911 if client_info == "": 

912 client_address = "" 

913 client_port = "" 

914 client_type = "unknown" 

915 else: 

916 # use rsplit as ipv6 addresses contain colons 

917 client_address, client_port = client_info.rsplit(":", 1) 

918 client_type = "tcp" 

919 return { 

920 "time": float(command_time), 

921 "db": int(db_id), 

922 "client_address": client_address, 

923 "client_port": client_port, 

924 "client_type": client_type, 

925 "command": command, 

926 } 

927 

928 def listen(self): 

929 """Listen for commands coming to the server.""" 

930 while True: 

931 yield self.next_command() 

932 

933 def _start_monitor(self): 

934 self.connection.send_command("MONITOR") 

935 # check that monitor returns 'OK', but don't return it to user 

936 response = self.connection.read_response() 

937 

938 if not bool_ok(response): 

939 raise RedisError(f"MONITOR failed: {response}") 

940 

941 

942class PubSub: 

943 """ 

944 PubSub provides publish, subscribe and listen support to Redis channels. 

945 

946 After subscribing to one or more channels, the listen() method will block 

947 until a message arrives on one of the subscribed channels. That message 

948 will be returned and it's safe to start listening again. 

949 """ 

950 

951 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

952 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

953 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

954 

955 def __init__( 

956 self, 

957 connection_pool, 

958 shard_hint=None, 

959 ignore_subscribe_messages: bool = False, 

960 encoder: Optional["Encoder"] = None, 

961 push_handler_func: Union[None, Callable[[str], None]] = None, 

962 event_dispatcher: Optional["EventDispatcher"] = None, 

963 ): 

964 self.connection_pool = connection_pool 

965 self.shard_hint = shard_hint 

966 self.ignore_subscribe_messages = ignore_subscribe_messages 

967 self.connection = None 

968 self.subscribed_event = threading.Event() 

969 # we need to know the encoding options for this connection in order 

970 # to lookup channel and pattern names for callback handlers. 

971 self.encoder = encoder 

972 self.push_handler_func = push_handler_func 

973 if event_dispatcher is None: 

974 self._event_dispatcher = EventDispatcher() 

975 else: 

976 self._event_dispatcher = event_dispatcher 

977 

978 self._lock = threading.RLock() 

979 if self.encoder is None: 

980 self.encoder = self.connection_pool.get_encoder() 

981 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

982 if self.encoder.decode_responses: 

983 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

984 else: 

985 self.health_check_response = [b"pong", self.health_check_response_b] 

986 if self.push_handler_func is None: 

987 _set_info_logger() 

988 self.reset() 

989 

990 def __enter__(self) -> "PubSub": 

991 return self 

992 

993 def __exit__(self, exc_type, exc_value, traceback) -> None: 

994 self.reset() 

995 

996 def __del__(self) -> None: 

997 try: 

998 # if this object went out of scope prior to shutting down 

999 # subscriptions, close the connection manually before 

1000 # returning it to the connection pool 

1001 self.reset() 

1002 except Exception: 

1003 pass 

1004 

1005 def reset(self) -> None: 

1006 if self.connection: 

1007 self.connection.disconnect() 

1008 self.connection.deregister_connect_callback(self.on_connect) 

1009 self.connection_pool.release(self.connection) 

1010 self.connection = None 

1011 self.health_check_response_counter = 0 

1012 self.channels = {} 

1013 self.pending_unsubscribe_channels = set() 

1014 self.shard_channels = {} 

1015 self.pending_unsubscribe_shard_channels = set() 

1016 self.patterns = {} 

1017 self.pending_unsubscribe_patterns = set() 

1018 self.subscribed_event.clear() 

1019 

1020 def close(self) -> None: 

1021 self.reset() 

1022 

1023 def _resubscribe(self, subscribed, subscribe_fn) -> None: 

1024 subscriptions_without_handlers, subscriptions_with_handlers = ( 

1025 partition_pubsub_subscriptions_by_handler(subscribed, self.encoder) 

1026 ) 

1027 if subscriptions_without_handlers or subscriptions_with_handlers: 

1028 subscribe_fn(*subscriptions_without_handlers, **subscriptions_with_handlers) 

1029 

1030 def _resubscribe_shard_channels(self) -> None: 

1031 self._resubscribe(self.shard_channels, self.ssubscribe) 

1032 

1033 def on_connect(self, connection) -> None: 

1034 "Re-subscribe to any channels and patterns previously subscribed to" 

1035 self.pending_unsubscribe_channels.clear() 

1036 self.pending_unsubscribe_patterns.clear() 

1037 self.pending_unsubscribe_shard_channels.clear() 

1038 if self.channels: 

1039 self._resubscribe(self.channels, self.subscribe) 

1040 if self.patterns: 

1041 self._resubscribe(self.patterns, self.psubscribe) 

1042 if self.shard_channels: 

1043 self._resubscribe_shard_channels() 

1044 

1045 @property 

1046 def subscribed(self) -> bool: 

1047 """Indicates if there are subscriptions to any channels or patterns""" 

1048 return self.subscribed_event.is_set() 

1049 

1050 def execute_command(self, *args): 

1051 """Execute a publish/subscribe command""" 

1052 

1053 # NOTE: don't parse the response in this function -- it could pull a 

1054 # legitimate message off the stack if the connection is already 

1055 # subscribed to one or more channels 

1056 

1057 if self.connection is None: 

1058 self.connection = self.connection_pool.get_connection() 

1059 # register a callback that re-subscribes to any channels we 

1060 # were listening to when we were disconnected 

1061 self.connection.register_connect_callback(self.on_connect) 

1062 if self.push_handler_func is not None: 

1063 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

1064 self._event_dispatcher.dispatch( 

1065 AfterPubSubConnectionInstantiationEvent( 

1066 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

1067 ) 

1068 ) 

1069 connection = self.connection 

1070 kwargs = {"check_health": not self.subscribed} 

1071 if not self.subscribed: 

1072 self.clean_health_check_responses() 

1073 with self._lock: 

1074 self._execute(connection, connection.send_command, *args, **kwargs) 

1075 

1076 def clean_health_check_responses(self) -> None: 

1077 """ 

1078 If any health check responses are present, clean them 

1079 """ 

1080 ttl = 10 

1081 conn = self.connection 

1082 while conn and self.health_check_response_counter > 0 and ttl > 0: 

1083 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

1084 response = self._execute(conn, conn.read_response) 

1085 if self.is_health_check_response(response): 

1086 self.health_check_response_counter -= 1 

1087 else: 

1088 raise PubSubError( 

1089 "A non health check response was cleaned by " 

1090 "execute_command: {}".format(response) 

1091 ) 

1092 ttl -= 1 

1093 

1094 def _reconnect( 

1095 self, 

1096 conn, 

1097 error: Optional[Exception] = None, 

1098 failure_count: Optional[int] = None, 

1099 start_time: Optional[float] = None, 

1100 command_name: Optional[str] = None, 

1101 ) -> None: 

1102 """ 

1103 The supported exceptions are already checked in the 

1104 retry object so we don't need to do it here. 

1105 

1106 In this error handler we are trying to reconnect to the server. 

1107 """ 

1108 if error and failure_count <= conn.retry.get_retries(): 

1109 if command_name: 

1110 record_operation_duration( 

1111 command_name=command_name, 

1112 duration_seconds=time.monotonic() - start_time, 

1113 server_address=getattr(conn, "host", None), 

1114 server_port=getattr(conn, "port", None), 

1115 db_namespace=str(conn.db), 

1116 error=error, 

1117 retry_attempts=failure_count, 

1118 ) 

1119 conn.disconnect() 

1120 conn.connect() 

1121 

1122 def _execute(self, conn, command, *args, **kwargs): 

1123 """ 

1124 Connect manually upon disconnection. If the Redis server is down, 

1125 this will fail and raise a ConnectionError as desired. 

1126 After reconnection, the ``on_connect`` callback should have been 

1127 called by the # connection to resubscribe us to any channels and 

1128 patterns we were previously listening to 

1129 """ 

1130 

1131 if conn.should_reconnect(): 

1132 self._reconnect(conn) 

1133 

1134 if not len(args) == 0: 

1135 command_name = args[0] 

1136 else: 

1137 command_name = None 

1138 

1139 # Start timing for observability 

1140 start_time = time.monotonic() 

1141 # Track actual retry attempts for error reporting 

1142 actual_retry_attempts = [0] 

1143 

1144 def failure_callback(error, failure_count): 

1145 actual_retry_attempts[0] = failure_count 

1146 self._reconnect(conn, error, failure_count, start_time, command_name) 

1147 

1148 try: 

1149 response = conn.retry.call_with_retry( 

1150 lambda: command(*args, **kwargs), 

1151 failure_callback, 

1152 with_failure_count=True, 

1153 ) 

1154 

1155 if command_name: 

1156 record_operation_duration( 

1157 command_name=command_name, 

1158 duration_seconds=time.monotonic() - start_time, 

1159 server_address=getattr(conn, "host", None), 

1160 server_port=getattr(conn, "port", None), 

1161 db_namespace=str(conn.db), 

1162 ) 

1163 

1164 return response 

1165 except Exception as e: 

1166 record_error_count( 

1167 server_address=getattr(conn, "host", None), 

1168 server_port=getattr(conn, "port", None), 

1169 network_peer_address=getattr(conn, "host", None), 

1170 network_peer_port=getattr(conn, "port", None), 

1171 error_type=e, 

1172 retry_attempts=actual_retry_attempts[0], 

1173 is_internal=False, 

1174 ) 

1175 raise 

1176 

1177 def parse_response(self, block=True, timeout=0): 

1178 """ 

1179 Parse the response from a publish/subscribe command. 

1180 

1181 Args: 

1182 block: If True, block indefinitely until a message is available. 

1183 If False, return immediately if no message is available. 

1184 Default: True 

1185 timeout: The timeout in seconds for reading a response when block=False. 

1186 This parameter is ignored when block=True. 

1187 Default: 0 (return immediately if no data available) 

1188 

1189 Returns: 

1190 The parsed response from the server, or None if no message is available 

1191 within the timeout period (when block=False). 

1192 

1193 Important: 

1194 The block and timeout parameters work together: 

1195 - When block=True: timeout is IGNORED, method blocks indefinitely 

1196 - When block=False: timeout is USED, method returns after timeout expires 

1197 

1198 Typically, you should use get_message(timeout=X) instead of calling 

1199 parse_response() directly. The get_message() method automatically sets 

1200 block=False when a timeout is provided, and block=True when timeout=None. 

1201 

1202 Example: 

1203 # Block indefinitely (timeout is ignored) 

1204 response = pubsub.parse_response(block=True, timeout=0.1) 

1205 

1206 # Non-blocking with 0.1 second timeout 

1207 response = pubsub.parse_response(block=False, timeout=0.1) 

1208 

1209 # Non-blocking, return immediately 

1210 response = pubsub.parse_response(block=False, timeout=0) 

1211 

1212 # Recommended: use get_message() instead 

1213 msg = pubsub.get_message(timeout=0.1) # automatically sets block=False 

1214 msg = pubsub.get_message(timeout=None) # automatically sets block=True 

1215 """ 

1216 conn = self.connection 

1217 if conn is None: 

1218 raise RuntimeError( 

1219 "pubsub connection not set: " 

1220 "did you forget to call subscribe() or psubscribe()?" 

1221 ) 

1222 

1223 self.check_health() 

1224 

1225 def try_read(): 

1226 if not block: 

1227 if not conn.can_read(timeout=timeout): 

1228 return None 

1229 read_timeout = timeout 

1230 else: 

1231 conn.connect() 

1232 read_timeout = SENTINEL # Use default socket timeout for blocking 

1233 return conn.read_response( 

1234 disconnect_on_error=False, push_request=True, timeout=read_timeout 

1235 ) 

1236 

1237 response = self._execute(conn, try_read) 

1238 

1239 if self.is_health_check_response(response): 

1240 # ignore the health check message as user might not expect it 

1241 self.health_check_response_counter -= 1 

1242 return None 

1243 return response 

1244 

1245 def is_health_check_response(self, response) -> bool: 

1246 """ 

1247 Check if the response is a health check response. 

1248 If there are no subscriptions redis responds to PING command with a 

1249 bulk response, instead of a multi-bulk with "pong" and the response. 

1250 """ 

1251 if self.encoder.decode_responses: 

1252 return ( 

1253 response 

1254 in [ 

1255 self.health_check_response, # If there is a subscription 

1256 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True 

1257 ] 

1258 ) 

1259 else: 

1260 return ( 

1261 response 

1262 in [ 

1263 self.health_check_response, # If there is a subscription 

1264 self.health_check_response_b, # If there isn't a subscription and decode_responses=False 

1265 ] 

1266 ) 

1267 

1268 def check_health(self) -> None: 

1269 conn = self.connection 

1270 if conn is None: 

1271 raise RuntimeError( 

1272 "pubsub connection not set: " 

1273 "did you forget to call subscribe() or psubscribe()?" 

1274 ) 

1275 

1276 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1277 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1278 self.health_check_response_counter += 1 

1279 

1280 def _normalize_keys(self, data) -> Dict: 

1281 """ 

1282 normalize channel/pattern names to be either bytes or strings 

1283 based on whether responses are automatically decoded. this saves us 

1284 from coercing the value for each message coming in. 

1285 """ 

1286 encode = self.encoder.encode 

1287 decode = self.encoder.decode 

1288 return {decode(encode(k)): v for k, v in data.items()} 

1289 

1290 def psubscribe(self, *args, **kwargs): 

1291 """ 

1292 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1293 expect a pattern name as the key and a callable as the value. A 

1294 pattern's callable will be invoked automatically when a message is 

1295 received on that pattern rather than producing a message via 

1296 ``listen()``. 

1297 """ 

1298 if args: 

1299 args = list_or_args(args[0], args[1:]) 

1300 new_patterns = dict.fromkeys(args) 

1301 new_patterns.update(kwargs) 

1302 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1303 # update the patterns dict AFTER we send the command. we don't want to 

1304 # subscribe twice to these patterns, once for the command and again 

1305 # for the reconnection. 

1306 new_patterns = self._normalize_keys(new_patterns) 

1307 self.patterns.update(new_patterns) 

1308 if not self.subscribed: 

1309 # Set the subscribed_event flag to True 

1310 self.subscribed_event.set() 

1311 # Clear the health check counter 

1312 self.health_check_response_counter = 0 

1313 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1314 return ret_val 

1315 

1316 def punsubscribe(self, *args): 

1317 """ 

1318 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1319 all patterns. 

1320 """ 

1321 if args: 

1322 args = list_or_args(args[0], args[1:]) 

1323 patterns = self._normalize_keys(dict.fromkeys(args)) 

1324 else: 

1325 patterns = self.patterns 

1326 self.pending_unsubscribe_patterns.update(patterns) 

1327 return self.execute_command("PUNSUBSCRIBE", *args) 

1328 

1329 def subscribe(self, *args, **kwargs): 

1330 """ 

1331 Subscribe to channels. Channels supplied as keyword arguments expect 

1332 a channel name as the key and a callable as the value. A channel's 

1333 callable will be invoked automatically when a message is received on 

1334 that channel rather than producing a message via ``listen()`` or 

1335 ``get_message()``. 

1336 """ 

1337 if args: 

1338 args = list_or_args(args[0], args[1:]) 

1339 new_channels = dict.fromkeys(args) 

1340 new_channels.update(kwargs) 

1341 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1342 # update the channels dict AFTER we send the command. we don't want to 

1343 # subscribe twice to these channels, once for the command and again 

1344 # for the reconnection. 

1345 new_channels = self._normalize_keys(new_channels) 

1346 self.channels.update(new_channels) 

1347 if not self.subscribed: 

1348 # Set the subscribed_event flag to True 

1349 self.subscribed_event.set() 

1350 # Clear the health check counter 

1351 self.health_check_response_counter = 0 

1352 self.pending_unsubscribe_channels.difference_update(new_channels) 

1353 return ret_val 

1354 

1355 def unsubscribe(self, *args): 

1356 """ 

1357 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1358 all channels 

1359 """ 

1360 if args: 

1361 args = list_or_args(args[0], args[1:]) 

1362 channels = self._normalize_keys(dict.fromkeys(args)) 

1363 else: 

1364 channels = self.channels 

1365 self.pending_unsubscribe_channels.update(channels) 

1366 return self.execute_command("UNSUBSCRIBE", *args) 

1367 

1368 def ssubscribe(self, *args, target_node=None, **kwargs): 

1369 """ 

1370 Subscribes the client to the specified shard channels. 

1371 Channels supplied as keyword arguments expect a channel name as the key 

1372 and a callable as the value. A channel's callable will be invoked automatically 

1373 when a message is received on that channel rather than producing a message via 

1374 ``listen()`` or ``get_sharded_message()``. 

1375 """ 

1376 if args: 

1377 args = list_or_args(args[0], args[1:]) 

1378 new_s_channels = dict.fromkeys(args) 

1379 new_s_channels.update(kwargs) 

1380 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1381 # update the s_channels dict AFTER we send the command. we don't want to 

1382 # subscribe twice to these channels, once for the command and again 

1383 # for the reconnection. 

1384 new_s_channels = self._normalize_keys(new_s_channels) 

1385 self.shard_channels.update(new_s_channels) 

1386 if not self.subscribed: 

1387 # Set the subscribed_event flag to True 

1388 self.subscribed_event.set() 

1389 # Clear the health check counter 

1390 self.health_check_response_counter = 0 

1391 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1392 return ret_val 

1393 

1394 def sunsubscribe(self, *args, target_node=None): 

1395 """ 

1396 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1397 all shard_channels 

1398 """ 

1399 if args: 

1400 args = list_or_args(args[0], args[1:]) 

1401 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1402 else: 

1403 s_channels = self.shard_channels 

1404 self.pending_unsubscribe_shard_channels.update(s_channels) 

1405 return self.execute_command("SUNSUBSCRIBE", *args) 

1406 

1407 def listen(self): 

1408 "Listen for messages on channels this client has been subscribed to" 

1409 while self.subscribed: 

1410 response = self.handle_message(self.parse_response(block=True)) 

1411 if response is not None: 

1412 yield response 

1413 

1414 def get_message( 

1415 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1416 ): 

1417 """ 

1418 Get the next message if one is available, otherwise None. 

1419 

1420 If timeout is specified, the system will wait for `timeout` seconds 

1421 before returning. Timeout should be specified as a floating point 

1422 number, or None, to wait indefinitely. 

1423 """ 

1424 if not self.subscribed: 

1425 # Wait for subscription 

1426 start_time = time.monotonic() 

1427 if self.subscribed_event.wait(timeout) is True: 

1428 # The connection was subscribed during the timeout time frame. 

1429 # The timeout should be adjusted based on the time spent 

1430 # waiting for the subscription 

1431 time_spent = time.monotonic() - start_time 

1432 timeout = max(0.0, timeout - time_spent) 

1433 else: 

1434 # The connection isn't subscribed to any channels or patterns, 

1435 # so no messages are available 

1436 return None 

1437 

1438 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1439 

1440 if response: 

1441 return self.handle_message(response, ignore_subscribe_messages) 

1442 return None 

1443 

1444 get_sharded_message = get_message 

1445 

1446 def ping(self, message: Union[str, None] = None) -> bool: 

1447 """ 

1448 Ping the Redis server to test connectivity. 

1449 

1450 Sends a PING command to the Redis server and returns True if the server 

1451 responds with "PONG". 

1452 """ 

1453 args = ["PING", message] if message is not None else ["PING"] 

1454 return self.execute_command(*args) 

1455 

1456 def handle_message(self, response, ignore_subscribe_messages=False): 

1457 """ 

1458 Parses a pub/sub message. If the channel or pattern was subscribed to 

1459 with a message handler, the handler is invoked instead of a parsed 

1460 message being returned. 

1461 """ 

1462 if response is None: 

1463 return None 

1464 if isinstance(response, bytes): 

1465 response = [b"pong", response] if response != b"PONG" else [b"pong", b""] 

1466 

1467 message_type = str_if_bytes(response[0]) 

1468 if message_type == "pmessage": 

1469 message = { 

1470 "type": message_type, 

1471 "pattern": response[1], 

1472 "channel": response[2], 

1473 "data": response[3], 

1474 } 

1475 elif message_type == "pong": 

1476 message = { 

1477 "type": message_type, 

1478 "pattern": None, 

1479 "channel": None, 

1480 "data": response[1], 

1481 } 

1482 else: 

1483 message = { 

1484 "type": message_type, 

1485 "pattern": None, 

1486 "channel": response[1], 

1487 "data": response[2], 

1488 } 

1489 

1490 if message_type in ["message", "pmessage"]: 

1491 channel = str_if_bytes(message["channel"]) 

1492 record_pubsub_message( 

1493 direction=PubSubDirection.RECEIVE, 

1494 channel=channel, 

1495 ) 

1496 elif message_type == "smessage": 

1497 channel = str_if_bytes(message["channel"]) 

1498 record_pubsub_message( 

1499 direction=PubSubDirection.RECEIVE, 

1500 channel=channel, 

1501 sharded=True, 

1502 ) 

1503 

1504 # if this is an unsubscribe message, remove it from memory 

1505 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1506 if message_type == "punsubscribe": 

1507 pattern = response[1] 

1508 if pattern in self.pending_unsubscribe_patterns: 

1509 self.pending_unsubscribe_patterns.remove(pattern) 

1510 self.patterns.pop(pattern, None) 

1511 elif message_type == "sunsubscribe": 

1512 s_channel = response[1] 

1513 if s_channel in self.pending_unsubscribe_shard_channels: 

1514 self.pending_unsubscribe_shard_channels.remove(s_channel) 

1515 self.shard_channels.pop(s_channel, None) 

1516 else: 

1517 channel = response[1] 

1518 if channel in self.pending_unsubscribe_channels: 

1519 self.pending_unsubscribe_channels.remove(channel) 

1520 self.channels.pop(channel, None) 

1521 if not self.channels and not self.patterns and not self.shard_channels: 

1522 # There are no subscriptions anymore, set subscribed_event flag 

1523 # to false 

1524 self.subscribed_event.clear() 

1525 

1526 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1527 # if there's a message handler, invoke it 

1528 if message_type == "pmessage": 

1529 handler = self.patterns.get(message["pattern"], None) 

1530 elif message_type == "smessage": 

1531 handler = self.shard_channels.get(message["channel"], None) 

1532 else: 

1533 handler = self.channels.get(message["channel"], None) 

1534 if handler: 

1535 handler(message) 

1536 return None 

1537 elif message_type != "pong": 

1538 # this is a subscribe/unsubscribe message. ignore if we don't 

1539 # want them 

1540 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1541 return None 

1542 

1543 return message 

1544 

1545 def run_in_thread( 

1546 self, 

1547 sleep_time: float = 0.0, 

1548 daemon: bool = False, 

1549 exception_handler: Optional[Callable] = None, 

1550 pubsub=None, 

1551 sharded_pubsub: bool = False, 

1552 ) -> "PubSubWorkerThread": 

1553 for channel, handler in self.channels.items(): 

1554 if handler is None: 

1555 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1556 for pattern, handler in self.patterns.items(): 

1557 if handler is None: 

1558 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1559 for s_channel, handler in self.shard_channels.items(): 

1560 if handler is None: 

1561 raise PubSubError( 

1562 f"Shard Channel: '{s_channel}' has no handler registered" 

1563 ) 

1564 

1565 pubsub = self if pubsub is None else pubsub 

1566 thread = PubSubWorkerThread( 

1567 pubsub, 

1568 sleep_time, 

1569 daemon=daemon, 

1570 exception_handler=exception_handler, 

1571 sharded_pubsub=sharded_pubsub, 

1572 ) 

1573 thread.start() 

1574 return thread 

1575 

1576 

1577class PubSubWorkerThread(threading.Thread): 

1578 def __init__( 

1579 self, 

1580 pubsub, 

1581 sleep_time: float, 

1582 daemon: bool = False, 

1583 exception_handler: Union[ 

1584 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None 

1585 ] = None, 

1586 sharded_pubsub: bool = False, 

1587 ): 

1588 super().__init__() 

1589 self.daemon = daemon 

1590 self.pubsub = pubsub 

1591 self.sleep_time = sleep_time 

1592 self.exception_handler = exception_handler 

1593 self.sharded_pubsub = sharded_pubsub 

1594 self._running = threading.Event() 

1595 

1596 def run(self) -> None: 

1597 if self._running.is_set(): 

1598 return 

1599 self._running.set() 

1600 pubsub = self.pubsub 

1601 sleep_time = self.sleep_time 

1602 while self._running.is_set(): 

1603 try: 

1604 if not self.sharded_pubsub: 

1605 pubsub.get_message( 

1606 ignore_subscribe_messages=True, timeout=sleep_time 

1607 ) 

1608 else: 

1609 pubsub.get_sharded_message( 

1610 ignore_subscribe_messages=True, timeout=sleep_time 

1611 ) 

1612 except BaseException as e: 

1613 if self.exception_handler is None: 

1614 raise 

1615 self.exception_handler(e, pubsub, self) 

1616 pubsub.close() 

1617 

1618 def stop(self) -> None: 

1619 # trip the flag so the run loop exits. the run loop will 

1620 # close the pubsub connection, which disconnects the socket 

1621 # and returns the connection to the pool. 

1622 self._running.clear() 

1623 

1624 

1625class Pipeline(Redis): 

1626 """ 

1627 Pipelines provide a way to transmit multiple commands to the Redis server 

1628 in one transmission. This is convenient for batch processing, such as 

1629 saving all the values in a list to Redis. 

1630 

1631 All commands executed within a pipeline(when running in transactional mode, 

1632 which is the default behavior) are wrapped with MULTI and EXEC 

1633 calls. This guarantees all commands executed in the pipeline will be 

1634 executed atomically. 

1635 

1636 Any command raising an exception does *not* halt the execution of 

1637 subsequent commands in the pipeline. Instead, the exception is caught 

1638 and its instance is placed into the response list returned by execute(). 

1639 Code iterating over the response list should be able to deal with an 

1640 instance of an exception as a potential value. In general, these will be 

1641 ResponseError exceptions, such as those raised when issuing a command 

1642 on a key of a different datatype. 

1643 """ 

1644 

1645 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1646 

1647 def __init__( 

1648 self, 

1649 connection_pool: ConnectionPool, 

1650 response_callbacks, 

1651 transaction, 

1652 shard_hint, 

1653 ): 

1654 self.connection_pool = connection_pool 

1655 self.connection: Optional[Connection] = None 

1656 self.response_callbacks = response_callbacks 

1657 self.transaction = transaction 

1658 self.shard_hint = shard_hint 

1659 self.watching = False 

1660 self.command_stack = [] 

1661 self.scripts: Set[Script] = set() 

1662 self.explicit_transaction = False 

1663 

1664 def __enter__(self) -> "Pipeline": 

1665 return self 

1666 

1667 def __exit__(self, exc_type, exc_value, traceback): 

1668 self.reset() 

1669 

1670 def __del__(self): 

1671 try: 

1672 self.reset() 

1673 except Exception: 

1674 pass 

1675 

1676 def __len__(self) -> int: 

1677 return len(self.command_stack) 

1678 

1679 def __bool__(self) -> bool: 

1680 """Pipeline instances should always evaluate to True""" 

1681 return True 

1682 

1683 def reset(self) -> None: 

1684 self.command_stack = [] 

1685 self.scripts = set() 

1686 # make sure to reset the connection state in the event that we were 

1687 # watching something 

1688 if self.watching and self.connection: 

1689 try: 

1690 # call this manually since our unwatch or 

1691 # immediate_execute_command methods can call reset() 

1692 self.connection.send_command("UNWATCH") 

1693 self.connection.read_response() 

1694 except ConnectionError: 

1695 # disconnect will also remove any previous WATCHes 

1696 self.connection.disconnect() 

1697 # clean up the other instance attributes 

1698 self.watching = False 

1699 self.explicit_transaction = False 

1700 

1701 # we can safely return the connection to the pool here since we're 

1702 # sure we're no longer WATCHing anything 

1703 if self.connection: 

1704 self.connection_pool.release(self.connection) 

1705 self.connection = None 

1706 

1707 def close(self) -> None: 

1708 """Close the pipeline""" 

1709 self.reset() 

1710 

1711 def multi(self) -> None: 

1712 """ 

1713 Start a transactional block of the pipeline after WATCH commands 

1714 are issued. End the transactional block with `execute`. 

1715 """ 

1716 if self.explicit_transaction: 

1717 raise RedisError("Cannot issue nested calls to MULTI") 

1718 if self.command_stack: 

1719 raise RedisError( 

1720 "Commands without an initial WATCH have already been issued" 

1721 ) 

1722 self.explicit_transaction = True 

1723 

1724 def execute_command(self, *args, **kwargs): 

1725 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1726 return self.immediate_execute_command(*args, **kwargs) 

1727 return self.pipeline_execute_command(*args, **kwargs) 

1728 

1729 def _disconnect_reset_raise_on_watching( 

1730 self, 

1731 conn: AbstractConnection, 

1732 error: Exception, 

1733 failure_count: Optional[int] = None, 

1734 start_time: Optional[float] = None, 

1735 command_name: Optional[str] = None, 

1736 ) -> None: 

1737 """ 

1738 Close the connection reset watching state and 

1739 raise an exception if we were watching. 

1740 

1741 The supported exceptions are already checked in the 

1742 retry object so we don't need to do it here. 

1743 

1744 After we disconnect the connection, it will try to reconnect and 

1745 do a health check as part of the send_command logic(on connection level). 

1746 """ 

1747 if error and failure_count <= conn.retry.get_retries(): 

1748 record_operation_duration( 

1749 command_name=command_name, 

1750 duration_seconds=time.monotonic() - start_time, 

1751 server_address=getattr(conn, "host", None), 

1752 server_port=getattr(conn, "port", None), 

1753 db_namespace=str(conn.db), 

1754 error=error, 

1755 retry_attempts=failure_count, 

1756 ) 

1757 conn.disconnect() 

1758 

1759 # if we were already watching a variable, the watch is no longer 

1760 # valid since this connection has died. raise a WatchError, which 

1761 # indicates the user should retry this transaction. 

1762 if self.watching: 

1763 self.reset() 

1764 raise WatchError( 

1765 f"A {type(error).__name__} occurred while watching one or more keys" 

1766 ) 

1767 

1768 def immediate_execute_command(self, *args, **options): 

1769 """ 

1770 Execute a command immediately, but don't auto-retry on the supported 

1771 errors for retry if we're already WATCHing a variable. 

1772 Used when issuing WATCH or subsequent commands retrieving their values but before 

1773 MULTI is called. 

1774 """ 

1775 command_name = args[0] 

1776 conn = self.connection 

1777 # if this is the first call, we need a connection 

1778 if not conn: 

1779 conn = self.connection_pool.get_connection() 

1780 self.connection = conn 

1781 

1782 # Start timing for observability 

1783 start_time = time.monotonic() 

1784 # Track actual retry attempts for error reporting 

1785 actual_retry_attempts = [0] 

1786 

1787 def failure_callback(error, failure_count): 

1788 if is_debug_log_enabled(): 

1789 add_debug_log_for_operation_failure(conn) 

1790 actual_retry_attempts[0] = failure_count 

1791 self._disconnect_reset_raise_on_watching( 

1792 conn, error, failure_count, start_time, command_name 

1793 ) 

1794 

1795 try: 

1796 response = conn.retry.call_with_retry( 

1797 lambda: self._send_command_parse_response( 

1798 conn, command_name, *args, **options 

1799 ), 

1800 failure_callback, 

1801 with_failure_count=True, 

1802 ) 

1803 

1804 record_operation_duration( 

1805 command_name=command_name, 

1806 duration_seconds=time.monotonic() - start_time, 

1807 server_address=getattr(conn, "host", None), 

1808 server_port=getattr(conn, "port", None), 

1809 db_namespace=str(conn.db), 

1810 ) 

1811 

1812 return response 

1813 except Exception as e: 

1814 record_error_count( 

1815 server_address=getattr(conn, "host", None), 

1816 server_port=getattr(conn, "port", None), 

1817 network_peer_address=getattr(conn, "host", None), 

1818 network_peer_port=getattr(conn, "port", None), 

1819 error_type=e, 

1820 retry_attempts=actual_retry_attempts[0], 

1821 is_internal=False, 

1822 ) 

1823 raise 

1824 

1825 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1826 """ 

1827 Stage a command to be executed when execute() is next called 

1828 

1829 Returns the current Pipeline object back so commands can be 

1830 chained together, such as: 

1831 

1832 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1833 

1834 At some other point, you can then run: pipe.execute(), 

1835 which will execute all commands queued in the pipe. 

1836 """ 

1837 self.command_stack.append((args, options)) 

1838 return self 

1839 

1840 def _execute_transaction( 

1841 self, connection: Connection, commands, raise_on_error 

1842 ) -> List: 

1843 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1844 all_cmds = connection.pack_commands( 

1845 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1846 ) 

1847 connection.send_packed_command(all_cmds) 

1848 errors = [] 

1849 

1850 # parse off the response for MULTI 

1851 # NOTE: we need to handle ResponseErrors here and continue 

1852 # so that we read all the additional command messages from 

1853 # the socket 

1854 try: 

1855 self.parse_response(connection, "_") 

1856 except ResponseError as e: 

1857 errors.append((0, e)) 

1858 

1859 # and all the other commands 

1860 for i, command in enumerate(commands): 

1861 if EMPTY_RESPONSE in command[1]: 

1862 errors.append((i, command[1][EMPTY_RESPONSE])) 

1863 else: 

1864 try: 

1865 self.parse_response(connection, "_") 

1866 except ResponseError as e: 

1867 self.annotate_exception(e, i + 1, command[0]) 

1868 errors.append((i, e)) 

1869 

1870 # parse the EXEC. 

1871 try: 

1872 response = self.parse_response(connection, "_") 

1873 except ExecAbortError: 

1874 if errors: 

1875 raise errors[0][1] 

1876 raise 

1877 

1878 # EXEC clears any watched keys 

1879 self.watching = False 

1880 

1881 if response is None: 

1882 raise WatchError("Watched variable changed.") 

1883 

1884 # put any parse errors into the response 

1885 for i, e in errors: 

1886 response.insert(i, e) 

1887 

1888 if len(response) != len(commands): 

1889 self.connection.disconnect() 

1890 raise ResponseError( 

1891 "Wrong number of response items from pipeline execution" 

1892 ) 

1893 

1894 # find any errors in the response and raise if necessary 

1895 if raise_on_error: 

1896 self.raise_first_error(commands, response) 

1897 

1898 # We have to run response callbacks manually 

1899 data = [] 

1900 for r, cmd in zip(response, commands): 

1901 if not isinstance(r, Exception): 

1902 args, options = cmd 

1903 # Remove keys entry, it needs only for cache. 

1904 options.pop("keys", None) 

1905 command_name = args[0] 

1906 if command_name in self.response_callbacks: 

1907 r = self.response_callbacks[command_name](r, **options) 

1908 data.append(r) 

1909 

1910 return data 

1911 

1912 def _execute_pipeline(self, connection, commands, raise_on_error): 

1913 # build up all commands into a single request to increase network perf 

1914 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1915 connection.send_packed_command(all_cmds) 

1916 

1917 responses = [] 

1918 for args, options in commands: 

1919 try: 

1920 responses.append(self.parse_response(connection, args[0], **options)) 

1921 except ResponseError as e: 

1922 responses.append(e) 

1923 

1924 if raise_on_error: 

1925 self.raise_first_error(commands, responses) 

1926 

1927 return responses 

1928 

1929 def raise_first_error(self, commands, response): 

1930 for i, r in enumerate(response): 

1931 if isinstance(r, ResponseError): 

1932 self.annotate_exception(r, i + 1, commands[i][0]) 

1933 raise r 

1934 

1935 def annotate_exception(self, exception, number, command): 

1936 cmd = " ".join(map(safe_str, command)) 

1937 msg = ( 

1938 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1939 f"caused error: {exception.args[0]}" 

1940 ) 

1941 exception.args = (msg,) + exception.args[1:] 

1942 

1943 def parse_response(self, connection, command_name, **options): 

1944 result = Redis.parse_response(self, connection, command_name, **options) 

1945 if command_name in self.UNWATCH_COMMANDS: 

1946 self.watching = False 

1947 elif command_name == "WATCH": 

1948 self.watching = True 

1949 return result 

1950 

1951 def load_scripts(self): 

1952 # make sure all scripts that are about to be run on this pipeline exist 

1953 scripts = list(self.scripts) 

1954 immediate = self.immediate_execute_command 

1955 shas = [s.sha for s in scripts] 

1956 # we can't use the normal script_* methods because they would just 

1957 # get buffered in the pipeline. 

1958 exists = immediate("SCRIPT EXISTS", *shas) 

1959 if not all(exists): 

1960 for s, exist in zip(scripts, exists): 

1961 if not exist: 

1962 s.sha = immediate("SCRIPT LOAD", s.script) 

1963 

1964 def _disconnect_raise_on_watching( 

1965 self, 

1966 conn: AbstractConnection, 

1967 error: Exception, 

1968 failure_count: Optional[int] = None, 

1969 start_time: Optional[float] = None, 

1970 command_name: Optional[str] = None, 

1971 ) -> None: 

1972 """ 

1973 Close the connection, raise an exception if we were watching. 

1974 

1975 The supported exceptions are already checked in the 

1976 retry object so we don't need to do it here. 

1977 

1978 After we disconnect the connection, it will try to reconnect and 

1979 do a health check as part of the send_command logic(on connection level). 

1980 """ 

1981 if error and failure_count <= conn.retry.get_retries(): 

1982 record_operation_duration( 

1983 command_name=command_name, 

1984 duration_seconds=time.monotonic() - start_time, 

1985 server_address=getattr(conn, "host", None), 

1986 server_port=getattr(conn, "port", None), 

1987 db_namespace=str(conn.db), 

1988 error=error, 

1989 retry_attempts=failure_count, 

1990 ) 

1991 conn.disconnect() 

1992 # if we were watching a variable, the watch is no longer valid 

1993 # since this connection has died. raise a WatchError, which 

1994 # indicates the user should retry this transaction. 

1995 if self.watching: 

1996 raise WatchError( 

1997 f"A {type(error).__name__} occurred while watching one or more keys" 

1998 ) 

1999 

2000 def execute(self, raise_on_error: bool = True) -> List[Any]: 

2001 """Execute all the commands in the current pipeline""" 

2002 stack = self.command_stack 

2003 if not stack and not self.watching: 

2004 return [] 

2005 if self.scripts: 

2006 self.load_scripts() 

2007 if self.transaction or self.explicit_transaction: 

2008 execute = self._execute_transaction 

2009 operation_name = "MULTI" 

2010 else: 

2011 execute = self._execute_pipeline 

2012 operation_name = "PIPELINE" 

2013 

2014 conn = self.connection 

2015 if not conn: 

2016 conn = self.connection_pool.get_connection() 

2017 # assign to self.connection so reset() releases the connection 

2018 # back to the pool after we're done 

2019 self.connection = conn 

2020 

2021 # Start timing for observability 

2022 start_time = time.monotonic() 

2023 # Track actual retry attempts for error reporting 

2024 actual_retry_attempts = [0] 

2025 

2026 def failure_callback(error, failure_count): 

2027 if is_debug_log_enabled(): 

2028 add_debug_log_for_operation_failure(conn) 

2029 actual_retry_attempts[0] = failure_count 

2030 self._disconnect_raise_on_watching( 

2031 conn, error, failure_count, start_time, operation_name 

2032 ) 

2033 

2034 try: 

2035 response = conn.retry.call_with_retry( 

2036 lambda: execute(conn, stack, raise_on_error), 

2037 failure_callback, 

2038 with_failure_count=True, 

2039 ) 

2040 

2041 record_operation_duration( 

2042 command_name=operation_name, 

2043 duration_seconds=time.monotonic() - start_time, 

2044 server_address=getattr(conn, "host", None), 

2045 server_port=getattr(conn, "port", None), 

2046 db_namespace=str(conn.db), 

2047 ) 

2048 return response 

2049 except Exception as e: 

2050 record_error_count( 

2051 server_address=getattr(conn, "host", None), 

2052 server_port=getattr(conn, "port", None), 

2053 network_peer_address=getattr(conn, "host", None), 

2054 network_peer_port=getattr(conn, "port", None), 

2055 error_type=e, 

2056 retry_attempts=actual_retry_attempts[0], 

2057 is_internal=False, 

2058 ) 

2059 raise 

2060 

2061 finally: 

2062 # in reset() the connection is disconnected before returned to the pool if 

2063 # it is marked for reconnect. 

2064 self.reset() 

2065 

2066 def discard(self): 

2067 """ 

2068 Flushes all previously queued commands 

2069 See: https://redis.io/commands/DISCARD 

2070 """ 

2071 self.execute_command("DISCARD") 

2072 

2073 def watch(self, *names): 

2074 """Watches the values at keys ``names``""" 

2075 if self.explicit_transaction: 

2076 raise RedisError("Cannot issue a WATCH after a MULTI") 

2077 return self.execute_command("WATCH", *names) 

2078 

2079 def unwatch(self) -> bool: 

2080 """Unwatches all previously specified keys""" 

2081 return self.watching and self.execute_command("UNWATCH") or True