Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

808 statements  

1import copy 

2import logging 

3import re 

4import threading 

5import time 

6from itertools import chain 

7from typing import ( 

8 TYPE_CHECKING, 

9 Any, 

10 Callable, 

11 Dict, 

12 List, 

13 Mapping, 

14 Optional, 

15 Set, 

16 Type, 

17 Union, 

18) 

19 

20from redis._parsers.encoders import Encoder 

21from redis._parsers.helpers import ( 

22 _RedisCallbacks, 

23 _RedisCallbacksRESP2, 

24 _RedisCallbacksRESP3, 

25 bool_ok, 

26) 

27from redis.backoff import ExponentialWithJitterBackoff 

28from redis.cache import CacheConfig, CacheInterface 

29from redis.commands import ( 

30 CoreCommands, 

31 RedisModuleCommands, 

32 SentinelCommands, 

33 list_or_args, 

34) 

35from redis.commands.core import Script 

36from redis.connection import ( 

37 AbstractConnection, 

38 Connection, 

39 ConnectionPool, 

40 SSLConnection, 

41 UnixDomainSocketConnection, 

42) 

43from redis.credentials import CredentialProvider 

44from redis.driver_info import DriverInfo, resolve_driver_info 

45from redis.event import ( 

46 AfterPooledConnectionsInstantiationEvent, 

47 AfterPubSubConnectionInstantiationEvent, 

48 AfterSingleConnectionInstantiationEvent, 

49 ClientType, 

50 EventDispatcher, 

51) 

52from redis.exceptions import ( 

53 ConnectionError, 

54 ExecAbortError, 

55 PubSubError, 

56 RedisError, 

57 ResponseError, 

58 WatchError, 

59) 

60from redis.lock import Lock 

61from redis.maint_notifications import ( 

62 MaintNotificationsConfig, 

63 OSSMaintNotificationsHandler, 

64) 

65from redis.observability.attributes import PubSubDirection 

66from redis.observability.recorder import ( 

67 record_error_count, 

68 record_operation_duration, 

69 record_pubsub_message, 

70) 

71from redis.retry import Retry 

72from redis.utils import ( 

73 _set_info_logger, 

74 check_protocol_version, 

75 deprecated_args, 

76 safe_str, 

77 str_if_bytes, 

78 truncate_text, 

79) 

80 

81if TYPE_CHECKING: 

82 import ssl 

83 

84 import OpenSSL 

85 

86SYM_EMPTY = b"" 

87EMPTY_RESPONSE = "EMPTY_RESPONSE" 

88 

89# some responses (ie. dump) are binary, and just meant to never be decoded 

90NEVER_DECODE = "NEVER_DECODE" 

91 

92 

93logger = logging.getLogger(__name__) 

94 

95 

96def is_debug_log_enabled(): 

97 return logger.isEnabledFor(logging.DEBUG) 

98 

99 

100def add_debug_log_for_operation_failure(connection: "AbstractConnection"): 

101 logger.debug( 

102 f"Operation failed, " 

103 f"with connection: {connection}, details: {connection.extract_connection_details() if connection else 'no connection'}", 

104 ) 

105 

106 

107class CaseInsensitiveDict(dict): 

108 "Case insensitive dict implementation. Assumes string keys only." 

109 

110 def __init__(self, data: Dict[str, str]) -> None: 

111 for k, v in data.items(): 

112 self[k.upper()] = v 

113 

114 def __contains__(self, k): 

115 return super().__contains__(k.upper()) 

116 

117 def __delitem__(self, k): 

118 super().__delitem__(k.upper()) 

119 

120 def __getitem__(self, k): 

121 return super().__getitem__(k.upper()) 

122 

123 def get(self, k, default=None): 

124 return super().get(k.upper(), default) 

125 

126 def __setitem__(self, k, v): 

127 super().__setitem__(k.upper(), v) 

128 

129 def update(self, data): 

130 data = CaseInsensitiveDict(data) 

131 super().update(data) 

132 

133 

134class AbstractRedis: 

135 pass 

136 

137 

138class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

139 """ 

140 Implementation of the Redis protocol. 

141 

142 This abstract class provides a Python interface to all Redis commands 

143 and an implementation of the Redis protocol. 

144 

145 Pipelines derive from this, implementing how 

146 the commands are sent and received to the Redis server. Based on 

147 configuration, an instance will either use a ConnectionPool, or 

148 Connection object to talk to redis. 

149 

150 It is not safe to pass PubSub or Pipeline objects between threads. 

151 """ 

152 

153 @classmethod 

154 def from_url(cls, url: str, **kwargs) -> "Redis": 

155 """ 

156 Return a Redis client object configured from the given URL 

157 

158 For example:: 

159 

160 redis://[[username]:[password]]@localhost:6379/0 

161 rediss://[[username]:[password]]@localhost:6379/0 

162 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

163 

164 Three URL schemes are supported: 

165 

166 - `redis://` creates a TCP socket connection. See more at: 

167 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

168 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

169 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

170 - ``unix://``: creates a Unix Domain Socket connection. 

171 

172 The username, password, hostname, path and all querystring values 

173 are passed through urllib.parse.unquote in order to replace any 

174 percent-encoded values with their corresponding characters. 

175 

176 There are several ways to specify a database number. The first value 

177 found will be used: 

178 

179 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

180 2. If using the redis:// or rediss:// schemes, the path argument 

181 of the url, e.g. redis://localhost/0 

182 3. A ``db`` keyword argument to this function. 

183 

184 If none of these options are specified, the default db=0 is used. 

185 

186 All querystring options are cast to their appropriate Python types. 

187 Boolean arguments can be specified with string values "True"/"False" 

188 or "Yes"/"No". Values that cannot be properly cast cause a 

189 ``ValueError`` to be raised. Once parsed, the querystring arguments 

190 and keyword arguments are passed to the ``ConnectionPool``'s 

191 class initializer. In the case of conflicting arguments, querystring 

192 arguments always win. 

193 

194 """ 

195 single_connection_client = kwargs.pop("single_connection_client", False) 

196 connection_pool = ConnectionPool.from_url(url, **kwargs) 

197 client = cls( 

198 connection_pool=connection_pool, 

199 single_connection_client=single_connection_client, 

200 ) 

201 client.auto_close_connection_pool = True 

202 return client 

203 

204 @classmethod 

205 def from_pool( 

206 cls: Type["Redis"], 

207 connection_pool: ConnectionPool, 

208 ) -> "Redis": 

209 """ 

210 Return a Redis client from the given connection pool. 

211 The Redis client will take ownership of the connection pool and 

212 close it when the Redis client is closed. 

213 """ 

214 client = cls( 

215 connection_pool=connection_pool, 

216 ) 

217 client.auto_close_connection_pool = True 

218 return client 

219 

220 @deprecated_args( 

221 args_to_warn=["retry_on_timeout"], 

222 reason="TimeoutError is included by default.", 

223 version="6.0.0", 

224 ) 

225 @deprecated_args( 

226 args_to_warn=["lib_name", "lib_version"], 

227 reason="Use 'driver_info' parameter instead. " 

228 "lib_name and lib_version will be removed in a future version.", 

229 ) 

230 def __init__( 

231 self, 

232 host: str = "localhost", 

233 port: int = 6379, 

234 db: int = 0, 

235 password: Optional[str] = None, 

236 socket_timeout: Optional[float] = None, 

237 socket_connect_timeout: Optional[float] = None, 

238 socket_keepalive: Optional[bool] = None, 

239 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

240 connection_pool: Optional[ConnectionPool] = None, 

241 unix_socket_path: Optional[str] = None, 

242 encoding: str = "utf-8", 

243 encoding_errors: str = "strict", 

244 decode_responses: bool = False, 

245 retry_on_timeout: bool = False, 

246 retry: Retry = Retry( 

247 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 

248 ), 

249 retry_on_error: Optional[List[Type[Exception]]] = None, 

250 ssl: bool = False, 

251 ssl_keyfile: Optional[str] = None, 

252 ssl_certfile: Optional[str] = None, 

253 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required", 

254 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

255 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

256 ssl_ca_certs: Optional[str] = None, 

257 ssl_ca_path: Optional[str] = None, 

258 ssl_ca_data: Optional[str] = None, 

259 ssl_check_hostname: bool = True, 

260 ssl_password: Optional[str] = None, 

261 ssl_validate_ocsp: bool = False, 

262 ssl_validate_ocsp_stapled: bool = False, 

263 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None, 

264 ssl_ocsp_expected_cert: Optional[str] = None, 

265 ssl_min_version: Optional["ssl.TLSVersion"] = None, 

266 ssl_ciphers: Optional[str] = None, 

267 max_connections: Optional[int] = None, 

268 single_connection_client: bool = False, 

269 health_check_interval: int = 0, 

270 client_name: Optional[str] = None, 

271 lib_name: Optional[str] = None, 

272 lib_version: Optional[str] = None, 

273 driver_info: Optional["DriverInfo"] = None, 

274 username: Optional[str] = None, 

275 redis_connect_func: Optional[Callable[[], None]] = None, 

276 credential_provider: Optional[CredentialProvider] = None, 

277 protocol: Optional[int] = 2, 

278 cache: Optional[CacheInterface] = None, 

279 cache_config: Optional[CacheConfig] = None, 

280 event_dispatcher: Optional[EventDispatcher] = None, 

281 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

282 oss_cluster_maint_notifications_handler: Optional[ 

283 OSSMaintNotificationsHandler 

284 ] = None, 

285 ) -> None: 

