Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

705 statements  

1import copy 

2import re 

3import threading 

4import time 

5from itertools import chain 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Dict, 

11 List, 

12 Mapping, 

13 Optional, 

14 Set, 

15 Type, 

16 Union, 

17) 

18 

19from redis._parsers.encoders import Encoder 

20from redis._parsers.helpers import ( 

21 _RedisCallbacks, 

22 _RedisCallbacksRESP2, 

23 _RedisCallbacksRESP3, 

24 bool_ok, 

25) 

26from redis.backoff import ExponentialWithJitterBackoff 

27from redis.cache import CacheConfig, CacheInterface 

28from redis.commands import ( 

29 CoreCommands, 

30 RedisModuleCommands, 

31 SentinelCommands, 

32 list_or_args, 

33) 

34from redis.commands.core import Script 

35from redis.connection import ( 

36 AbstractConnection, 

37 Connection, 

38 ConnectionPool, 

39 SSLConnection, 

40 UnixDomainSocketConnection, 

41) 

42from redis.credentials import CredentialProvider 

43from redis.event import ( 

44 AfterPooledConnectionsInstantiationEvent, 

45 AfterPubSubConnectionInstantiationEvent, 

46 AfterSingleConnectionInstantiationEvent, 

47 ClientType, 

48 EventDispatcher, 

49) 

50from redis.exceptions import ( 

51 ConnectionError, 

52 ExecAbortError, 

53 PubSubError, 

54 RedisError, 

55 ResponseError, 

56 WatchError, 

57) 

58from redis.lock import Lock 

59from redis.maint_notifications import ( 

60 MaintNotificationsConfig, 

61 MaintNotificationsPoolHandler, 

62) 

63from redis.retry import Retry 

64from redis.utils import ( 

65 _set_info_logger, 

66 deprecated_args, 

67 get_lib_version, 

68 safe_str, 

69 str_if_bytes, 

70 truncate_text, 

71) 

72 

73if TYPE_CHECKING: 

74 import ssl 

75 

76 import OpenSSL 

77 

78SYM_EMPTY = b"" 

79EMPTY_RESPONSE = "EMPTY_RESPONSE" 

80 

81# some responses (ie. dump) are binary, and just meant to never be decoded 

82NEVER_DECODE = "NEVER_DECODE" 

83 

84 

85class CaseInsensitiveDict(dict): 

86 "Case insensitive dict implementation. Assumes string keys only." 

87 

88 def __init__(self, data: Dict[str, str]) -> None: 

89 for k, v in data.items(): 

90 self[k.upper()] = v 

91 

92 def __contains__(self, k): 

93 return super().__contains__(k.upper()) 

94 

95 def __delitem__(self, k): 

96 super().__delitem__(k.upper()) 

97 

98 def __getitem__(self, k): 

99 return super().__getitem__(k.upper()) 

100 

101 def get(self, k, default=None): 

102 return super().get(k.upper(), default) 

103 

104 def __setitem__(self, k, v): 

105 super().__setitem__(k.upper(), v) 

106 

107 def update(self, data): 

108 data = CaseInsensitiveDict(data) 

109 super().update(data) 

110 

111 

112class AbstractRedis: 

113 pass 

114 

115 

116class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

117 """ 

118 Implementation of the Redis protocol. 

119 

120 This abstract class provides a Python interface to all Redis commands 

121 and an implementation of the Redis protocol. 

122 

123 Pipelines derive from this, implementing how 

124 the commands are sent and received to the Redis server. Based on 

125 configuration, an instance will either use a ConnectionPool, or 

126 Connection object to talk to redis. 

127 

128 It is not safe to pass PubSub or Pipeline objects between threads. 

129 """ 

130 

131 @classmethod 

132 def from_url(cls, url: str, **kwargs) -> "Redis": 

133 """ 

134 Return a Redis client object configured from the given URL 

135 

136 For example:: 

137 

138 redis://[[username]:[password]]@localhost:6379/0 

139 rediss://[[username]:[password]]@localhost:6379/0 

140 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

141 

142 Three URL schemes are supported: 

143 

144 - `redis://` creates a TCP socket connection. See more at: 

145 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

146 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

147 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

148 - ``unix://``: creates a Unix Domain Socket connection. 

149 

150 The username, password, hostname, path and all querystring values 

151 are passed through urllib.parse.unquote in order to replace any 

152 percent-encoded values with their corresponding characters. 

153 

154 There are several ways to specify a database number. The first value 

155 found will be used: 

156 

157 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

158 2. If using the redis:// or rediss:// schemes, the path argument 

159 of the url, e.g. redis://localhost/0 

160 3. A ``db`` keyword argument to this function. 

161 

162 If none of these options are specified, the default db=0 is used. 

163 

164 All querystring options are cast to their appropriate Python types. 

165 Boolean arguments can be specified with string values "True"/"False" 

166 or "Yes"/"No". Values that cannot be properly cast cause a 

167 ``ValueError`` to be raised. Once parsed, the querystring arguments 

168 and keyword arguments are passed to the ``ConnectionPool``'s 

169 class initializer. In the case of conflicting arguments, querystring 

170 arguments always win. 

171 

172 """ 

173 single_connection_client = kwargs.pop("single_connection_client", False) 

174 connection_pool = ConnectionPool.from_url(url, **kwargs) 

175 client = cls( 

176 connection_pool=connection_pool, 

177 single_connection_client=single_connection_client, 

178 ) 

179 client.auto_close_connection_pool = True 

180 return client 

181 

182 @classmethod 

183 def from_pool( 

184 cls: Type["Redis"], 

185 connection_pool: ConnectionPool, 

186 ) -> "Redis": 

187 """ 

188 Return a Redis client from the given connection pool. 

189 The Redis client will take ownership of the connection pool and 

190 close it when the Redis client is closed. 

191 """ 

192 client = cls( 

193 connection_pool=connection_pool, 

194 ) 

195 client.auto_close_connection_pool = True 

196 return client 

197 

198 @deprecated_args( 

199 args_to_warn=["retry_on_timeout"], 

200 reason="TimeoutError is included by default.", 

201 version="6.0.0", 

202 ) 

203 def __init__( 

204 self, 

205 host: str = "localhost", 

206 port: int = 6379, 

207 db: int = 0, 

208 password: Optional[str] = None, 

209 socket_timeout: Optional[float] = None, 

210 socket_connect_timeout: Optional[float] = None, 

211 socket_keepalive: Optional[bool] = None, 

212 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

213 connection_pool: Optional[ConnectionPool] = None, 

214 unix_socket_path: Optional[str] = None, 

215 encoding: str = "utf-8", 

216 encoding_errors: str = "strict", 

217 decode_responses: bool = False, 

218 retry_on_timeout: bool = False, 

219 retry: Retry = Retry( 

220 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 

221 ), 

222 retry_on_error: Optional[List[Type[Exception]]] = None, 

223 ssl: bool = False, 

224 ssl_keyfile: Optional[str] = None, 

225 ssl_certfile: Optional[str] = None, 

226 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required", 

227 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

228 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

229 ssl_ca_certs: Optional[str] = None, 

230 ssl_ca_path: Optional[str] = None, 

231 ssl_ca_data: Optional[str] = None, 

232 ssl_check_hostname: bool = True, 

233 ssl_password: Optional[str] = None, 

234 ssl_validate_ocsp: bool = False, 

235 ssl_validate_ocsp_stapled: bool = False, 

236 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None, 

237 ssl_ocsp_expected_cert: Optional[str] = None, 

238 ssl_min_version: Optional["ssl.TLSVersion"] = None, 

239 ssl_ciphers: Optional[str] = None, 

240 max_connections: Optional[int] = None, 

241 single_connection_client: bool = False, 

242 health_check_interval: int = 0, 

243 client_name: Optional[str] = None, 

244 lib_name: Optional[str] = "redis-py", 

245 lib_version: Optional[str] = get_lib_version(), 

246 username: Optional[str] = None, 

247 redis_connect_func: Optional[Callable[[], None]] = None, 

248 credential_provider: Optional[CredentialProvider] = None, 

249 protocol: Optional[int] = 2, 

250 cache: Optional[CacheInterface] = None, 

251 cache_config: Optional[CacheConfig] = None, 

252 event_dispatcher: Optional[EventDispatcher] = None, 

253 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

254 ) -> None: 

