import copy
import re
import threading
import time
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Set,
    Type,
    Union,
)

from redis._parsers.encoders import Encoder
from redis._parsers.helpers import (
    _RedisCallbacks,
    _RedisCallbacksRESP2,
    _RedisCallbacksRESP3,
    bool_ok,
)
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.commands.core import Script
from redis.connection import (
    AbstractConnection,
    Connection,
    ConnectionPool,
    SSLConnection,
    UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    AfterSingleConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    PubSubError,
    RedisError,
    ResponseError,
    WatchError,
)
from redis.lock import Lock
from redis.maint_notifications import (
    MaintNotificationsConfig,
)
from redis.retry import Retry
from redis.utils import (
    _set_info_logger,
    deprecated_args,
    get_lib_version,
    safe_str,
    str_if_bytes,
    truncate_text,
)

if TYPE_CHECKING:
    import ssl

    import OpenSSL

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (e.g. DUMP) are binary and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data: Dict[str, str]) -> None:
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)
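# A minimal usage sketch of CaseInsensitiveDict (hypothetical values),
# showing that keys are normalized to upper case on both write and read:
#
#     d = CaseInsensitiveDict({"get": my_get_callback})
#     assert "GET" in d and "get" in d
#     assert d["GeT"] is my_get_callback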


class AbstractRedis:
    pass


class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how the commands are sent
    to and received from the Redis server. Based on configuration, an
    instance will use either a ConnectionPool or a Connection object to
    talk to redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        client = cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
        client.auto_close_connection_pool = True
        return client
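    # A brief usage sketch (hypothetical URL), connecting to database 0 on
    # localhost with responses decoded to str:
    #
    #     r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    #     r.set("greeting", "hello")
    #     assert r.get("greeting") == "hello"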

    @classmethod
    def from_pool(
        cls: Type["Redis"],
        connection_pool: ConnectionPool,
    ) -> "Redis":
        """
        Return a Redis client from the given connection pool.
        The Redis client will take ownership of the connection pool and
        close it when the Redis client is closed.
        """
        client = cls(
            connection_pool=connection_pool,
        )
        client.auto_close_connection_pool = True
        return client

    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = get_lib_version(),
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
    ) -> None:
254 """ 

255 Initialize a new Redis client. 

256 

257 To specify a retry policy for specific errors, you have two options: 

258 

259 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

260 you can also set `retry` to a valid `Retry` object(in case the default 

261 one is not appropriate) - with this approach the retries will be triggered 

262 on the default errors specified in the Retry object enriched with the 

263 errors specified in `retry_on_error`. 

264 

265 2. Define a `Retry` object with configured 'supported_errors' and set 

266 it to the `retry` parameter - with this approach you completely redefine 

267 the errors on which retries will happen. 

268 

269 `retry_on_timeout` is deprecated - please include the TimeoutError 

270 either in the Retry object or in the `retry_on_error` list. 

271 

272 When 'connection_pool' is provided - the retry configuration of the 

273 provided pool will be used. 

274 

275 Args: 

276 

277 single_connection_client: 

278 if `True`, connection pool is not used. In that case `Redis` 

279 instance use is not thread safe. 

280 decode_responses: 

281 if `True`, the response will be decoded to utf-8. 

282 Argument is ignored when connection_pool is provided. 

283 maint_notifications_config: 

284 configuration the pool to support maintenance notifications - see 

285 `redis.maint_notifications.MaintNotificationsConfig` for details. 

286 Only supported with RESP3 

287 If not provided and protocol is RESP3, the maintenance notifications 

288 will be enabled by default (logic is included in the connection pool 

289 initialization). 

290 Argument is ignored when connection_pool is provided. 

291 """ 

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            if not retry_on_error:
                retry_on_error = []
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "lib_name": lib_name,
                "lib_version": lib_version,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, set up the appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_include_verify_flags": ssl_include_verify_flags,
                            "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            if (cache_config or cache) and protocol in [3, "3"]:
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            if maint_notifications_enabled and protocol not in [
                3,
                "3",
            ]:
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            self.auto_close_connection_pool = True
        else:
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)

    def __repr__(self) -> str:
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"({repr(self.connection_pool)})>"
        )

    def get_encoder(self) -> "Encoder":
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional[Retry]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: Retry) -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom Response Callback"""
        self.response_callbacks[command] = callback
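    # A small sketch of registering a custom response callback (the "PING"
    # override shown here is illustrative, not the library default):
    #
    #     r = Redis()
    #     r.set_response_callback("PING", lambda response, **options: True)
    #     assert r.ping() is True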

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load these functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)

    def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )
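    # A minimal usage sketch (hypothetical keys), queuing several commands
    # and sending them in one round trip:
    #
    #     pipe = r.pipeline()
    #     pipe.set("foo", "bar").incr("counter")
    #     results = pipe.execute()  # e.g. [True, 1]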

    def transaction(
        self, func: Callable[["Pipeline"], None], *watches, **kwargs
    ) -> Union[List[Any], Any, None]:
        """
        Convenience method for executing the callable ``func`` as a
        transaction while watching all keys specified in ``watches``.
        The ``func`` callable should expect a single argument, which is
        a Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue
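    # A hedged sketch of a check-and-set style transaction (hypothetical
    # key name), retried automatically on WatchError by this method:
    #
    #     def increment(pipe):
    #         current = int(pipe.get("balance") or 0)
    #         pipe.multi()
    #         pipe.set("balance", current + 1)
    #
    #     r.transaction(increment, "balance")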

    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquires `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
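    # A minimal usage sketch (hypothetical lock name), using the lock as a
    # context manager so it is released automatically:
    #
    #     with r.lock("my-lock", timeout=5, blocking_timeout=3):
    #         ...  # critical section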

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(
            self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
        )
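    # A brief usage sketch (hypothetical channel name):
    #
    #     p = r.pubsub()
    #     p.subscribe("my-channel")
    #     message = p.get_message(timeout=1.0)  # subscribe confirmation
    #     message = p.get_message(timeout=1.0)  # a published message, or None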

    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        return self.__class__(
            connection_pool=self.connection_pool,
            single_connection_client=True,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()

    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

    def _close_connection(self, conn) -> None:
        """
        Close the connection before retrying.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """

        conn.disconnect()

    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        return self._execute_command(*args, **options)

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection()

        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda _: self._close_connection(conn),
            )

        finally:
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        # Remove the "keys" entry; it is only needed for caching.
        options.pop("keys", None)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response

    def get_cache(self) -> Optional[CacheInterface]:
        return self.connection_pool.cache


StrictRedis = Redis


class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    The next_command() method returns one command from the monitor; the
    listen() method yields commands from the monitor.
    """

    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            if client_info == "":
                client_address = ""
                client_port = ""
                client_type = "unknown"
            else:
                # use rsplit as ipv6 addresses contain colons
                client_address, client_port = client_info.rsplit(":", 1)
                client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        self.connection.send_command("MONITOR")
        # check that MONITOR returns 'OK', but don't return it to the user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
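    # A brief usage sketch: entering the context manager issues MONITOR,
    # after which every command the server processes can be observed:
    #
    #     with r.monitor() as m:
    #         for command_info in m.listen():
    #             print(command_info["command"])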


class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to look up channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()

    def __enter__(self) -> "PubSub":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.reset()

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self) -> None:
        if self.connection:
            self.connection.disconnect()
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self) -> None:
        self.reset()

    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)

    @property
    def subscribed(self) -> bool:
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while conn and self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {}".format(response)
                    )
            ttl -= 1

    def _reconnect(self, conn) -> None:
        """
        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        In this error handler we are trying to reconnect to the server.
        """
        conn.disconnect()
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.
        """

        if conn.should_reconnect():
            self._reconnect(conn)

        response = conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda _: self._reconnect(conn),
        )

        return response

    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False, push_request=True)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response) -> bool:
        """
        Check if the response is a health check response.
        If there are no subscriptions, redis responds to the PING command
        with a bulk response, instead of a multi-bulk with "pong" and the
        response.
        """
        return response in [
            self.health_check_response,  # If there was a subscription
            self.health_check_response_b,  # If there wasn't
        ]

    def check_health(self) -> None:
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data) -> Dict:
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)

    def ssubscribe(self, *args, target_node=None, **kwargs):
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_s_channels = dict.fromkeys(args)
        new_s_channels.update(kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

    def sunsubscribe(self, *args, target_node=None):
        """
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe
        from all shard_channels.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            s_channels = self._normalize_keys(dict.fromkeys(args))
        else:
            s_channels = self.shard_channels
        self.pending_unsubscribe_shard_channels.update(s_channels)
        return self.execute_command("SUNSUBSCRIBE", *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
    ):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None to wait indefinitely.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.monotonic()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription
                time_spent = time.monotonic() - start_time
                timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)

        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None
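    # A short polling sketch (hypothetical channel), returning None when no
    # message arrives within the timeout:
    #
    #     p.subscribe("my-channel")
    #     while True:
    #         message = p.get_message(ignore_subscribe_messages=True, timeout=1.0)
    #         if message is not None:
    #             print(message["channel"], message["data"])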

    get_sharded_message = get_message

    def ping(self, message: Union[str, None] = None) -> bool:
        """
        Ping the Redis server to test connectivity.

        Sends a PING command to the Redis server and returns True if the server
        responds with "PONG".
        """
        args = ["PING", message] if message is not None else ["PING"]
        return self.execute_command(*args)

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
        pubsub=None,
        sharded_pubsub: bool = False,
    ) -> "PubSubWorkerThread":
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        pubsub = self if pubsub is None else pubsub
        thread = PubSubWorkerThread(
            pubsub,
            sleep_time,
            daemon=daemon,
            exception_handler=exception_handler,
            sharded_pubsub=sharded_pubsub,
        )
        thread.start()
        return thread
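    # A hedged sketch of handler-based message processing in a background
    # thread (hypothetical channel and handler):
    #
    #     p = r.pubsub()
    #     p.subscribe(**{"my-channel": lambda message: print(message["data"])})
    #     thread = p.run_in_thread(sleep_time=0.1, daemon=True)
    #     ...
    #     thread.stop()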


class PubSubWorkerThread(threading.Thread):
    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self.sharded_pubsub = sharded_pubsub
        self._running = threading.Event()

    def run(self) -> None:
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                if not self.sharded_pubsub:
                    pubsub.get_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
                else:
                    pubsub.get_sharded_message(
                        ignore_subscribe_messages=True, timeout=sleep_time
                    )
            except BaseException as e:
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()


class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline (when running in transactional
    mode, which is the default behavior) are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        self.connection_pool = connection_pool
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        self.watching = False
        self.command_stack = []
        self.scripts: Set[Script] = set()
        self.explicit_transaction = False

    def __enter__(self) -> "Pipeline":
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self) -> int:
        return len(self.command_stack)

    def __bool__(self) -> bool:
        """Pipeline instances should always evaluate to True"""
        return True

    def reset(self) -> None:
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False

        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

    def close(self) -> None:
        """Close the pipeline"""
        self.reset()

    def multi(self) -> None:
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        if self.explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self.command_stack:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self.explicit_transaction = True

    def execute_command(self, *args, **kwargs):
        if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def _disconnect_reset_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection, reset the watching state, and raise an
        exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()

        # if we were already watching a variable, the watch is no longer
        # valid since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            self.reset()
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on the supported
        errors for retry if we're already WATCHing a variable.
        Used when issuing WATCH or subsequent commands that retrieve their
        values, but before MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection()
            self.connection = conn

        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            lambda error: self._disconnect_reset_raise_on_watching(conn, error),
        )

    def pipeline_execute_command(self, *args, **options) -> "Pipeline":
        """
        Stage a command to be executed when execute() is next called.

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self

    def _execute_transaction(
        self, connection: Connection, commands, raise_on_error
    ) -> List:
        cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
        all_cmds = connection.pack_commands(
            [args for args, options in cmds if EMPTY_RESPONSE not in options]
        )
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, "_")
        except ResponseError as e:
            errors.append((0, e))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    self.parse_response(connection, "_")
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command[0])
                    errors.append((i, e))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, "_")
        except ExecAbortError:
            if errors:
                raise errors[0][1]
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            )

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                # Remove the "keys" entry; it is only needed for caching.
                options.pop("keys", None)
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)

        return data

    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        responses = []
        for args, options in commands:
            try:
                responses.append(self.parse_response(connection, args[0], **options))
            except ResponseError as e:
                responses.append(e)

        if raise_on_error:
            self.raise_first_error(commands, responses)

        return responses

    def raise_first_error(self, commands, response):
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        cmd = " ".join(map(safe_str, command))
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]

    def parse_response(self, connection, command_name, **options):
        result = Redis.parse_response(self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == "WATCH":
            self.watching = True
        return result

    def load_scripts(self):
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate("SCRIPT EXISTS", *shas)
        if not all(exists):
            for s, exist in zip(scripts, exists):
                if not exist:
                    s.sha = immediate("SCRIPT LOAD", s.script)

    def _disconnect_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection and raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (at the
        connection level).
        """
        conn.disconnect()
        # if we were watching a variable, the watch is no longer valid
        # since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all the commands in the current pipeline"""
        stack = self.command_stack
        if not stack and not self.watching:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return conn.retry.call_with_retry(
                lambda: execute(conn, stack, raise_on_error),
                lambda error: self._disconnect_raise_on_watching(conn, error),
            )
        finally:
            # in reset() the connection is disconnected before being returned
            # to the pool if it is marked for reconnect.
            self.reset()
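    # A short sketch of collecting per-command errors instead of raising
    # (hypothetical keys; LPUSH on a string key produces a ResponseError):
    #
    #     pipe = r.pipeline()
    #     pipe.set("foo", "bar")
    #     pipe.lpush("foo", "baz")  # wrong type; error captured, not raised
    #     results = pipe.execute(raise_on_error=False)
    #     assert isinstance(results[1], ResponseError)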

    def discard(self):
        """
        Flushes all previously queued commands
        See: https://redis.io/commands/DISCARD
        """
        self.execute_command("DISCARD")

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        if self.explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")
        return self.execute_command("WATCH", *names)

    def unwatch(self) -> bool:
        """Unwatches all previously specified keys"""
        return self.watching and self.execute_command("UNWATCH") or True