286 """ 

287 Initialize a new Redis client. 

288 

289 To specify a retry policy for specific errors, you have two options: 

290 

291 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

292 you can also set `retry` to a valid `Retry` object(in case the default 

293 one is not appropriate) - with this approach the retries will be triggered 

294 on the default errors specified in the Retry object enriched with the 

295 errors specified in `retry_on_error`. 

296 

297 2. Define a `Retry` object with configured 'supported_errors' and set 

298 it to the `retry` parameter - with this approach you completely redefine 

299 the errors on which retries will happen. 

300 

301 `retry_on_timeout` is deprecated - please include the TimeoutError 

302 either in the Retry object or in the `retry_on_error` list. 

303 

304 When 'connection_pool' is provided - the retry configuration of the 

305 provided pool will be used. 

306 

307 Args: 

308 

309 single_connection_client: 

310 if `True`, connection pool is not used. In that case `Redis` 

311 instance use is not thread safe. 

312 decode_responses: 

313 if `True`, the response will be decoded to utf-8. 

314 Argument is ignored when connection_pool is provided. 

315 driver_info: 

316 Optional DriverInfo object to identify upstream libraries. 

317 If provided, lib_name and lib_version are ignored. 

318 If not provided, a DriverInfo will be created from lib_name and lib_version. 

319 Argument is ignored when connection_pool is provided. 

320 lib_name: 

321 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO. 

322 lib_version: 

323 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO. 

324 maint_notifications_config: 

325 configures the pool to support maintenance notifications - see 

326 `redis.maint_notifications.MaintNotificationsConfig` for details. 

327 Only supported with RESP3 

328 If not provided and protocol is RESP3, the maintenance notifications 

329 will be enabled by default (logic is included in the connection pool 

330 initialization). 

331 Argument is ignored when connection_pool is provided. 

332 oss_cluster_maint_notifications_handler: 

333 handler for OSS cluster notifications - see 

334 `redis.maint_notifications.OSSMaintNotificationsHandler` for details. 

335 Only supported with RESP3 

336 Argument is ignored when connection_pool is provided. 

337 """ 

338 if event_dispatcher is None: 

339 self._event_dispatcher = EventDispatcher() 

340 else: 

341 self._event_dispatcher = event_dispatcher 

342 if not connection_pool: 

343 if not retry_on_error: 

344 retry_on_error = [] 

345 

346 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version 

347 computed_driver_info = resolve_driver_info( 

348 driver_info, lib_name, lib_version 

349 ) 

350 

351 kwargs = { 

352 "db": db, 

353 "username": username, 

354 "password": password, 

355 "socket_timeout": socket_timeout, 

356 "encoding": encoding, 

357 "encoding_errors": encoding_errors, 

358 "decode_responses": decode_responses, 

359 "retry_on_error": retry_on_error, 

360 "retry": copy.deepcopy(retry), 

361 "max_connections": max_connections, 

362 "health_check_interval": health_check_interval, 

363 "client_name": client_name, 

364 "driver_info": computed_driver_info, 

365 "redis_connect_func": redis_connect_func, 

366 "credential_provider": credential_provider, 

367 "protocol": protocol, 

368 } 

369 # based on input, setup appropriate connection args 

370 if unix_socket_path is not None: 

371 kwargs.update( 

372 { 

373 "path": unix_socket_path, 

374 "connection_class": UnixDomainSocketConnection, 

375 } 

376 ) 

377 else: 

378 # TCP specific options 

379 kwargs.update( 

380 { 

381 "host": host, 

382 "port": port, 

383 "socket_connect_timeout": socket_connect_timeout, 

384 "socket_keepalive": socket_keepalive, 

385 "socket_keepalive_options": socket_keepalive_options, 

386 } 

387 ) 

388 

389 if ssl: 

390 kwargs.update( 

391 { 

392 "connection_class": SSLConnection, 

393 "ssl_keyfile": ssl_keyfile, 

394 "ssl_certfile": ssl_certfile, 

395 "ssl_cert_reqs": ssl_cert_reqs, 

396 "ssl_include_verify_flags": ssl_include_verify_flags, 

397 "ssl_exclude_verify_flags": ssl_exclude_verify_flags, 

398 "ssl_ca_certs": ssl_ca_certs, 

399 "ssl_ca_data": ssl_ca_data, 

400 "ssl_check_hostname": ssl_check_hostname, 

401 "ssl_password": ssl_password, 

402 "ssl_ca_path": ssl_ca_path, 

403 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

404 "ssl_validate_ocsp": ssl_validate_ocsp, 

405 "ssl_ocsp_context": ssl_ocsp_context, 

406 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

407 "ssl_min_version": ssl_min_version, 

408 "ssl_ciphers": ssl_ciphers, 

409 } 

410 ) 

411 if (cache_config or cache) and check_protocol_version(protocol, 3): 

412 kwargs.update( 

413 { 

414 "cache": cache, 

415 "cache_config": cache_config, 

416 } 

417 ) 

418 maint_notifications_enabled = ( 

419 maint_notifications_config and maint_notifications_config.enabled 

420 ) 

421 if maint_notifications_enabled and protocol not in [ 

422 3, 

423 "3", 

424 ]: 

425 raise RedisError( 

426 "Maintenance notifications handlers on connection are only supported with RESP version 3" 

427 ) 

428 if maint_notifications_config: 

429 kwargs.update( 

430 { 

431 "maint_notifications_config": maint_notifications_config, 

432 } 

433 ) 

434 if oss_cluster_maint_notifications_handler: 

435 kwargs.update( 

436 { 

437 "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler, 

438 } 

439 ) 

440 connection_pool = ConnectionPool(**kwargs) 

441 self._event_dispatcher.dispatch( 

442 AfterPooledConnectionsInstantiationEvent( 

443 [connection_pool], ClientType.SYNC, credential_provider 

444 ) 

445 ) 

446 self.auto_close_connection_pool = True 

447 else: 

448 self.auto_close_connection_pool = False 

449 self._event_dispatcher.dispatch( 

450 AfterPooledConnectionsInstantiationEvent( 

451 [connection_pool], ClientType.SYNC, credential_provider 

452 ) 

453 ) 

454 

455 self.connection_pool = connection_pool 

456 

457 if (cache_config or cache) and self.connection_pool.get_protocol() not in [ 

458 3, 

459 "3", 

460 ]: 

461 raise RedisError("Client caching is only supported with RESP version 3") 

462 

463 self.single_connection_lock = threading.RLock() 

464 self.connection = None 

465 self._single_connection_client = single_connection_client 

466 if self._single_connection_client: 

467 self.connection = self.connection_pool.get_connection() 

468 self._event_dispatcher.dispatch( 

469 AfterSingleConnectionInstantiationEvent( 

470 self.connection, ClientType.SYNC, self.single_connection_lock 

471 ) 

472 ) 

473 

474 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks) 

475 

476 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: 

477 self.response_callbacks.update(_RedisCallbacksRESP3) 

478 else: 

479 self.response_callbacks.update(_RedisCallbacksRESP2) 

480 

481 def __repr__(self) -> str: 

482 return ( 

483 f"<{type(self).__module__}.{type(self).__name__}" 

484 f"({repr(self.connection_pool)})>" 

485 ) 

486 

487 def get_encoder(self) -> "Encoder": 

488 """Get the connection pool's encoder""" 

489 return self.connection_pool.get_encoder() 

490 

491 def get_connection_kwargs(self) -> Dict: 

492 """Get the connection's key-word arguments""" 

493 return self.connection_pool.connection_kwargs 

494 

495 def get_retry(self) -> Optional[Retry]: 

496 return self.get_connection_kwargs().get("retry") 

497 

498 def set_retry(self, retry: Retry) -> None: 

499 self.get_connection_kwargs().update({"retry": retry}) 

500 self.connection_pool.set_retry(retry) 

501 

502 def set_response_callback(self, command: str, callback: Callable) -> None: 

503 """Set a custom Response Callback""" 

504 self.response_callbacks[command] = callback 

505 

506 def load_external_module(self, funcname, func) -> None: 