255 """ 

256 Initialize a new Redis client. 

257 

258 To specify a retry policy for specific errors, you have two options: 

259 

260 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

261 you can also set `retry` to a valid `Retry` object(in case the default 

262 one is not appropriate) - with this approach the retries will be triggered 

263 on the default errors specified in the Retry object enriched with the 

264 errors specified in `retry_on_error`. 

265 

266 2. Define a `Retry` object with configured 'supported_errors' and set 

267 it to the `retry` parameter - with this approach you completely redefine 

268 the errors on which retries will happen. 

269 

270 `retry_on_timeout` is deprecated - please include the TimeoutError 

271 either in the Retry object or in the `retry_on_error` list. 

272 

273 When 'connection_pool' is provided - the retry configuration of the 

274 provided pool will be used. 

275 

276 Args: 

277 

278 single_connection_client: 

279 if `True`, connection pool is not used. In that case `Redis` 

280 instance use is not thread safe. 

281 """ 

282 if event_dispatcher is None: 

283 self._event_dispatcher = EventDispatcher() 

284 else: 

285 self._event_dispatcher = event_dispatcher 

286 if not connection_pool: 

287 if not retry_on_error: 

288 retry_on_error = [] 

289 kwargs = { 

290 "db": db, 

291 "username": username, 

292 "password": password, 

293 "socket_timeout": socket_timeout, 

294 "encoding": encoding, 

295 "encoding_errors": encoding_errors, 

296 "decode_responses": decode_responses, 

297 "retry_on_error": retry_on_error, 

298 "retry": copy.deepcopy(retry), 

299 "max_connections": max_connections, 

300 "health_check_interval": health_check_interval, 

301 "client_name": client_name, 

302 "lib_name": lib_name, 

303 "lib_version": lib_version, 

304 "redis_connect_func": redis_connect_func, 

305 "credential_provider": credential_provider, 

306 "protocol": protocol, 

307 } 

308 # based on input, setup appropriate connection args 

309 if unix_socket_path is not None: 

310 kwargs.update( 

311 { 

312 "path": unix_socket_path, 

313 "connection_class": UnixDomainSocketConnection, 

314 } 

315 ) 

316 else: 

317 # TCP specific options 

318 kwargs.update( 

319 { 

320 "host": host, 

321 "port": port, 

322 "socket_connect_timeout": socket_connect_timeout, 

323 "socket_keepalive": socket_keepalive, 

324 "socket_keepalive_options": socket_keepalive_options, 

325 } 

326 ) 

327 

328 if ssl: 

329 kwargs.update( 

330 { 

331 "connection_class": SSLConnection, 

332 "ssl_keyfile": ssl_keyfile, 

333 "ssl_certfile": ssl_certfile, 

334 "ssl_cert_reqs": ssl_cert_reqs, 

335 "ssl_include_verify_flags": ssl_include_verify_flags, 

336 "ssl_exclude_verify_flags": ssl_exclude_verify_flags, 

337 "ssl_ca_certs": ssl_ca_certs, 

338 "ssl_ca_data": ssl_ca_data, 

339 "ssl_check_hostname": ssl_check_hostname, 

340 "ssl_password": ssl_password, 

341 "ssl_ca_path": ssl_ca_path, 

342 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

343 "ssl_validate_ocsp": ssl_validate_ocsp, 

344 "ssl_ocsp_context": ssl_ocsp_context, 

345 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

346 "ssl_min_version": ssl_min_version, 

347 "ssl_ciphers": ssl_ciphers, 

348 } 

349 ) 

350 if (cache_config or cache) and protocol in [3, "3"]: 

351 kwargs.update( 

352 { 

353 "cache": cache, 

354 "cache_config": cache_config, 

355 } 

356 ) 

357 connection_pool = ConnectionPool(**kwargs) 

358 self._event_dispatcher.dispatch( 

359 AfterPooledConnectionsInstantiationEvent( 

360 [connection_pool], ClientType.SYNC, credential_provider 

361 ) 

362 ) 

363 self.auto_close_connection_pool = True 

364 else: 

365 self.auto_close_connection_pool = False 

366 self._event_dispatcher.dispatch( 

367 AfterPooledConnectionsInstantiationEvent( 

368 [connection_pool], ClientType.SYNC, credential_provider 

369 ) 

370 ) 

371 

372 self.connection_pool = connection_pool 

373 

374 if (cache_config or cache) and self.connection_pool.get_protocol() not in [ 

375 3, 

376 "3", 

377 ]: 

378 raise RedisError("Client caching is only supported with RESP version 3") 

379 

380 if maint_notifications_config and self.connection_pool.get_protocol() not in [ 

381 3, 

382 "3", 

383 ]: 

384 raise RedisError( 

385 "Push handlers on connection are only supported with RESP version 3" 

386 ) 

387 if maint_notifications_config and maint_notifications_config.enabled: 

388 self.maint_notifications_pool_handler = MaintNotificationsPoolHandler( 

389 self.connection_pool, maint_notifications_config 

390 ) 

391 self.connection_pool.set_maint_notifications_pool_handler( 

392 self.maint_notifications_pool_handler 

393 ) 

394 else: 

395 self.maint_notifications_pool_handler = None 

396 

397 self.single_connection_lock = threading.RLock() 

398 self.connection = None 

399 self._single_connection_client = single_connection_client 

400 if self._single_connection_client: 

401 self.connection = self.connection_pool.get_connection() 

402 self._event_dispatcher.dispatch( 

403 AfterSingleConnectionInstantiationEvent( 

404 self.connection, ClientType.SYNC, self.single_connection_lock 

405 ) 

406 ) 

407 

408 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks) 

409 

410 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: 

411 self.response_callbacks.update(_RedisCallbacksRESP3) 

412 else: 

413 self.response_callbacks.update(_RedisCallbacksRESP2) 

414 

415 def __repr__(self) -> str: 

416 return ( 

417 f"<{type(self).__module__}.{type(self).__name__}" 

418 f"({repr(self.connection_pool)})>" 

419 ) 

420 

421 def get_encoder(self) -> "Encoder": 

422 """Get the connection pool's encoder""" 

423 return self.connection_pool.get_encoder() 

424 

425 def get_connection_kwargs(self) -> Dict: 

426 """Get the connection's key-word arguments""" 

427 return self.connection_pool.connection_kwargs 

428 

429 def get_retry(self) -> Optional[Retry]: 

430 return self.get_connection_kwargs().get("retry") 

431 

432 def set_retry(self, retry: Retry) -> None: 

433 self.get_connection_kwargs().update({"retry": retry}) 