507 """ 

508 This function can be used to add externally defined redis modules, 

509 and their namespaces to the redis client. 

510 

511 funcname - A string containing the name of the function to create 

512 func - The function, being added to this class. 

513 

514 ex: Assume that one has a custom redis module named foomod that 

515 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

516 To load function functions into this namespace: 

517 

518 from redis import Redis 

519 from foomodule import F 

520 r = Redis() 

521 r.load_external_module("foo", F) 

522 r.foo().dothing('your', 'arguments') 

523 

524 For a concrete example see the reimport of the redisjson module in 

525 tests/test_connection.py::test_loading_external_modules 

526 """ 

527 setattr(self, funcname, func) 

528 

529 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

530 """ 

531 Return a new pipeline object that can queue multiple commands for 

532 later execution. ``transaction`` indicates whether all commands 

533 should be executed atomically. Apart from making a group of operations 

534 atomic, pipelines are useful for reducing the back-and-forth overhead 

535 between the client and server. 

536 """ 

537 return Pipeline( 

538 self.connection_pool, self.response_callbacks, transaction, shard_hint 

539 ) 

540 

541 def transaction( 

542 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

543 ) -> Union[List[Any], Any, None]: 

544 """ 

545 Convenience method for executing the callable `func` as a transaction 

546 while watching all keys specified in `watches`. The 'func' callable 

547 should expect a single argument which is a Pipeline object. 

548 """ 

549 shard_hint = kwargs.pop("shard_hint", None) 

550 value_from_callable = kwargs.pop("value_from_callable", False) 

551 watch_delay = kwargs.pop("watch_delay", None) 

552 with self.pipeline(True, shard_hint) as pipe: 

553 while True: 

554 try: 

555 if watches: 

556 pipe.watch(*watches) 

557 func_value = func(pipe) 

558 exec_value = pipe.execute() 

559 return func_value if value_from_callable else exec_value 

560 except WatchError: 

561 if watch_delay is not None and watch_delay > 0: 

562 time.sleep(watch_delay) 

563 continue 

564 

565 def lock( 

566 self, 

567 name: str, 

568 timeout: Optional[float] = None, 

569 sleep: float = 0.1, 

570 blocking: bool = True, 

571 blocking_timeout: Optional[float] = None, 

572 lock_class: Union[None, Any] = None, 

573 thread_local: bool = True, 

574 raise_on_release_error: bool = True, 

575 ): 

576 """ 

577 Return a new Lock object using key ``name`` that mimics 

578 the behavior of threading.Lock. 

579 

580 If specified, ``timeout`` indicates a maximum life for the lock. 

581 By default, it will remain locked until release() is called. 

582 

583 ``sleep`` indicates the amount of time to sleep per loop iteration 

584 when the lock is in blocking mode and another client is currently 

585 holding the lock. 

586 

587 ``blocking`` indicates whether calling ``acquire`` should block until 

588 the lock has been acquired or to fail immediately, causing ``acquire`` 

589 to return False and the lock not being acquired. Defaults to True. 

590 Note this value can be overridden by passing a ``blocking`` 

591 argument to ``acquire``. 

592 

593 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

594 spend trying to acquire the lock. A value of ``None`` indicates 

595 continue trying forever. ``blocking_timeout`` can be specified as a 

596 float or integer, both representing the number of seconds to wait. 

597 

598 ``lock_class`` forces the specified lock implementation. Note that as 

599 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

600 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

601 you have created your own custom lock class. 

602 

603 ``thread_local`` indicates whether the lock token is placed in 

604 thread-local storage. By default, the token is placed in thread local 

605 storage so that a thread only sees its token, not a token set by 

606 another thread. Consider the following timeline: 

607 

608 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

609 thread-1 sets the token to "abc" 

610 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

611 Lock instance. 

612 time: 5, thread-1 has not yet completed. redis expires the lock 

613 key. 

614 time: 5, thread-2 acquired `my-lock` now that it's available. 

615 thread-2 sets the token to "xyz" 

616 time: 6, thread-1 finishes its work and calls release(). if the 

617 token is *not* stored in thread local storage, then 

618 thread-1 would see the token value as "xyz" and would be 

619 able to successfully release the thread-2's lock. 

620 

621 ``raise_on_release_error`` indicates whether to raise an exception when 

622 the lock is no longer owned when exiting the context manager. By default, 

623 this is True, meaning an exception will be raised. If False, the warning 

624 will be logged and the exception will be suppressed. 

625 

626 In some use cases it's necessary to disable thread local storage. For 

627 example, if you have code where one thread acquires a lock and passes 

628 that lock instance to a worker thread to release later. If thread 

629 local storage isn't disabled in this case, the worker thread won't see 

630 the token set by the thread that acquired the lock. Our assumption 

631 is that these cases aren't common and as such default to using 

632 thread local storage.""" 

633 if lock_class is None: 

634 lock_class = Lock 

635 return lock_class( 

636 self, 

637 name, 

638 timeout=timeout, 

639 sleep=sleep, 

640 blocking=blocking, 

641 blocking_timeout=blocking_timeout, 

642 thread_local=thread_local, 

643 raise_on_release_error=raise_on_release_error, 

644 ) 

645 

646 def pubsub(self, **kwargs): 

647 """ 

648 Return a Publish/Subscribe object. With this object, you can 

649 subscribe to channels and listen for messages that get published to 

650 them. 

651 """ 

652 return PubSub( 

653 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

654 ) 

655 

656 def monitor(self): 

657 return Monitor(self.connection_pool) 

658 

659 def client(self): 

660 return self.__class__( 

661 connection_pool=self.connection_pool, 

662 single_connection_client=True, 

663 ) 

664 

665 def __enter__(self): 

666 return self 

667 

668 def __exit__(self, exc_type, exc_value, traceback): 

669 self.close() 

670 

671 def __del__(self): 

672 try: 

673 self.close() 

674 except Exception: 

675 pass 

676 

677 def close(self) -> None: 

678 # In case a connection property does not yet exist 

679 # (due to a crash earlier in the Redis() constructor), return 

680 # immediately as there is nothing to clean-up. 

681 if not hasattr(self, "connection"): 

682 return 

683 

684 conn = self.connection 

685 if conn: 

686 self.connection = None 

687 self.connection_pool.release(conn) 

688 

689 if self.auto_close_connection_pool: 

690 self.connection_pool.disconnect() 

691 

692 def _send_command_parse_response(self, conn, command_name, *args, **options): 

693 """ 

694 Send a command and parse the response 

695 """ 

696 conn.send_command(*args, **options) 

697 return self.parse_response(conn, command_name, **options) 

698 

699 def _close_connection( 

700 self, 

701 conn, 

702 error: Optional[Exception] = None, 

703 failure_count: Optional[int] = None, 

704 start_time: Optional[float] = None, 

705 command_name: Optional[str] = None, 

706 ) -> None: 

707 """ 

708 Close the connection before retrying. 

709 

710 The supported exceptions are already checked in the 

711 retry object so we don't need to do it here. 

712 

713 After we disconnect the connection, it will try to reconnect and 

714 do a health check as part of the send_command logic(on connection level). 

715 """ 

716 if error and failure_count <= conn.retry.get_retries(): 

717 record_operation_duration( 

718 command_name=command_name, 

719 duration_seconds=time.monotonic() - start_time, 

720 server_address=getattr(conn, "host", None), 

721 server_port=getattr(conn, "port", None), 

722 db_namespace=str(conn.db), 

723 error=error, 

724 retry_attempts=failure_count, 

725 ) 

726 

727 conn.disconnect() 

728 

729 # COMMAND EXECUTION AND PROTOCOL PARSING 

730 def execute_command(self, *args, **options): 

731 return self._execute_command(*args, **options) 

732 

733 def _execute_command(self, *args, **options): 

734 """Execute a command and return a parsed response""" 

735 pool = self.connection_pool 

736 command_name = args[0] 

737 conn = self.connection or pool.get_connection() 

738 

739 # Start timing for observability 

740 start_time = time.monotonic() 

741 # Track actual retry attempts for error reporting 

742 actual_retry_attempts = [0] 

743 

744 def failure_callback(error, failure_count): 

745 if is_debug_log_enabled(): 

746 add_debug_log_for_operation_failure(conn) 

747 actual_retry_attempts[0] = failure_count 

748 self._close_connection(conn, error, failure_count, start_time, command_name) 

749 

750 if self._single_connection_client: 

751 self.single_connection_lock.acquire() 

752 try: 

753 result = conn.retry.call_with_retry( 

754 lambda: self._send_command_parse_response( 

755 conn, command_name, *args, **options 

756 ), 

757 failure_callback, 

758 with_failure_count=True, 

759 ) 

760 

761 record_operation_duration( 

762 command_name=command_name, 

763 duration_seconds=time.monotonic() - start_time, 

764 server_address=getattr(conn, "host", None), 

765 server_port=getattr(conn, "port", None), 

766 db_namespace=str(conn.db), 

767 ) 

768 return result 

769 except Exception as e: 

770 record_error_count( 

771 server_address=getattr(conn, "host", None), 

772 server_port=getattr(conn, "port", None), 

773 network_peer_address=getattr(conn, "host", None), 

774 network_peer_port=getattr(conn, "port", None), 

775 error_type=e, 

776 retry_attempts=actual_retry_attempts[0], 

777 is_internal=False, 

778 ) 

779 raise 

780 

781 finally: 

782 if conn and conn.should_reconnect(): 

783 self._close_connection(conn) 

784 conn.connect() 

785 if self._single_connection_client: 

786 self.single_connection_lock.release() 

787 if not self.connection: 

788 pool.release(conn) 

789 

790 def parse_response(self, connection, command_name, **options): 

791 """Parses a response from the Redis server""" 

792 try: 

793 if NEVER_DECODE in options: 

794 response = connection.read_response(disable_decoding=True) 

795 options.pop(NEVER_DECODE) 

796 else: 

797 response = connection.read_response() 

798 except ResponseError: 

799 if EMPTY_RESPONSE in options: 

800 return options[EMPTY_RESPONSE] 

801 raise 

802 

803 if EMPTY_RESPONSE in options: 

804 options.pop(EMPTY_RESPONSE) 

805 

806 # Remove keys entry, it needs only for cache. 

807 options.pop("keys", None) 

808 

809 if command_name in self.response_callbacks: 

810 return self.response_callbacks[command_name](response, **options) 

811 return response 

812 

813 def get_cache(self) -> Optional[CacheInterface]: 

814 return self.connection_pool.cache 

815 

816 

817StrictRedis = Redis 

818 

819 

820class Monitor: 

821 """ 

822 Monitor is useful for handling the MONITOR command to the redis server. 

823 next_command() method returns one command from monitor 

824 listen() method yields commands from monitor. 

825 """ 

826 

827 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)") 

828 command_re = re.compile(r'"(.*?)(?<!\\)"') 

829 

830 def __init__(self, connection_pool): 

831 self.connection_pool = connection_pool 

832 self.connection = self.connection_pool.get_connection() 

833 

834 def __enter__(self): 

835 self._start_monitor() 

836 return self 

837 

838 def __exit__(self, *args): 

839 self.connection.disconnect() 

840 self.connection_pool.release(self.connection) 

841 

842 def next_command(self): 

843 """Parse the response from a monitor command""" 

844 response = self.connection.read_response() 

845 

846 if response is None: 

847 return None 

848 

849 if isinstance(response, bytes): 

850 response = self.connection.encoder.decode(response, force=True) 

851 

852 command_time, command_data = response.split(" ", 1) 

853 m = self.monitor_re.match(command_data) 

854 db_id, client_info, command = m.groups() 

855 command = " ".join(self.command_re.findall(command)) 

856 # Redis escapes double quotes because each piece of the command 

857 # string is surrounded by double quotes. We don't have that 

858 # requirement so remove the escaping and leave the quote. 

859 command = command.replace('\\"', '"') 

860 

861 if client_info == "lua": 

862 client_address = "lua" 

863 client_port = "" 

864 client_type = "lua" 

865 elif client_info.startswith("unix"): 

866 client_address = "unix" 

867 client_port = client_info[5:] 

868 client_type = "unix" 

869 else: 

870 if client_info == "": 

871 client_address = "" 

872 client_port = "" 

873 client_type = "unknown" 

874 else: 

875 # use rsplit as ipv6 addresses contain colons 

876 client_address, client_port = client_info.rsplit(":", 1) 

877 client_type = "tcp" 

878 return { 

879 "time": float(command_time), 

880 "db": int(db_id), 

881 "client_address": client_address, 

882 "client_port": client_port, 

883 "client_type": client_type, 

884 "command": command, 

885 } 

886 

887 def listen(self): 

888 """Listen for commands coming to the server.""" 

889 while True: 

890 yield self.next_command() 

891 

892 def _start_monitor(self): 

893 self.connection.send_command("MONITOR") 

894 # check that monitor returns 'OK', but don't return it to user 

895 response = self.connection.read_response() 

896 

897 if not bool_ok(response): 

898 raise RedisError(f"MONITOR failed: {response}") 

899 

900 

901class PubSub: 

902 """ 

903 PubSub provides publish, subscribe and listen support to Redis channels. 

904 

905 After subscribing to one or more channels, the listen() method will block 

906 until a message arrives on one of the subscribed channels. That message 

907 will be returned and it's safe to start listening again. 

908 """ 

909 

910 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

911 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

912 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

913 

914 def __init__( 

915 self, 

916 connection_pool, 

917 shard_hint=None, 

918 ignore_subscribe_messages: bool = False, 

919 encoder: Optional["Encoder"] = None, 

920 push_handler_func: Union[None, Callable[[str], None]] = None, 

921 event_dispatcher: Optional["EventDispatcher"] = None, 

922 ): 

923 self.connection_pool = connection_pool 

924 self.shard_hint = shard_hint 

925 self.ignore_subscribe_messages = ignore_subscribe_messages 

926 self.connection = None 

927 self.subscribed_event = threading.Event() 

928 # we need to know the encoding options for this connection in order 

929 # to lookup channel and pattern names for callback handlers. 

930 self.encoder = encoder 

931 self.push_handler_func = push_handler_func 

932 if event_dispatcher is None: 

933 self._event_dispatcher = EventDispatcher() 

934 else: 

935 self._event_dispatcher = event_dispatcher 

936 

937 self._lock = threading.RLock() 

938 if self.encoder is None: 

939 self.encoder = self.connection_pool.get_encoder() 

940 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

941 if self.encoder.decode_responses: 

942 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

943 else: 

944 self.health_check_response = [b"pong", self.health_check_response_b] 

945 if self.push_handler_func is None: 

946 _set_info_logger() 

947 self.reset() 

948 

949 def __enter__(self) -> "PubSub": 

950 return self 

951 

952 def __exit__(self, exc_type, exc_value, traceback) -> None: 

953 self.reset() 

954 

955 def __del__(self) -> None: 

956 try: 

957 # if this object went out of scope prior to shutting down 

958 # subscriptions, close the connection manually before 

959 # returning it to the connection pool 

960 self.reset() 

961 except Exception: 

962 pass 

963 

964 def reset(self) -> None: 

965 if self.connection: 

966 self.connection.disconnect() 

967 self.connection.deregister_connect_callback(self.on_connect) 

968 self.connection_pool.release(self.connection) 

969 self.connection = None 

970 self.health_check_response_counter = 0 

971 self.channels = {} 

972 self.pending_unsubscribe_channels = set() 

973 self.shard_channels = {} 

974 self.pending_unsubscribe_shard_channels = set() 

975 self.patterns = {} 

976 self.pending_unsubscribe_patterns = set() 

977 self.subscribed_event.clear() 

978 

979 def close(self) -> None: 

980 self.reset() 

981 

982 def on_connect(self, connection) -> None: 

983 "Re-subscribe to any channels and patterns previously subscribed to" 

984 # NOTE: for python3, we can't pass bytestrings as keyword arguments 

985 # so we need to decode channel/pattern names back to unicode strings 

986 # before passing them to [p]subscribe. 

987 # 

988 # However, channels subscribed without a callback (positional args) may 

989 # have binary names that are not valid in the current encoding (e.g. 

990 # arbitrary bytes that are not valid UTF-8). These channels are stored 

991 # with a ``None`` handler. We re-subscribe them as positional args so 

992 # that no decoding is required. 

993 self.pending_unsubscribe_channels.clear() 

994 self.pending_unsubscribe_patterns.clear() 

995 self.pending_unsubscribe_shard_channels.clear() 

996 if self.channels: 

997 channels_with_handlers = {} 

998 channels_without_handlers = [] 

999 for k, v in self.channels.items(): 

1000 if v is not None: 

1001 channels_with_handlers[self.encoder.decode(k, force=True)] = v 

1002 else: 

1003 channels_without_handlers.append(k) 

1004 if channels_with_handlers or channels_without_handlers: 

1005 self.subscribe(*channels_without_handlers, **channels_with_handlers) 

1006 if self.patterns: 

1007 patterns_with_handlers = {} 

1008 patterns_without_handlers = [] 

1009 for k, v in self.patterns.items(): 

1010 if v is not None: 

1011 patterns_with_handlers[self.encoder.decode(k, force=True)] = v 

1012 else: 

1013 patterns_without_handlers.append(k) 

1014 if patterns_with_handlers or patterns_without_handlers: 

1015 self.psubscribe(*patterns_without_handlers, **patterns_with_handlers) 

1016 if self.shard_channels: 

1017 shard_with_handlers = {} 

1018 shard_without_handlers = [] 

1019 for k, v in self.shard_channels.items(): 

1020 if v is not None: 

1021 shard_with_handlers[self.encoder.decode(k, force=True)] = v 

1022 else: 

1023 shard_without_handlers.append(k) 

1024 if shard_with_handlers or shard_without_handlers: 

1025 self.ssubscribe(*shard_without_handlers, **shard_with_handlers) 

1026 

1027 @property 