434 self.connection_pool.set_retry(retry) 

435 

436 def set_response_callback(self, command: str, callback: Callable) -> None: 

437 """Set a custom Response Callback""" 

438 self.response_callbacks[command] = callback 

439 

440 def load_external_module(self, funcname, func) -> None: 

441 """ 

442 This function can be used to add externally defined redis modules, 

443 and their namespaces to the redis client. 

444 

445 funcname - A string containing the name of the function to create 

446 func - The function, being added to this class. 

447 

448 ex: Assume that one has a custom redis module named foomod that 

449 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

450 To load function functions into this namespace: 

451 

452 from redis import Redis 

453 from foomodule import F 

454 r = Redis() 

455 r.load_external_module("foo", F) 

456 r.foo().dothing('your', 'arguments') 

457 

458 For a concrete example see the reimport of the redisjson module in 

459 tests/test_connection.py::test_loading_external_modules 

460 """ 

461 setattr(self, funcname, func) 

462 

463 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

464 """ 

465 Return a new pipeline object that can queue multiple commands for 

466 later execution. ``transaction`` indicates whether all commands 

467 should be executed atomically. Apart from making a group of operations 

468 atomic, pipelines are useful for reducing the back-and-forth overhead 

469 between the client and server. 

470 """ 

471 return Pipeline( 

472 self.connection_pool, self.response_callbacks, transaction, shard_hint 

473 ) 

474 

475 def transaction( 

476 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

477 ) -> Union[List[Any], Any, None]: 

478 """ 

479 Convenience method for executing the callable `func` as a transaction 

480 while watching all keys specified in `watches`. The 'func' callable 

481 should expect a single argument which is a Pipeline object. 

482 """ 

483 shard_hint = kwargs.pop("shard_hint", None) 

484 value_from_callable = kwargs.pop("value_from_callable", False) 

485 watch_delay = kwargs.pop("watch_delay", None) 

486 with self.pipeline(True, shard_hint) as pipe: 

487 while True: 

488 try: 

489 if watches: 

490 pipe.watch(*watches) 

491 func_value = func(pipe) 

492 exec_value = pipe.execute() 

493 return func_value if value_from_callable else exec_value 

494 except WatchError: 

495 if watch_delay is not None and watch_delay > 0: 

496 time.sleep(watch_delay) 

497 continue 

498 

499 def lock( 

500 self, 

501 name: str, 

502 timeout: Optional[float] = None, 

503 sleep: float = 0.1, 

504 blocking: bool = True, 

505 blocking_timeout: Optional[float] = None, 

506 lock_class: Union[None, Any] = None, 

507 thread_local: bool = True, 

508 raise_on_release_error: bool = True, 

509 ): 

510 """ 

511 Return a new Lock object using key ``name`` that mimics 

512 the behavior of threading.Lock. 

513 

514 If specified, ``timeout`` indicates a maximum life for the lock. 

515 By default, it will remain locked until release() is called. 

516 

517 ``sleep`` indicates the amount of time to sleep per loop iteration 

518 when the lock is in blocking mode and another client is currently 

519 holding the lock. 

520 

521 ``blocking`` indicates whether calling ``acquire`` should block until 

522 the lock has been acquired or to fail immediately, causing ``acquire`` 

523 to return False and the lock not being acquired. Defaults to True. 

524 Note this value can be overridden by passing a ``blocking`` 

525 argument to ``acquire``. 

526 

527 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

528 spend trying to acquire the lock. A value of ``None`` indicates 

529 continue trying forever. ``blocking_timeout`` can be specified as a 

530 float or integer, both representing the number of seconds to wait. 

531 

532 ``lock_class`` forces the specified lock implementation. Note that as 

533 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

534 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

535 you have created your own custom lock class. 

536 

537 ``thread_local`` indicates whether the lock token is placed in 

538 thread-local storage. By default, the token is placed in thread local 

539 storage so that a thread only sees its token, not a token set by 

540 another thread. Consider the following timeline: 

541 

542 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

543 thread-1 sets the token to "abc" 

544 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

545 Lock instance. 

546 time: 5, thread-1 has not yet completed. redis expires the lock 

547 key. 

548 time: 5, thread-2 acquired `my-lock` now that it's available. 

549 thread-2 sets the token to "xyz" 

550 time: 6, thread-1 finishes its work and calls release(). if the 

551 token is *not* stored in thread local storage, then 

552 thread-1 would see the token value as "xyz" and would be 

553 able to successfully release the thread-2's lock. 

554 

555 ``raise_on_release_error`` indicates whether to raise an exception when 

556 the lock is no longer owned when exiting the context manager. By default, 

557 this is True, meaning an exception will be raised. If False, the warning 

558 will be logged and the exception will be suppressed. 

559 

560 In some use cases it's necessary to disable thread local storage. For 

561 example, if you have code where one thread acquires a lock and passes 

562 that lock instance to a worker thread to release later. If thread 

563 local storage isn't disabled in this case, the worker thread won't see 

564 the token set by the thread that acquired the lock. Our assumption 

565 is that these cases aren't common and as such default to using 

566 thread local storage.""" 

567 if lock_class is None: 

568 lock_class = Lock 

569 return lock_class( 

570 self, 

571 name, 

572 timeout=timeout, 

573 sleep=sleep, 

574 blocking=blocking, 

575 blocking_timeout=blocking_timeout, 

576 thread_local=thread_local, 

577 raise_on_release_error=raise_on_release_error, 

578 ) 

579 

580 def pubsub(self, **kwargs): 

581 """ 

582 Return a Publish/Subscribe object. With this object, you can 

583 subscribe to channels and listen for messages that get published to 

584 them. 

585 """ 

586 return PubSub( 

587 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

588 ) 

589 

590 def monitor(self): 

591 return Monitor(self.connection_pool) 

592 

593 def client(self): 

594 maint_notifications_config = ( 

595 None 

596 if self.maint_notifications_pool_handler is None 

597 else self.maint_notifications_pool_handler.config 

598 ) 

599 return self.__class__( 

600 connection_pool=self.connection_pool, 

601 single_connection_client=True, 

602 maint_notifications_config=maint_notifications_config, 

603 ) 

604 

605 def __enter__(self): 

606 return self 

607 

608 def __exit__(self, exc_type, exc_value, traceback): 

609 self.close() 

610 

611 def __del__(self): 

612 try: 

613 self.close() 

614 except Exception: 

615 pass 

616 

617 def close(self) -> None: 

618 # In case a connection property does not yet exist 

619 # (due to a crash earlier in the Redis() constructor), return 

620 # immediately as there is nothing to clean-up. 

621 if not hasattr(self, "connection"): 

622 return 

623 

624 conn = self.connection 

625 if conn: 

626 self.connection = None 

627 self.connection_pool.release(conn) 

628 

629 if self.auto_close_connection_pool: 

630 self.connection_pool.disconnect() 

631 

632 def _send_command_parse_response(self, conn, command_name, *args, **options): 

633 """ 

634 Send a command and parse the response 

635 """ 

636 conn.send_command(*args, **options) 

637 return self.parse_response(conn, command_name, **options) 

638 

639 def _close_connection(self, conn) -> None: 

640 """ 

641 Close the connection before retrying. 

642 

643 The supported exceptions are already checked in the 

644 retry object so we don't need to do it here. 

645 

646 After we disconnect the connection, it will try to reconnect and 

647 do a health check as part of the send_command logic(on connection level). 

648 """ 