1028 def subscribed(self) -> bool: 

1029 """Indicates if there are subscriptions to any channels or patterns""" 

1030 return self.subscribed_event.is_set() 

1031 

1032 def execute_command(self, *args): 

1033 """Execute a publish/subscribe command""" 

1034 

1035 # NOTE: don't parse the response in this function -- it could pull a 

1036 # legitimate message off the stack if the connection is already 

1037 # subscribed to one or more channels 

1038 

1039 if self.connection is None: 

1040 self.connection = self.connection_pool.get_connection() 

1041 # register a callback that re-subscribes to any channels we 

1042 # were listening to when we were disconnected 

1043 self.connection.register_connect_callback(self.on_connect) 

1044 if self.push_handler_func is not None: 

1045 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

1046 self._event_dispatcher.dispatch( 

1047 AfterPubSubConnectionInstantiationEvent( 

1048 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

1049 ) 

1050 ) 

1051 connection = self.connection 

1052 kwargs = {"check_health": not self.subscribed} 

1053 if not self.subscribed: 

1054 self.clean_health_check_responses() 

1055 with self._lock: 

1056 self._execute(connection, connection.send_command, *args, **kwargs) 

1057 

1058 def clean_health_check_responses(self) -> None: 

1059 """ 

1060 If any health check responses are present, clean them 

1061 """ 

1062 ttl = 10 

1063 conn = self.connection 

1064 while conn and self.health_check_response_counter > 0 and ttl > 0: 

1065 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

1066 response = self._execute(conn, conn.read_response) 

1067 if self.is_health_check_response(response): 

1068 self.health_check_response_counter -= 1 

1069 else: 

1070 raise PubSubError( 

1071 "A non health check response was cleaned by " 

1072 "execute_command: {}".format(response) 

1073 ) 

1074 ttl -= 1 

1075 

1076 def _reconnect( 

1077 self, 

1078 conn, 

1079 error: Optional[Exception] = None, 

1080 failure_count: Optional[int] = None, 

1081 start_time: Optional[float] = None, 

1082 command_name: Optional[str] = None, 

1083 ) -> None: 

1084 """ 

1085 The supported exceptions are already checked in the 

1086 retry object so we don't need to do it here. 

1087 

1088 In this error handler we are trying to reconnect to the server. 

1089 """ 

1090 if error and failure_count <= conn.retry.get_retries(): 

1091 if command_name: 

1092 record_operation_duration( 

1093 command_name=command_name, 

1094 duration_seconds=time.monotonic() - start_time, 

1095 server_address=getattr(conn, "host", None), 

1096 server_port=getattr(conn, "port", None), 

1097 db_namespace=str(conn.db), 

1098 error=error, 

1099 retry_attempts=failure_count, 

1100 ) 

1101 conn.disconnect() 

1102 conn.connect() 

1103 

1104 def _execute(self, conn, command, *args, **kwargs): 

1105 """ 

1106 Connect manually upon disconnection. If the Redis server is down, 

1107 this will fail and raise a ConnectionError as desired. 

1108 After reconnection, the ``on_connect`` callback should have been 

1109 called by the # connection to resubscribe us to any channels and 

1110 patterns we were previously listening to 

1111 """ 

1112 

1113 if conn.should_reconnect(): 

1114 self._reconnect(conn) 

1115 

1116 if not len(args) == 0: 

1117 command_name = args[0] 

1118 else: 

1119 command_name = None 

1120 

1121 # Start timing for observability 

1122 start_time = time.monotonic() 

1123 # Track actual retry attempts for error reporting 

1124 actual_retry_attempts = [0] 

1125 

1126 def failure_callback(error, failure_count): 

1127 actual_retry_attempts[0] = failure_count 

1128 self._reconnect(conn, error, failure_count, start_time, command_name) 

1129 

1130 try: 

1131 response = conn.retry.call_with_retry( 

1132 lambda: command(*args, **kwargs), 

1133 failure_callback, 

1134 with_failure_count=True, 

1135 ) 

1136 

1137 if command_name: 

1138 record_operation_duration( 

1139 command_name=command_name, 

1140 duration_seconds=time.monotonic() - start_time, 

1141 server_address=getattr(conn, "host", None), 

1142 server_port=getattr(conn, "port", None), 

1143 db_namespace=str(conn.db), 

1144 ) 

1145 

1146 return response 

1147 except Exception as e: 

1148 record_error_count( 

1149 server_address=getattr(conn, "host", None), 

1150 server_port=getattr(conn, "port", None), 

1151 network_peer_address=getattr(conn, "host", None), 

1152 network_peer_port=getattr(conn, "port", None), 

1153 error_type=e, 

1154 retry_attempts=actual_retry_attempts[0], 

1155 is_internal=False, 

1156 ) 

1157 raise 

1158 

1159 def parse_response(self, block=True, timeout=0): 

1160 """Parse the response from a publish/subscribe command""" 

1161 conn = self.connection 

1162 if conn is None: 

1163 raise RuntimeError( 

1164 "pubsub connection not set: " 

1165 "did you forget to call subscribe() or psubscribe()?" 

1166 ) 

1167 

1168 self.check_health() 

1169 

1170 def try_read(): 

1171 if not block: 

1172 if not conn.can_read(timeout=timeout): 

1173 return None 

1174 else: 

1175 conn.connect() 

1176 return conn.read_response(disconnect_on_error=False, push_request=True) 

1177 

1178 response = self._execute(conn, try_read) 

1179 

1180 if self.is_health_check_response(response): 

1181 # ignore the health check message as user might not expect it 

1182 self.health_check_response_counter -= 1 

1183 return None 

1184 return response 

1185 

1186 def is_health_check_response(self, response) -> bool: 

1187 """ 

1188 Check if the response is a health check response. 

1189 If there are no subscriptions redis responds to PING command with a 

1190 bulk response, instead of a multi-bulk with "pong" and the response. 

1191 """ 

1192 if self.encoder.decode_responses: 

1193 return ( 

1194 response 

1195 in [ 

1196 self.health_check_response, # If there is a subscription 

1197 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True 

1198 ] 

1199 ) 

1200 else: 

1201 return ( 

1202 response 

1203 in [ 

1204 self.health_check_response, # If there is a subscription 

1205 self.health_check_response_b, # If there isn't a subscription and decode_responses=False 

1206 ] 

1207 ) 

1208 

1209 def check_health(self) -> None: 

1210 conn = self.connection 

1211 if conn is None: 

1212 raise RuntimeError( 

1213 "pubsub connection not set: " 

1214 "did you forget to call subscribe() or psubscribe()?" 

1215 ) 

1216 

1217 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1218 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1219 self.health_check_response_counter += 1 

1220 

1221 def _normalize_keys(self, data) -> Dict: 

1222 """ 

1223 normalize channel/pattern names to be either bytes or strings 

1224 based on whether responses are automatically decoded. this saves us 

1225 from coercing the value for each message coming in. 

1226 """ 

1227 encode = self.encoder.encode 

1228 decode = self.encoder.decode 

1229 return {decode(encode(k)): v for k, v in data.items()} 

1230 

1231 def psubscribe(self, *args, **kwargs): 

1232 """ 

1233 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1234 expect a pattern name as the key and a callable as the value. A 

1235 pattern's callable will be invoked automatically when a message is 

1236 received on that pattern rather than producing a message via 

1237 ``listen()``. 

1238 """ 

1239 if args: 

1240 args = list_or_args(args[0], args[1:]) 

1241 new_patterns = dict.fromkeys(args) 

1242 new_patterns.update(kwargs) 

1243 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1244 # update the patterns dict AFTER we send the command. we don't want to 

1245 # subscribe twice to these patterns, once for the command and again 

1246 # for the reconnection. 

1247 new_patterns = self._normalize_keys(new_patterns) 

1248 self.patterns.update(new_patterns) 

1249 if not self.subscribed: 

1250 # Set the subscribed_event flag to True 

1251 self.subscribed_event.set() 

1252 # Clear the health check counter 

1253 self.health_check_response_counter = 0 

1254 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1255 return ret_val 

1256 

1257 def punsubscribe(self, *args): 

1258 """ 

1259 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1260 all patterns. 

1261 """ 

1262 if args: 

1263 args = list_or_args(args[0], args[1:]) 

1264 patterns = self._normalize_keys(dict.fromkeys(args)) 

1265 else: 

1266 patterns = self.patterns 

1267 self.pending_unsubscribe_patterns.update(patterns) 

1268 return self.execute_command("PUNSUBSCRIBE", *args) 

1269 

1270 def subscribe(self, *args, **kwargs): 

1271 """ 

1272 Subscribe to channels. Channels supplied as keyword arguments expect 

1273 a channel name as the key and a callable as the value. A channel's 

1274 callable will be invoked automatically when a message is received on 

1275 that channel rather than producing a message via ``listen()`` or 

1276 ``get_message()``. 

1277 """ 

1278 if args: 