649 

650 conn.disconnect() 

651 

652 # COMMAND EXECUTION AND PROTOCOL PARSING 

653 def execute_command(self, *args, **options): 

654 return self._execute_command(*args, **options) 

655 

656 def _execute_command(self, *args, **options): 

657 """Execute a command and return a parsed response""" 

658 pool = self.connection_pool 

659 command_name = args[0] 

660 conn = self.connection or pool.get_connection() 

661 

662 if self._single_connection_client: 

663 self.single_connection_lock.acquire() 

664 try: 

665 return conn.retry.call_with_retry( 

666 lambda: self._send_command_parse_response( 

667 conn, command_name, *args, **options 

668 ), 

669 lambda _: self._close_connection(conn), 

670 ) 

671 

672 finally: 

673 if conn and conn.should_reconnect(): 

674 self._close_connection(conn) 

675 conn.connect() 

676 if self._single_connection_client: 

677 self.single_connection_lock.release() 

678 if not self.connection: 

679 pool.release(conn) 

680 

681 def parse_response(self, connection, command_name, **options): 

682 """Parses a response from the Redis server""" 

683 try: 

684 if NEVER_DECODE in options: 

685 response = connection.read_response(disable_decoding=True) 

686 options.pop(NEVER_DECODE) 

687 else: 

688 response = connection.read_response() 

689 except ResponseError: 

690 if EMPTY_RESPONSE in options: 

691 return options[EMPTY_RESPONSE] 

692 raise 

693 

694 if EMPTY_RESPONSE in options: 

695 options.pop(EMPTY_RESPONSE) 

696 

697 # Remove keys entry, it needs only for cache. 

698 options.pop("keys", None) 

699 

700 if command_name in self.response_callbacks: 

701 return self.response_callbacks[command_name](response, **options) 

702 return response 

703 

704 def get_cache(self) -> Optional[CacheInterface]: 

705 return self.connection_pool.cache 

706 

707 

708StrictRedis = Redis 

709 

710 

711class Monitor: 

712 """ 

713 Monitor is useful for handling the MONITOR command to the redis server. 

714 next_command() method returns one command from monitor 

715 listen() method yields commands from monitor. 

716 """ 

717 

718 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)") 

719 command_re = re.compile(r'"(.*?)(?<!\\)"') 

720 

721 def __init__(self, connection_pool): 

722 self.connection_pool = connection_pool 

723 self.connection = self.connection_pool.get_connection() 

724 

725 def __enter__(self): 

726 self._start_monitor() 

727 return self 

728 

729 def __exit__(self, *args): 

730 self.connection.disconnect() 

731 self.connection_pool.release(self.connection) 

732 

733 def next_command(self): 

734 """Parse the response from a monitor command""" 

735 response = self.connection.read_response() 

736 

737 if response is None: 

738 return None 

739 

740 if isinstance(response, bytes): 

741 response = self.connection.encoder.decode(response, force=True) 

742 

743 command_time, command_data = response.split(" ", 1) 

744 m = self.monitor_re.match(command_data) 

745 db_id, client_info, command = m.groups() 

746 command = " ".join(self.command_re.findall(command)) 

747 # Redis escapes double quotes because each piece of the command 

748 # string is surrounded by double quotes. We don't have that 

749 # requirement so remove the escaping and leave the quote. 

750 command = command.replace('\\"', '"') 

751 

752 if client_info == "lua": 

753 client_address = "lua" 

754 client_port = "" 

755 client_type = "lua" 

756 elif client_info.startswith("unix"): 

757 client_address = "unix" 

758 client_port = client_info[5:] 

759 client_type = "unix" 

760 else: 

761 # use rsplit as ipv6 addresses contain colons 

762 client_address, client_port = client_info.rsplit(":", 1) 

763 client_type = "tcp" 

764 return { 

765 "time": float(command_time), 

766 "db": int(db_id), 

767 "client_address": client_address, 

768 "client_port": client_port, 

769 "client_type": client_type, 

770 "command": command, 

771 } 

772 

773 def listen(self): 

774 """Listen for commands coming to the server.""" 

775 while True: 

776 yield self.next_command() 

777 

778 def _start_monitor(self): 

779 self.connection.send_command("MONITOR") 

780 # check that monitor returns 'OK', but don't return it to user 

781 response = self.connection.read_response() 

782 

783 if not bool_ok(response): 

784 raise RedisError(f"MONITOR failed: {response}") 

785 

786 

787class PubSub: 

788 """ 

789 PubSub provides publish, subscribe and listen support to Redis channels. 

790 

791 After subscribing to one or more channels, the listen() method will block 

792 until a message arrives on one of the subscribed channels. That message 

793 will be returned and it's safe to start listening again. 

794 """ 

795 

796 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

797 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

798 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

799 

800 def __init__( 

801 self, 

802 connection_pool, 

803 shard_hint=None, 

804 ignore_subscribe_messages: bool = False, 

805 encoder: Optional["Encoder"] = None, 

806 push_handler_func: Union[None, Callable[[str], None]] = None, 

807 event_dispatcher: Optional["EventDispatcher"] = None, 

808 ): 

809 self.connection_pool = connection_pool 

810 self.shard_hint = shard_hint 

811 self.ignore_subscribe_messages = ignore_subscribe_messages 

812 self.connection = None 

813 self.subscribed_event = threading.Event() 

814 # we need to know the encoding options for this connection in order 

815 # to lookup channel and pattern names for callback handlers. 

816 self.encoder = encoder 

817 self.push_handler_func = push_handler_func 

818 if event_dispatcher is None: 

819 self._event_dispatcher = EventDispatcher() 

820 else: 

821 self._event_dispatcher = event_dispatcher 

822 

823 self._lock = threading.RLock() 

824 if self.encoder is None: 

825 self.encoder = self.connection_pool.get_encoder() 

826 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

827 if self.encoder.decode_responses: 

828 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

829 else: 

830 self.health_check_response = [b"pong", self.health_check_response_b] 

831 if self.push_handler_func is None: 

832 _set_info_logger() 

833 self.reset() 

834 

835 def __enter__(self) -> "PubSub": 

836 return self 

837 

838 def __exit__(self, exc_type, exc_value, traceback) -> None: 

839 self.reset() 

840 

841 def __del__(self) -> None: 

842 try: 

843 # if this object went out of scope prior to shutting down 

844 # subscriptions, close the connection manually before 

845 # returning it to the connection pool 

846 self.reset() 

847 except Exception: 

848 pass 

849 

850 def reset(self) -> None: 

851 if self.connection: 

852 self.connection.disconnect() 

853 self.connection.deregister_connect_callback(self.on_connect) 

854 self.connection_pool.release(self.connection) 

855 self.connection = None 

856 self.health_check_response_counter = 0 

857 self.channels = {} 

858 self.pending_unsubscribe_channels = set() 

859 self.shard_channels = {} 

860 self.pending_unsubscribe_shard_channels = set() 

861 self.patterns = {} 

862 self.pending_unsubscribe_patterns = set() 

863 self.subscribed_event.clear() 

864 

865 def close(self) -> None: 

866 self.reset() 

867 

868 def on_connect(self, connection) -> None: 

869 "Re-subscribe to any channels and patterns previously subscribed to" 

870 # NOTE: for python3, we can't pass bytestrings as keyword arguments 

871 # so we need to decode channel/pattern names back to unicode strings 

872 # before passing them to [p]subscribe. 