1279 args = list_or_args(args[0], args[1:]) 

1280 new_channels = dict.fromkeys(args) 

1281 new_channels.update(kwargs) 

1282 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1283 # update the channels dict AFTER we send the command. we don't want to 

1284 # subscribe twice to these channels, once for the command and again 

1285 # for the reconnection. 

1286 new_channels = self._normalize_keys(new_channels) 

1287 self.channels.update(new_channels) 

1288 if not self.subscribed: 

1289 # Set the subscribed_event flag to True 

1290 self.subscribed_event.set() 

1291 # Clear the health check counter 

1292 self.health_check_response_counter = 0 

1293 self.pending_unsubscribe_channels.difference_update(new_channels) 

1294 return ret_val 

1295 

1296 def unsubscribe(self, *args): 

1297 """ 

1298 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1299 all channels 

1300 """ 

1301 if args: 

1302 args = list_or_args(args[0], args[1:]) 

1303 channels = self._normalize_keys(dict.fromkeys(args)) 

1304 else: 

1305 channels = self.channels 

1306 self.pending_unsubscribe_channels.update(channels) 

1307 return self.execute_command("UNSUBSCRIBE", *args) 

1308 

1309 def ssubscribe(self, *args, target_node=None, **kwargs): 

1310 """ 

1311 Subscribes the client to the specified shard channels. 

1312 Channels supplied as keyword arguments expect a channel name as the key 

1313 and a callable as the value. A channel's callable will be invoked automatically 

1314 when a message is received on that channel rather than producing a message via 

1315 ``listen()`` or ``get_sharded_message()``. 

1316 """ 

1317 if args: 

1318 args = list_or_args(args[0], args[1:]) 

1319 new_s_channels = dict.fromkeys(args) 

1320 new_s_channels.update(kwargs) 

1321 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1322 # update the s_channels dict AFTER we send the command. we don't want to 

1323 # subscribe twice to these channels, once for the command and again 

1324 # for the reconnection. 

1325 new_s_channels = self._normalize_keys(new_s_channels) 

1326 self.shard_channels.update(new_s_channels) 

1327 if not self.subscribed: 

1328 # Set the subscribed_event flag to True 

1329 self.subscribed_event.set() 

1330 # Clear the health check counter 

1331 self.health_check_response_counter = 0 

1332 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1333 return ret_val 

1334 

1335 def sunsubscribe(self, *args, target_node=None): 

1336 """ 

1337 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1338 all shard_channels 

1339 """ 

1340 if args: 

1341 args = list_or_args(args[0], args[1:]) 

1342 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1343 else: 

1344 s_channels = self.shard_channels 

1345 self.pending_unsubscribe_shard_channels.update(s_channels) 

1346 return self.execute_command("SUNSUBSCRIBE", *args) 

1347 

1348 def listen(self): 

1349 "Listen for messages on channels this client has been subscribed to" 

1350 while self.subscribed: 

1351 response = self.handle_message(self.parse_response(block=True)) 

1352 if response is not None: 

1353 yield response 

1354 

1355 def get_message( 

1356 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1357 ): 

1358 """ 

1359 Get the next message if one is available, otherwise None. 

1360 

1361 If timeout is specified, the system will wait for `timeout` seconds 

1362 before returning. Timeout should be specified as a floating point 

1363 number, or None, to wait indefinitely. 

1364 """ 

1365 if not self.subscribed: 

1366 # Wait for subscription 

1367 start_time = time.monotonic() 

1368 if self.subscribed_event.wait(timeout) is True: 

1369 # The connection was subscribed during the timeout time frame. 

1370 # The timeout should be adjusted based on the time spent 

1371 # waiting for the subscription 

1372 time_spent = time.monotonic() - start_time 

1373 timeout = max(0.0, timeout - time_spent) 

1374 else: 

1375 # The connection isn't subscribed to any channels or patterns, 

1376 # so no messages are available 

1377 return None 

1378 

1379 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1380 

1381 if response: 

1382 return self.handle_message(response, ignore_subscribe_messages) 

1383 return None 

1384 

1385 get_sharded_message = get_message 

1386 

1387 def ping(self, message: Union[str, None] = None) -> bool: 

1388 """ 

1389 Ping the Redis server to test connectivity. 

1390 

1391 Sends a PING command to the Redis server and returns True if the server 

1392 responds with "PONG". 

1393 """ 

1394 args = ["PING", message] if message is not None else ["PING"] 

1395 return self.execute_command(*args) 

1396 

1397 def handle_message(self, response, ignore_subscribe_messages=False): 

1398 """ 

1399 Parses a pub/sub message. If the channel or pattern was subscribed to 

1400 with a message handler, the handler is invoked instead of a parsed 

1401 message being returned. 

1402 """ 

1403 if response is None: 

1404 return None 

1405 if isinstance(response, bytes): 

1406 response = [b"pong", response] if response != b"PONG" else [b"pong", b""] 

1407 

1408 message_type = str_if_bytes(response[0]) 

1409 if message_type == "pmessage": 

1410 message = { 

1411 "type": message_type, 

1412 "pattern": response[1], 

1413 "channel": response[2], 

1414 "data": response[3], 

1415 } 

1416 elif message_type == "pong": 

1417 message = { 

1418 "type": message_type, 

1419 "pattern": None, 

1420 "channel": None, 

1421 "data": response[1], 

1422 } 

1423 else: 

1424 message = { 

1425 "type": message_type, 

1426 "pattern": None, 

1427 "channel": response[1], 

1428 "data": response[2], 

1429 } 

1430 

1431 if message_type in ["message", "pmessage"]: 

1432 channel = str_if_bytes(message["channel"]) 

1433 record_pubsub_message( 

1434 direction=PubSubDirection.RECEIVE, 

1435 channel=channel, 

1436 ) 

1437 elif message_type == "smessage": 

1438 channel = str_if_bytes(message["channel"]) 

1439 record_pubsub_message( 

1440 direction=PubSubDirection.RECEIVE, 

1441 channel=channel, 

1442 sharded=True, 

1443 ) 

1444 

1445 # if this is an unsubscribe message, remove it from memory 

1446 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1447 if message_type == "punsubscribe": 

1448 pattern = response[1] 

1449 if pattern in self.pending_unsubscribe_patterns: 

1450 self.pending_unsubscribe_patterns.remove(pattern) 

1451 self.patterns.pop(pattern, None) 

1452 elif message_type == "sunsubscribe": 

1453 s_channel = response[1] 

1454 if s_channel in self.pending_unsubscribe_shard_channels: 

1455 self.pending_unsubscribe_shard_channels.remove(s_channel) 

1456 self.shard_channels.pop(s_channel, None) 

1457 else: 

1458 channel = response[1] 

1459 if channel in self.pending_unsubscribe_channels: 

1460 self.pending_unsubscribe_channels.remove(channel) 

1461 self.channels.pop(channel, None) 

1462 if not self.channels and not self.patterns and not self.shard_channels: 

1463 # There are no subscriptions anymore, set subscribed_event flag 

1464 # to false 

1465 self.subscribed_event.clear() 

1466 

1467 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1468 # if there's a message handler, invoke it 

1469 if message_type == "pmessage": 

1470 handler = self.patterns.get(message["pattern"], None) 

1471 elif message_type == "smessage": 

1472 handler = self.shard_channels.get(message["channel"], None) 

1473 else: 

1474 handler = self.channels.get(message["channel"], None) 

1475 if handler: 

1476 handler(message) 

1477 return None 

1478 elif message_type != "pong": 

1479 # this is a subscribe/unsubscribe message. ignore if we don't 

1480 # want them 

1481 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1482 return None 

1483 

1484 return message 

1485 

1486 def run_in_thread( 

1487 self, 

1488 sleep_time: float = 0.0, 

1489 daemon: bool = False, 

1490 exception_handler: Optional[Callable] = None, 

1491 pubsub=None, 

1492 sharded_pubsub: bool = False, 

1493 ) -> "PubSubWorkerThread": 

1494 for channel, handler in self.channels.items(): 

1495 if handler is None: 

1496 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1497 for pattern, handler in self.patterns.items(): 

1498 if handler is None: 

1499 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1500 for s_channel, handler in self.shard_channels.items(): 

1501 if handler is None: 

1502 raise PubSubError( 

1503 f"Shard Channel: '{s_channel}' has no handler registered" 

1504 ) 

1505 

1506 pubsub = self if pubsub is None else pubsub 

1507 thread = PubSubWorkerThread( 

1508 pubsub, 

1509 sleep_time, 

1510 daemon=daemon, 

1511 exception_handler=exception_handler, 

1512 sharded_pubsub=sharded_pubsub, 

1513 ) 

1514 thread.start() 

1515 return thread 

1516 

1517 

1518class PubSubWorkerThread(threading.Thread): 