873 self.pending_unsubscribe_channels.clear() 

874 self.pending_unsubscribe_patterns.clear() 

875 self.pending_unsubscribe_shard_channels.clear() 

876 if self.channels: 

877 channels = { 

878 self.encoder.decode(k, force=True): v for k, v in self.channels.items() 

879 } 

880 self.subscribe(**channels) 

881 if self.patterns: 

882 patterns = { 

883 self.encoder.decode(k, force=True): v for k, v in self.patterns.items() 

884 } 

885 self.psubscribe(**patterns) 

886 if self.shard_channels: 

887 shard_channels = { 

888 self.encoder.decode(k, force=True): v 

889 for k, v in self.shard_channels.items() 

890 } 

891 self.ssubscribe(**shard_channels) 

892 

893 @property 

894 def subscribed(self) -> bool: 

895 """Indicates if there are subscriptions to any channels or patterns""" 

896 return self.subscribed_event.is_set() 

897 

898 def execute_command(self, *args): 

899 """Execute a publish/subscribe command""" 

900 

901 # NOTE: don't parse the response in this function -- it could pull a 

902 # legitimate message off the stack if the connection is already 

903 # subscribed to one or more channels 

904 

905 if self.connection is None: 

906 self.connection = self.connection_pool.get_connection() 

907 # register a callback that re-subscribes to any channels we 

908 # were listening to when we were disconnected 

909 self.connection.register_connect_callback(self.on_connect) 

910 if self.push_handler_func is not None: 

911 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

912 self._event_dispatcher.dispatch( 

913 AfterPubSubConnectionInstantiationEvent( 

914 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

915 ) 

916 ) 

917 connection = self.connection 

918 kwargs = {"check_health": not self.subscribed} 

919 if not self.subscribed: 

920 self.clean_health_check_responses() 

921 with self._lock: 

922 self._execute(connection, connection.send_command, *args, **kwargs) 

923 

924 def clean_health_check_responses(self) -> None: 

925 """ 

926 If any health check responses are present, clean them 

927 """ 

928 ttl = 10 

929 conn = self.connection 

930 while conn and self.health_check_response_counter > 0 and ttl > 0: 

931 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

932 response = self._execute(conn, conn.read_response) 

933 if self.is_health_check_response(response): 

934 self.health_check_response_counter -= 1 

935 else: 

936 raise PubSubError( 

937 "A non health check response was cleaned by " 

938 "execute_command: {}".format(response) 

939 ) 

940 ttl -= 1 

941 

942 def _reconnect(self, conn) -> None: 

943 """ 

944 The supported exceptions are already checked in the 

945 retry object so we don't need to do it here. 

946 

947 In this error handler we are trying to reconnect to the server. 

948 """ 

949 conn.disconnect() 

950 conn.connect() 

951 

952 def _execute(self, conn, command, *args, **kwargs): 

953 """ 

954 Connect manually upon disconnection. If the Redis server is down, 

955 this will fail and raise a ConnectionError as desired. 

956 After reconnection, the ``on_connect`` callback should have been 

957 called by the # connection to resubscribe us to any channels and 

958 patterns we were previously listening to 

959 """ 

960 

961 if conn.should_reconnect(): 

962 self._reconnect(conn) 

963 

964 response = conn.retry.call_with_retry( 

965 lambda: command(*args, **kwargs), 

966 lambda _: self._reconnect(conn), 

967 ) 

968 

969 return response 

970 

971 def parse_response(self, block=True, timeout=0): 

972 """Parse the response from a publish/subscribe command""" 

973 conn = self.connection 

974 if conn is None: 

975 raise RuntimeError( 

976 "pubsub connection not set: " 

977 "did you forget to call subscribe() or psubscribe()?" 

978 ) 

979 

980 self.check_health() 

981 

982 def try_read(): 

983 if not block: 

984 if not conn.can_read(timeout=timeout): 

985 return None 

986 else: 

987 conn.connect() 

988 return conn.read_response(disconnect_on_error=False, push_request=True) 

989 

990 response = self._execute(conn, try_read) 

991 

992 if self.is_health_check_response(response): 

993 # ignore the health check message as user might not expect it 

994 self.health_check_response_counter -= 1 

995 return None 

996 return response 

997 

998 def is_health_check_response(self, response) -> bool: 

999 """ 

1000 Check if the response is a health check response. 

1001 If there are no subscriptions redis responds to PING command with a 

1002 bulk response, instead of a multi-bulk with "pong" and the response. 

1003 """ 

1004 return response in [ 

1005 self.health_check_response, # If there was a subscription 

1006 self.health_check_response_b, # If there wasn't 

1007 ] 

1008 

1009 def check_health(self) -> None: 

1010 conn = self.connection 

1011 if conn is None: 

1012 raise RuntimeError( 

1013 "pubsub connection not set: " 

1014 "did you forget to call subscribe() or psubscribe()?" 

1015 ) 

1016 

1017 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1018 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1019 self.health_check_response_counter += 1 

1020 

1021 def _normalize_keys(self, data) -> Dict: 

1022 """ 

1023 normalize channel/pattern names to be either bytes or strings 

1024 based on whether responses are automatically decoded. this saves us 

1025 from coercing the value for each message coming in. 

1026 """ 

1027 encode = self.encoder.encode 

1028 decode = self.encoder.decode 

1029 return {decode(encode(k)): v for k, v in data.items()} 

1030 

1031 def psubscribe(self, *args, **kwargs): 

1032 """ 

1033 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1034 expect a pattern name as the key and a callable as the value. A 

1035 pattern's callable will be invoked automatically when a message is 

1036 received on that pattern rather than producing a message via 

1037 ``listen()``. 

1038 """ 

1039 if args: 

1040 args = list_or_args(args[0], args[1:]) 

1041 new_patterns = dict.fromkeys(args) 

1042 new_patterns.update(kwargs) 

1043 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1044 # update the patterns dict AFTER we send the command. we don't want to 

1045 # subscribe twice to these patterns, once for the command and again 

1046 # for the reconnection. 

1047 new_patterns = self._normalize_keys(new_patterns) 

1048 self.patterns.update(new_patterns) 

1049 if not self.subscribed: 

1050 # Set the subscribed_event flag to True 

1051 self.subscribed_event.set() 

1052 # Clear the health check counter 

1053 self.health_check_response_counter = 0 

1054 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1055 return ret_val 

1056 

1057 def punsubscribe(self, *args): 

1058 """ 

1059 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1060 all patterns. 

1061 """ 

1062 if args: 

1063 args = list_or_args(args[0], args[1:]) 

1064 patterns = self._normalize_keys(dict.fromkeys(args)) 

1065 else: 

1066 patterns = self.patterns 

1067 self.pending_unsubscribe_patterns.update(patterns) 

1068 return self.execute_command("PUNSUBSCRIBE", *args) 

1069 

1070 def subscribe(self, *args, **kwargs): 

1071 """ 

1072 Subscribe to channels. Channels supplied as keyword arguments expect 

1073 a channel name as the key and a callable as the value. A channel's 

1074 callable will be invoked automatically when a message is received on 

1075 that channel rather than producing a message via ``listen()`` or 

1076 ``get_message()``. 

1077 """ 

1078 if args: 

1079 args = list_or_args(args[0], args[1:]) 

1080 new_channels = dict.fromkeys(args) 

1081 new_channels.update(kwargs) 

1082 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1083 # update the channels dict AFTER we send the command. we don't want to 

1084 # subscribe twice to these channels, once for the command and again 

1085 # for the reconnection. 

1086 new_channels = self._normalize_keys(new_channels) 

1087 self.channels.update(new_channels) 

1088 if not self.subscribed: 

1089 # Set the subscribed_event flag to True 

1090 self.subscribed_event.set() 

1091 # Clear the health check counter 

1092 self.health_check_response_counter = 0 

1093 self.pending_unsubscribe_channels.difference_update(new_channels) 

1094 return ret_val 

1095 

1096 def unsubscribe(self, *args): 

1097 """ 

1098 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1099 all channels 

1100 """ 

1101 if args: 

1102 args = list_or_args(args[0], args[1:]) 

1103 channels = self._normalize_keys(dict.fromkeys(args)) 

1104 else: 

1105 channels = self.channels 

1106 self.pending_unsubscribe_channels.update(channels) 

1107 return self.execute_command("UNSUBSCRIBE", *args) 

1108 

1109 def ssubscribe(self, *args, target_node=None, **kwargs): 

1110 """ 

1111 Subscribes the client to the specified shard channels. 

1112 Channels supplied as keyword arguments expect a channel name as the key 

1113 and a callable as the value. A channel's callable will be invoked automatically 

1114 when a message is received on that channel rather than producing a message via 

1115 ``listen()`` or ``get_sharded_message()``. 

1116 """ 

1117 if args: 

1118 args = list_or_args(args[0], args[1:]) 

1119 new_s_channels = dict.fromkeys(args) 

1120 new_s_channels.update(kwargs) 

1121 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1122 # update the s_channels dict AFTER we send the command. we don't want to 

1123 # subscribe twice to these channels, once for the command and again 

1124 # for the reconnection. 

1125 new_s_channels = self._normalize_keys(new_s_channels) 

1126 self.shard_channels.update(new_s_channels) 

1127 if not self.subscribed: 

1128 # Set the subscribed_event flag to True 

1129 self.subscribed_event.set() 

1130 # Clear the health check counter 

1131 self.health_check_response_counter = 0 

1132 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1133 return ret_val 

1134 

1135 def sunsubscribe(self, *args, target_node=None): 

1136 """ 

1137 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1138 all shard_channels 

1139 """ 

1140 if args: 

1141 args = list_or_args(args[0], args[1:]) 

1142 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1143 else: 

1144 s_channels = self.shard_channels 

1145 self.pending_unsubscribe_shard_channels.update(s_channels) 

1146 return self.execute_command("SUNSUBSCRIBE", *args) 

1147 

1148 def listen(self): 

1149 "Listen for messages on channels this client has been subscribed to" 

1150 while self.subscribed: 

1151 response = self.handle_message(self.parse_response(block=True)) 

1152 if response is not None: 

1153 yield response 

1154 

1155 def get_message( 

1156 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1157 ): 

1158 """ 

1159 Get the next message if one is available, otherwise None. 

1160 

1161 If timeout is specified, the system will wait for `timeout` seconds 

1162 before returning. Timeout should be specified as a floating point 

1163 number, or None, to wait indefinitely. 

1164 """ 

1165 if not self.subscribed: 

1166 # Wait for subscription 

1167 start_time = time.monotonic() 

1168 if self.subscribed_event.wait(timeout) is True: 

1169 # The connection was subscribed during the timeout time frame. 

1170 # The timeout should be adjusted based on the time spent 

1171 # waiting for the subscription 

1172 time_spent = time.monotonic() - start_time 

1173 timeout = max(0.0, timeout - time_spent) 

1174 else: 

1175 # The connection isn't subscribed to any channels or patterns, 

1176 # so no messages are available 

1177 return None 

1178 

1179 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1180 

1181 if response: 

1182 return self.handle_message(response, ignore_subscribe_messages) 

1183 return None 

1184 

1185 get_sharded_message = get_message 

1186 

1187 def ping(self, message: Union[str, None] = None) -> bool: 

1188 """ 

1189 Ping the Redis server to test connectivity. 

1190 

1191 Sends a PING command to the Redis server and returns True if the server 

1192 responds with "PONG". 

1193 """ 

1194 args = ["PING", message] if message is not None else ["PING"] 

1195 return self.execute_command(*args) 

1196 

1197 def handle_message(self, response, ignore_subscribe_messages=False): 

1198 """ 

1199 Parses a pub/sub message. If the channel or pattern was subscribed to 

1200 with a message handler, the handler is invoked instead of a parsed 

1201 message being returned. 

1202 """ 

1203 if response is None: 

1204 return None 

1205 if isinstance(response, bytes): 

1206 response = [b"pong", response] if response != b"PONG" else [b"pong", b""] 

1207 

1208 message_type = str_if_bytes(response[0]) 

1209 if message_type == "pmessage": 

1210 message = { 

1211 "type": message_type, 

1212 "pattern": response[1], 

1213 "channel": response[2], 

1214 "data": response[3], 

1215 } 

1216 elif message_type == "pong": 

1217 message = { 

1218 "type": message_type, 

1219 "pattern": None, 

1220 "channel": None, 

1221 "data": response[1], 

1222 } 

1223 else: 

1224 message = { 

1225 "type": message_type, 

1226 "pattern": None, 

1227 "channel": response[1], 

1228 "data": response[2], 

1229 } 

1230 

1231 # if this is an unsubscribe message, remove it from memory 

1232 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1233 if message_type == "punsubscribe": 

1234 pattern = response[1] 

1235 if pattern in self.pending_unsubscribe_patterns: 

1236 self.pending_unsubscribe_patterns.remove(pattern) 

1237 self.patterns.pop(pattern, None) 

1238 elif message_type == "sunsubscribe": 

1239 s_channel = response[1] 

1240 if s_channel in self.pending_unsubscribe_shard_channels: 

1241 self.pending_unsubscribe_shard_channels.remove(s_channel) 

1242 self.shard_channels.pop(s_channel, None) 

1243 else: 

1244 channel = response[1] 

1245 if channel in self.pending_unsubscribe_channels: 

1246 self.pending_unsubscribe_channels.remove(channel) 

1247 self.channels.pop(channel, None) 

1248 if not self.channels and not self.patterns and not self.shard_channels: 

1249 # There are no subscriptions anymore, set subscribed_event flag 

1250 # to false 

1251 self.subscribed_event.clear() 

1252 

1253 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1254 # if there's a message handler, invoke it 

1255 if message_type == "pmessage": 

1256 handler = self.patterns.get(message["pattern"], None) 

1257 elif message_type == "smessage": 

1258 handler = self.shard_channels.get(message["channel"], None) 

1259 else: 

1260 handler = self.channels.get(message["channel"], None) 

1261 if handler: 

1262 handler(message) 

1263 return None 

1264 elif message_type != "pong": 

1265 # this is a subscribe/unsubscribe message. ignore if we don't 

1266 # want them 

1267 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1268 return None 

1269 

1270 return message 

1271 

1272 def run_in_thread( 

1273 self, 

1274 sleep_time: float = 0.0, 

1275 daemon: bool = False, 

1276 exception_handler: Optional[Callable] = None, 

1277 pubsub=None, 

1278 sharded_pubsub: bool = False, 

1279 ) -> "PubSubWorkerThread": 

1280 for channel, handler in self.channels.items(): 

1281 if handler is None: 

1282 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1283 for pattern, handler in self.patterns.items(): 

1284 if handler is None: 

1285 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1286 for s_channel, handler in self.shard_channels.items(): 