1519 def __init__( 

1520 self, 

1521 pubsub, 

1522 sleep_time: float, 

1523 daemon: bool = False, 

1524 exception_handler: Union[ 

1525 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None 

1526 ] = None, 

1527 sharded_pubsub: bool = False, 

1528 ): 

1529 super().__init__() 

1530 self.daemon = daemon 

1531 self.pubsub = pubsub 

1532 self.sleep_time = sleep_time 

1533 self.exception_handler = exception_handler 

1534 self.sharded_pubsub = sharded_pubsub 

1535 self._running = threading.Event() 

1536 

1537 def run(self) -> None: 

1538 if self._running.is_set(): 

1539 return 

1540 self._running.set() 

1541 pubsub = self.pubsub 

1542 sleep_time = self.sleep_time 

1543 while self._running.is_set(): 

1544 try: 

1545 if not self.sharded_pubsub: 

1546 pubsub.get_message( 

1547 ignore_subscribe_messages=True, timeout=sleep_time 

1548 ) 

1549 else: 

1550 pubsub.get_sharded_message( 

1551 ignore_subscribe_messages=True, timeout=sleep_time 

1552 ) 

1553 except BaseException as e: 

1554 if self.exception_handler is None: 

1555 raise 

1556 self.exception_handler(e, pubsub, self) 

1557 pubsub.close() 

1558 

1559 def stop(self) -> None: 

1560 # trip the flag so the run loop exits. the run loop will 

1561 # close the pubsub connection, which disconnects the socket 

1562 # and returns the connection to the pool. 

1563 self._running.clear() 

1564 

1565 

1566class Pipeline(Redis): 

1567 """ 

1568 Pipelines provide a way to transmit multiple commands to the Redis server 

1569 in one transmission. This is convenient for batch processing, such as 

1570 saving all the values in a list to Redis. 

1571 

1572 All commands executed within a pipeline(when running in transactional mode, 

1573 which is the default behavior) are wrapped with MULTI and EXEC 

1574 calls. This guarantees all commands executed in the pipeline will be 

1575 executed atomically. 

1576 

1577 Any command raising an exception does *not* halt the execution of 

1578 subsequent commands in the pipeline. Instead, the exception is caught 

1579 and its instance is placed into the response list returned by execute(). 

1580 Code iterating over the response list should be able to deal with an 

1581 instance of an exception as a potential value. In general, these will be 

1582 ResponseError exceptions, such as those raised when issuing a command 

1583 on a key of a different datatype. 

1584 """ 

1585 

1586 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1587 

1588 def __init__( 

1589 self, 

1590 connection_pool: ConnectionPool, 

1591 response_callbacks, 

1592 transaction, 

1593 shard_hint, 

1594 ): 

1595 self.connection_pool = connection_pool 

1596 self.connection: Optional[Connection] = None 

1597 self.response_callbacks = response_callbacks 

1598 self.transaction = transaction 

1599 self.shard_hint = shard_hint 

1600 self.watching = False 

1601 self.command_stack = [] 

1602 self.scripts: Set[Script] = set() 

1603 self.explicit_transaction = False 

1604 

1605 def __enter__(self) -> "Pipeline": 

1606 return self 

1607 

1608 def __exit__(self, exc_type, exc_value, traceback): 

1609 self.reset() 

1610 

1611 def __del__(self): 

1612 try: 

1613 self.reset() 

1614 except Exception: 

1615 pass 

1616 

1617 def __len__(self) -> int: 

1618 return len(self.command_stack) 

1619 

1620 def __bool__(self) -> bool: 

1621 """Pipeline instances should always evaluate to True""" 

1622 return True 

1623 

1624 def reset(self) -> None: 

1625 self.command_stack = [] 

1626 self.scripts = set() 

1627 # make sure to reset the connection state in the event that we were 

1628 # watching something 

1629 if self.watching and self.connection: 

1630 try: 

1631 # call this manually since our unwatch or 

1632 # immediate_execute_command methods can call reset() 

1633 self.connection.send_command("UNWATCH") 

1634 self.connection.read_response() 

1635 except ConnectionError: 

1636 # disconnect will also remove any previous WATCHes 

1637 self.connection.disconnect() 

1638 # clean up the other instance attributes 

1639 self.watching = False 

1640 self.explicit_transaction = False 

1641 

1642 # we can safely return the connection to the pool here since we're 

1643 # sure we're no longer WATCHing anything 

1644 if self.connection: 

1645 self.connection_pool.release(self.connection) 

1646 self.connection = None 

1647 

1648 def close(self) -> None: 

1649 """Close the pipeline""" 

1650 self.reset() 

1651 

1652 def multi(self) -> None: 

1653 """ 

1654 Start a transactional block of the pipeline after WATCH commands 

1655 are issued. End the transactional block with `execute`. 

1656 """ 

1657 if self.explicit_transaction: 

1658 raise RedisError("Cannot issue nested calls to MULTI") 

1659 if self.command_stack: 

1660 raise RedisError( 

1661 "Commands without an initial WATCH have already been issued" 

1662 ) 

1663 self.explicit_transaction = True 

1664 

1665 def execute_command(self, *args, **kwargs): 

1666 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1667 return self.immediate_execute_command(*args, **kwargs) 

1668 return self.pipeline_execute_command(*args, **kwargs) 

1669 

1670 def _disconnect_reset_raise_on_watching( 

1671 self, 

1672 conn: AbstractConnection, 

1673 error: Exception, 

1674 failure_count: Optional[int] = None, 

1675 start_time: Optional[float] = None, 

1676 command_name: Optional[str] = None, 

1677 ) -> None: 

1678 """ 

1679 Close the connection reset watching state and 

1680 raise an exception if we were watching. 

1681 

1682 The supported exceptions are already checked in the 

1683 retry object so we don't need to do it here. 

1684 

1685 After we disconnect the connection, it will try to reconnect and 

1686 do a health check as part of the send_command logic(on connection level). 

1687 """ 

1688 if error and failure_count <= conn.retry.get_retries(): 

1689 record_operation_duration( 

1690 command_name=command_name, 

1691 duration_seconds=time.monotonic() - start_time, 

1692 server_address=getattr(conn, "host", None), 

1693 server_port=getattr(conn, "port", None), 

1694 db_namespace=str(conn.db), 

1695 error=error, 

1696 retry_attempts=failure_count, 

1697 ) 

1698 conn.disconnect() 

1699 

1700 # if we were already watching a variable, the watch is no longer 

1701 # valid since this connection has died. raise a WatchError, which 

1702 # indicates the user should retry this transaction. 

1703 if self.watching: 

1704 self.reset() 

1705 raise WatchError( 

1706 f"A {type(error).__name__} occurred while watching one or more keys" 

1707 ) 

1708 

1709 def immediate_execute_command(self, *args, **options): 

1710 """ 

1711 Execute a command immediately, but don't auto-retry on the supported 

1712 errors for retry if we're already WATCHing a variable. 

1713 Used when issuing WATCH or subsequent commands retrieving their values but before 

1714 MULTI is called. 

1715 """ 

1716 command_name = args[0] 

1717 conn = self.connection 

1718 # if this is the first call, we need a connection 

1719 if not conn: 

1720 conn = self.connection_pool.get_connection() 

1721 self.connection = conn 

1722 

1723 # Start timing for observability 

1724 start_time = time.monotonic() 

1725 # Track actual retry attempts for error reporting 

1726 actual_retry_attempts = [0] 

1727 

1728 def failure_callback(error, failure_count): 

1729 if is_debug_log_enabled(): 

1730 add_debug_log_for_operation_failure(conn) 

1731 actual_retry_attempts[0] = failure_count 

1732 self._disconnect_reset_raise_on_watching( 

1733 conn, error, failure_count, start_time, command_name 

1734 ) 

1735 

1736 try: 

1737 response = conn.retry.call_with_retry( 

1738 lambda: self._send_command_parse_response( 

1739 conn, command_name, *args, **options 

1740 ), 

1741 failure_callback, 

1742 with_failure_count=True, 

1743 ) 

1744 

1745 record_operation_duration( 

1746 command_name=command_name, 

1747 duration_seconds=time.monotonic() - start_time, 

1748 server_address=getattr(conn, "host", None), 

1749 server_port=getattr(conn, "port", None), 

1750 db_namespace=str(conn.db), 

1751 ) 

1752 

1753 return response 

1754 except Exception as e: 

1755 record_error_count( 

1756 server_address=getattr(conn, "host", None), 

1757 server_port=getattr(conn, "port", None), 

1758 network_peer_address=getattr(conn, "host", None), 

1759 network_peer_port=getattr(conn, "port", None), 

1760 error_type=e, 

1761 retry_attempts=actual_retry_attempts[0], 

1762 is_internal=False, 

1763 ) 

1764 raise 

1765 

1766 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1767 """ 

1768 Stage a command to be executed when execute() is next called 

1769 

1770 Returns the current Pipeline object back so commands can be 

1771 chained together, such as: 

1772 

1773 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1774 

1775 At some other point, you can then run: pipe.execute(), 

1776 which will execute all commands queued in the pipe. 

1777 """ 