1287 if handler is None: 

1288 raise PubSubError( 

1289 f"Shard Channel: '{s_channel}' has no handler registered" 

1290 ) 

1291 

1292 pubsub = self if pubsub is None else pubsub 

1293 thread = PubSubWorkerThread( 

1294 pubsub, 

1295 sleep_time, 

1296 daemon=daemon, 

1297 exception_handler=exception_handler, 

1298 sharded_pubsub=sharded_pubsub, 

1299 ) 

1300 thread.start() 

1301 return thread 

1302 

1303 

1304class PubSubWorkerThread(threading.Thread): 

1305 def __init__( 

1306 self, 

1307 pubsub, 

1308 sleep_time: float, 

1309 daemon: bool = False, 

1310 exception_handler: Union[ 

1311 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None 

1312 ] = None, 

1313 sharded_pubsub: bool = False, 

1314 ): 

1315 super().__init__() 

1316 self.daemon = daemon 

1317 self.pubsub = pubsub 

1318 self.sleep_time = sleep_time 

1319 self.exception_handler = exception_handler 

1320 self.sharded_pubsub = sharded_pubsub 

1321 self._running = threading.Event() 

1322 

1323 def run(self) -> None: 

1324 if self._running.is_set(): 

1325 return 

1326 self._running.set() 

1327 pubsub = self.pubsub 

1328 sleep_time = self.sleep_time 

1329 while self._running.is_set(): 

1330 try: 

1331 if not self.sharded_pubsub: 

1332 pubsub.get_message( 

1333 ignore_subscribe_messages=True, timeout=sleep_time 

1334 ) 

1335 else: 

1336 pubsub.get_sharded_message( 

1337 ignore_subscribe_messages=True, timeout=sleep_time 

1338 ) 

1339 except BaseException as e: 

1340 if self.exception_handler is None: 

1341 raise 

1342 self.exception_handler(e, pubsub, self) 

1343 pubsub.close() 

1344 

1345 def stop(self) -> None: 

1346 # trip the flag so the run loop exits. the run loop will 

1347 # close the pubsub connection, which disconnects the socket 

1348 # and returns the connection to the pool. 

1349 self._running.clear() 

1350 

1351 

1352class Pipeline(Redis): 

1353 """ 

1354 Pipelines provide a way to transmit multiple commands to the Redis server 

1355 in one transmission. This is convenient for batch processing, such as 

1356 saving all the values in a list to Redis. 

1357 

1358 All commands executed within a pipeline(when running in transactional mode, 

1359 which is the default behavior) are wrapped with MULTI and EXEC 

1360 calls. This guarantees all commands executed in the pipeline will be 

1361 executed atomically. 

1362 

1363 Any command raising an exception does *not* halt the execution of 

1364 subsequent commands in the pipeline. Instead, the exception is caught 

1365 and its instance is placed into the response list returned by execute(). 

1366 Code iterating over the response list should be able to deal with an 

1367 instance of an exception as a potential value. In general, these will be 

1368 ResponseError exceptions, such as those raised when issuing a command 

1369 on a key of a different datatype. 

1370 """ 

1371 

1372 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1373 

1374 def __init__( 

1375 self, 

1376 connection_pool: ConnectionPool, 

1377 response_callbacks, 

1378 transaction, 

1379 shard_hint, 

1380 ): 

1381 self.connection_pool = connection_pool 

1382 self.connection: Optional[Connection] = None 

1383 self.response_callbacks = response_callbacks 

1384 self.transaction = transaction 

1385 self.shard_hint = shard_hint 

1386 self.watching = False 

1387 self.command_stack = [] 

1388 self.scripts: Set[Script] = set() 

1389 self.explicit_transaction = False 

1390 

1391 def __enter__(self) -> "Pipeline": 

1392 return self 

1393 

1394 def __exit__(self, exc_type, exc_value, traceback): 

1395 self.reset() 

1396 

1397 def __del__(self): 

1398 try: 

1399 self.reset() 

1400 except Exception: 

1401 pass 

1402 

1403 def __len__(self) -> int: 

1404 return len(self.command_stack) 

1405 

1406 def __bool__(self) -> bool: 

1407 """Pipeline instances should always evaluate to True""" 

1408 return True 

1409 

1410 def reset(self) -> None: 

1411 self.command_stack = [] 

1412 self.scripts = set() 

1413 # make sure to reset the connection state in the event that we were 

1414 # watching something 

1415 if self.watching and self.connection: 

1416 try: 

1417 # call this manually since our unwatch or 

1418 # immediate_execute_command methods can call reset() 

1419 self.connection.send_command("UNWATCH") 

1420 self.connection.read_response() 

1421 except ConnectionError: 

1422 # disconnect will also remove any previous WATCHes 

1423 self.connection.disconnect() 

1424 # clean up the other instance attributes 

1425 self.watching = False 

1426 self.explicit_transaction = False 

1427 

1428 # we can safely return the connection to the pool here since we're 

1429 # sure we're no longer WATCHing anything 

1430 if self.connection: 

1431 self.connection_pool.release(self.connection) 

1432 self.connection = None 

1433 

1434 def close(self) -> None: 

1435 """Close the pipeline""" 

1436 self.reset() 

1437 

1438 def multi(self) -> None: 

1439 """ 

1440 Start a transactional block of the pipeline after WATCH commands 

1441 are issued. End the transactional block with `execute`. 

1442 """ 

1443 if self.explicit_transaction: 

1444 raise RedisError("Cannot issue nested calls to MULTI") 

1445 if self.command_stack: 

1446 raise RedisError( 

1447 "Commands without an initial WATCH have already been issued" 

1448 ) 

1449 self.explicit_transaction = True 

1450 

1451 def execute_command(self, *args, **kwargs): 

1452 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1453 return self.immediate_execute_command(*args, **kwargs) 

1454 return self.pipeline_execute_command(*args, **kwargs) 

1455 

1456 def _disconnect_reset_raise_on_watching( 

1457 self, 

1458 conn: AbstractConnection, 

1459 error: Exception, 

1460 ) -> None: 

1461 """ 

1462 Close the connection reset watching state and 

1463 raise an exception if we were watching. 

1464 

1465 The supported exceptions are already checked in the 

1466 retry object so we don't need to do it here. 

1467 

1468 After we disconnect the connection, it will try to reconnect and 

1469 do a health check as part of the send_command logic(on connection level). 

1470 """ 

1471 conn.disconnect() 

1472 

1473 # if we were already watching a variable, the watch is no longer 

1474 # valid since this connection has died. raise a WatchError, which 

1475 # indicates the user should retry this transaction. 

1476 if self.watching: 

1477 self.reset() 

1478 raise WatchError( 

1479 f"A {type(error).__name__} occurred while watching one or more keys" 

1480 ) 

1481 

1482 def immediate_execute_command(self, *args, **options): 

1483 """ 

1484 Execute a command immediately, but don't auto-retry on the supported 

1485 errors for retry if we're already WATCHing a variable. 

1486 Used when issuing WATCH or subsequent commands retrieving their values but before 

1487 MULTI is called. 

1488 """ 

1489 command_name = args[0] 

1490 conn = self.connection 

1491 # if this is the first call, we need a connection 

1492 if not conn: 

1493 conn = self.connection_pool.get_connection() 

1494 self.connection = conn 

1495 

1496 return conn.retry.call_with_retry( 

1497 lambda: self._send_command_parse_response( 

1498 conn, command_name, *args, **options 

1499 ), 

1500 lambda error: self._disconnect_reset_raise_on_watching(conn, error), 

1501 ) 

1502 

1503 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1504 """ 

1505 Stage a command to be executed when execute() is next called 

1506 

1507 Returns the current Pipeline object back so commands can be 

1508 chained together, such as: 

1509 

1510 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1511 

1512 At some other point, you can then run: pipe.execute(), 

1513 which will execute all commands queued in the pipe. 

1514 """ 

1515 self.command_stack.append((args, options)) 

1516 return self 

1517 

1518 def _execute_transaction( 

1519 self, connection: Connection, commands, raise_on_error 

1520 ) -> List: 

1521 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1522 all_cmds = connection.pack_commands( 

1523 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1524 ) 

1525 connection.send_packed_command(all_cmds) 

1526 errors = [] 

1527 

1528 # parse off the response for MULTI 

1529 # NOTE: we need to handle ResponseErrors here and continue 

1530 # so that we read all the additional command messages from 

1531 # the socket 

1532 try: 

1533 self.parse_response(connection, "_") 

1534 except ResponseError as e: 

1535 errors.append((0, e)) 

1536 

1537 # and all the other commands 

1538 for i, command in enumerate(commands): 

1539 if EMPTY_RESPONSE in command[1]: 

1540 errors.append((i, command[1][EMPTY_RESPONSE])) 

1541 else: 

1542 try: 

1543 self.parse_response(connection, "_") 

1544 except ResponseError as e: 

1545 self.annotate_exception(e, i + 1, command[0]) 

1546 errors.append((i, e)) 

1547 

1548 # parse the EXEC. 

1549 try: 

1550 response = self.parse_response(connection, "_") 

1551 except ExecAbortError: 

1552 if errors: 

1553 raise errors[0][1] 

1554 raise 

1555 

1556 # EXEC clears any watched keys 

1557 self.watching = False 

1558 

1559 if response is None: 

1560 raise WatchError("Watched variable changed.") 

1561 

1562 # put any parse errors into the response 

1563 for i, e in errors: 

1564 response.insert(i, e) 

1565 

1566 if len(response) != len(commands): 

1567 self.connection.disconnect() 

1568 raise ResponseError( 

1569 "Wrong number of response items from pipeline execution" 

1570 ) 

1571 

1572 # find any errors in the response and raise if necessary 

1573 if raise_on_error: 

1574 self.raise_first_error(commands, response) 

1575 

1576 # We have to run response callbacks manually 

1577 data = [] 

1578 for r, cmd in zip(response, commands): 

1579 if not isinstance(r, Exception): 

1580 args, options = cmd 

1581 # Remove keys entry, it needs only for cache. 

1582 options.pop("keys", None) 

1583 command_name = args[0] 

1584 if command_name in self.response_callbacks: 

1585 r = self.response_callbacks[command_name](r, **options) 

1586 data.append(r) 

1587 

1588 return data 

1589 

1590 def _execute_pipeline(self, connection, commands, raise_on_error): 

1591 # build up all commands into a single request to increase network perf 

1592 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1593 connection.send_packed_command(all_cmds) 

1594 

1595 responses = [] 

1596 for args, options in commands: 

1597 try: 

1598 responses.append(self.parse_response(connection, args[0], **options)) 

1599 except ResponseError as e: 

1600 responses.append(e) 

1601 

1602 if raise_on_error: 

1603 self.raise_first_error(commands, responses) 

1604 

1605 return responses 

1606 

1607 def raise_first_error(self, commands, response): 

1608 for i, r in enumerate(response): 

1609 if isinstance(r, ResponseError): 

1610 self.annotate_exception(r, i + 1, commands[i][0]) 

1611 raise r 

1612 

1613 def annotate_exception(self, exception, number, command): 

1614 cmd = " ".join(map(safe_str, command)) 

1615 msg = ( 

1616 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1617 f"caused error: {exception.args[0]}" 

1618 ) 

1619 exception.args = (msg,) + exception.args[1:] 

1620 

1621 def parse_response(self, connection, command_name, **options): 

1622 result = Redis.parse_response(self, connection, command_name, **options) 

1623 if command_name in self.UNWATCH_COMMANDS: 

1624 self.watching = False 

1625 elif command_name == "WATCH": 

1626 self.watching = True 

1627 return result 

1628 

1629 def load_scripts(self): 

1630 # make sure all scripts that are about to be run on this pipeline exist 

1631 scripts = list(self.scripts) 

1632 immediate = self.immediate_execute_command 

1633 shas = [s.sha for s in scripts] 

1634 # we can't use the normal script_* methods because they would just 

1635 # get buffered in the pipeline. 

1636 exists = immediate("SCRIPT EXISTS", *shas) 

1637 if not all(exists): 

1638 for s, exist in zip(scripts, exists): 

1639 if not exist: 

1640 s.sha = immediate("SCRIPT LOAD", s.script) 

1641 

1642 def _disconnect_raise_on_watching( 

1643 self, 

1644 conn: AbstractConnection, 

1645 error: Exception, 

1646 ) -> None: 

1647 """ 

1648 Close the connection, raise an exception if we were watching. 

1649 

1650 The supported exceptions are already checked in the 

1651 retry object so we don't need to do it here. 

1652 

1653 After we disconnect the connection, it will try to reconnect and 

1654 do a health check as part of the send_command logic(on connection level). 

1655 """ 

1656 conn.disconnect() 

1657 # if we were watching a variable, the watch is no longer valid 

1658 # since this connection has died. raise a WatchError, which 

1659 # indicates the user should retry this transaction. 

1660 if self.watching: 

1661 raise WatchError( 

1662 f"A {type(error).__name__} occurred while watching one or more keys" 

1663 ) 

1664 

1665 def execute(self, raise_on_error: bool = True) -> List[Any]: 

1666 """Execute all the commands in the current pipeline""" 

1667 stack = self.command_stack 

1668 if not stack and not self.watching: 

1669 return [] 

1670 if self.scripts: 

1671 self.load_scripts() 

1672 if self.transaction or self.explicit_transaction: 

1673 execute = self._execute_transaction 

1674 else: 

1675 execute = self._execute_pipeline 

1676 

1677 conn = self.connection 

1678 if not conn: 

1679 conn = self.connection_pool.get_connection() 

1680 # assign to self.connection so reset() releases the connection 

1681 # back to the pool after we're done 

1682 self.connection = conn 

1683 

1684 try: 

1685 return conn.retry.call_with_retry( 

1686 lambda: execute(conn, stack, raise_on_error), 

1687 lambda error: self._disconnect_raise_on_watching(conn, error), 

1688 ) 

1689 finally: 

1690 # in reset() the connection is disconnected before returned to the pool if 

1691 # it is marked for reconnect. 

1692 self.reset() 

1693 

1694 def discard(self): 

1695 """ 

1696 Flushes all previously queued commands 

1697 See: https://redis.io/commands/DISCARD 

1698 """ 

1699 self.execute_command("DISCARD") 

1700 

1701 def watch(self, *names): 

1702 """Watches the values at keys ``names``""" 

1703 if self.explicit_transaction: 

1704 raise RedisError("Cannot issue a WATCH after a MULTI") 

1705 return self.execute_command("WATCH", *names) 

1706 

1707 def unwatch(self) -> bool: 

1708 """Unwatches all previously specified keys""" 

1709 return self.watching and self.execute_command("UNWATCH") or True