1778 self.command_stack.append((args, options)) 

1779 return self 

1780 

1781 def _execute_transaction( 

1782 self, connection: Connection, commands, raise_on_error 

1783 ) -> List: 

1784 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1785 all_cmds = connection.pack_commands( 

1786 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1787 ) 

1788 connection.send_packed_command(all_cmds) 

1789 errors = [] 

1790 

1791 # parse off the response for MULTI 

1792 # NOTE: we need to handle ResponseErrors here and continue 

1793 # so that we read all the additional command messages from 

1794 # the socket 

1795 try: 

1796 self.parse_response(connection, "_") 

1797 except ResponseError as e: 

1798 errors.append((0, e)) 

1799 

1800 # and all the other commands 

1801 for i, command in enumerate(commands): 

1802 if EMPTY_RESPONSE in command[1]: 

1803 errors.append((i, command[1][EMPTY_RESPONSE])) 

1804 else: 

1805 try: 

1806 self.parse_response(connection, "_") 

1807 except ResponseError as e: 

1808 self.annotate_exception(e, i + 1, command[0]) 

1809 errors.append((i, e)) 

1810 

1811 # parse the EXEC. 

1812 try: 

1813 response = self.parse_response(connection, "_") 

1814 except ExecAbortError: 

1815 if errors: 

1816 raise errors[0][1] 

1817 raise 

1818 

1819 # EXEC clears any watched keys 

1820 self.watching = False 

1821 

1822 if response is None: 

1823 raise WatchError("Watched variable changed.") 

1824 

1825 # put any parse errors into the response 

1826 for i, e in errors: 

1827 response.insert(i, e) 

1828 

1829 if len(response) != len(commands): 

1830 self.connection.disconnect() 

1831 raise ResponseError( 

1832 "Wrong number of response items from pipeline execution" 

1833 ) 

1834 

1835 # find any errors in the response and raise if necessary 

1836 if raise_on_error: 

1837 self.raise_first_error(commands, response) 

1838 

1839 # We have to run response callbacks manually 

1840 data = [] 

1841 for r, cmd in zip(response, commands): 

1842 if not isinstance(r, Exception): 

1843 args, options = cmd 

1844 # Remove keys entry, it needs only for cache. 

1845 options.pop("keys", None) 

1846 command_name = args[0] 

1847 if command_name in self.response_callbacks: 

1848 r = self.response_callbacks[command_name](r, **options) 

1849 data.append(r) 

1850 

1851 return data 

1852 

1853 def _execute_pipeline(self, connection, commands, raise_on_error): 

1854 # build up all commands into a single request to increase network perf 

1855 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1856 connection.send_packed_command(all_cmds) 

1857 

1858 responses = [] 

1859 for args, options in commands: 

1860 try: 

1861 responses.append(self.parse_response(connection, args[0], **options)) 

1862 except ResponseError as e: 

1863 responses.append(e) 

1864 

1865 if raise_on_error: 

1866 self.raise_first_error(commands, responses) 

1867 

1868 return responses 

1869 

1870 def raise_first_error(self, commands, response): 

1871 for i, r in enumerate(response): 

1872 if isinstance(r, ResponseError): 

1873 self.annotate_exception(r, i + 1, commands[i][0]) 

1874 raise r 

1875 

1876 def annotate_exception(self, exception, number, command): 

1877 cmd = " ".join(map(safe_str, command)) 

1878 msg = ( 

1879 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1880 f"caused error: {exception.args[0]}" 

1881 ) 

1882 exception.args = (msg,) + exception.args[1:] 

1883 

1884 def parse_response(self, connection, command_name, **options): 

1885 result = Redis.parse_response(self, connection, command_name, **options) 

1886 if command_name in self.UNWATCH_COMMANDS: 

1887 self.watching = False 

1888 elif command_name == "WATCH": 

1889 self.watching = True 

1890 return result 

1891 

1892 def load_scripts(self): 

1893 # make sure all scripts that are about to be run on this pipeline exist 

1894 scripts = list(self.scripts) 

1895 immediate = self.immediate_execute_command 

1896 shas = [s.sha for s in scripts] 

1897 # we can't use the normal script_* methods because they would just 

1898 # get buffered in the pipeline. 

1899 exists = immediate("SCRIPT EXISTS", *shas) 

1900 if not all(exists): 

1901 for s, exist in zip(scripts, exists): 

1902 if not exist: 

1903 s.sha = immediate("SCRIPT LOAD", s.script) 

1904 

1905 def _disconnect_raise_on_watching( 

1906 self, 

1907 conn: AbstractConnection, 

1908 error: Exception, 

1909 failure_count: Optional[int] = None, 

1910 start_time: Optional[float] = None, 

1911 command_name: Optional[str] = None, 

1912 ) -> None: 

1913 """ 

1914 Close the connection, raise an exception if we were watching. 

1915 

1916 The supported exceptions are already checked in the 

1917 retry object so we don't need to do it here. 

1918 

1919 After we disconnect the connection, it will try to reconnect and 

1920 do a health check as part of the send_command logic(on connection level). 

1921 """ 

1922 if error and failure_count <= conn.retry.get_retries(): 

1923 record_operation_duration( 

1924 command_name=command_name, 

1925 duration_seconds=time.monotonic() - start_time, 

1926 server_address=getattr(conn, "host", None), 

1927 server_port=getattr(conn, "port", None), 

1928 db_namespace=str(conn.db), 

1929 error=error, 

1930 retry_attempts=failure_count, 

1931 ) 

1932 conn.disconnect() 

1933 # if we were watching a variable, the watch is no longer valid 

1934 # since this connection has died. raise a WatchError, which 

1935 # indicates the user should retry this transaction. 

1936 if self.watching: 

1937 raise WatchError( 

1938 f"A {type(error).__name__} occurred while watching one or more keys" 

1939 ) 

1940 

1941 def execute(self, raise_on_error: bool = True) -> List[Any]: 

1942 """Execute all the commands in the current pipeline""" 

1943 stack = self.command_stack 

1944 if not stack and not self.watching: 

1945 return [] 

1946 if self.scripts: 

1947 self.load_scripts() 

1948 if self.transaction or self.explicit_transaction: 

1949 execute = self._execute_transaction 

1950 operation_name = "MULTI" 

1951 else: 

1952 execute = self._execute_pipeline 

1953 operation_name = "PIPELINE" 

1954 

1955 conn = self.connection 

1956 if not conn: 

1957 conn = self.connection_pool.get_connection() 

1958 # assign to self.connection so reset() releases the connection 

1959 # back to the pool after we're done 

1960 self.connection = conn 

1961 

1962 # Start timing for observability 

1963 start_time = time.monotonic() 

1964 # Track actual retry attempts for error reporting 

1965 actual_retry_attempts = [0] 

1966 

1967 def failure_callback(error, failure_count): 

1968 if is_debug_log_enabled(): 

1969 add_debug_log_for_operation_failure(conn) 

1970 actual_retry_attempts[0] = failure_count 

1971 self._disconnect_raise_on_watching( 

1972 conn, error, failure_count, start_time, operation_name 

1973 ) 

1974 

1975 try: 

1976 response = conn.retry.call_with_retry( 

1977 lambda: execute(conn, stack, raise_on_error), 

1978 failure_callback, 

1979 with_failure_count=True, 

1980 ) 

1981 

1982 record_operation_duration( 

1983 command_name=operation_name, 

1984 duration_seconds=time.monotonic() - start_time, 

1985 server_address=getattr(conn, "host", None), 

1986 server_port=getattr(conn, "port", None), 

1987 db_namespace=str(conn.db), 

1988 ) 

1989 return response 

1990 except Exception as e: 

1991 record_error_count( 

1992 server_address=getattr(conn, "host", None), 

1993 server_port=getattr(conn, "port", None), 

1994 network_peer_address=getattr(conn, "host", None), 

1995 network_peer_port=getattr(conn, "port", None), 

1996 error_type=e, 

1997 retry_attempts=actual_retry_attempts[0], 

1998 is_internal=False, 

1999 ) 

2000 raise 

2001 

2002 finally: 

2003 # in reset() the connection is disconnected before returned to the pool if 

2004 # it is marked for reconnect. 

2005 self.reset() 

2006 

2007 def discard(self): 

2008 """ 

2009 Flushes all previously queued commands 

2010 See: https://redis.io/commands/DISCARD 

2011 """ 

2012 self.execute_command("DISCARD") 

2013 

2014 def watch(self, *names): 

2015 """Watches the values at keys ``names``""" 

2016 if self.explicit_transaction: 

2017 raise RedisError("Cannot issue a WATCH after a MULTI") 

2018 return self.execute_command("WATCH", *names) 

2019 

2020 def unwatch(self) -> bool: 

2021 """Unwatches all previously specified keys""" 

2022 return self.watching and self.execute_command("UNWATCH") or True