Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

703 statements  

1import copy 

2import re 

3import threading 

4import time 

5from itertools import chain 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Dict, 

11 List, 

12 Mapping, 

13 Optional, 

14 Set, 

15 Type, 

16 Union, 

17) 

18 

19from redis._parsers.encoders import Encoder 

20from redis._parsers.helpers import ( 

21 _RedisCallbacks, 

22 _RedisCallbacksRESP2, 

23 _RedisCallbacksRESP3, 

24 bool_ok, 

25) 

26from redis.backoff import ExponentialWithJitterBackoff 

27from redis.cache import CacheConfig, CacheInterface 

28from redis.commands import ( 

29 CoreCommands, 

30 RedisModuleCommands, 

31 SentinelCommands, 

32 list_or_args, 

33) 

34from redis.commands.core import Script 

35from redis.connection import ( 

36 AbstractConnection, 

37 Connection, 

38 ConnectionPool, 

39 SSLConnection, 

40 UnixDomainSocketConnection, 

41) 

42from redis.credentials import CredentialProvider 

43from redis.event import ( 

44 AfterPooledConnectionsInstantiationEvent, 

45 AfterPubSubConnectionInstantiationEvent, 

46 AfterSingleConnectionInstantiationEvent, 

47 ClientType, 

48 EventDispatcher, 

49) 

50from redis.exceptions import ( 

51 ConnectionError, 

52 ExecAbortError, 

53 PubSubError, 

54 RedisError, 

55 ResponseError, 

56 WatchError, 

57) 

58from redis.lock import Lock 

59from redis.maint_notifications import ( 

60 MaintNotificationsConfig, 

61) 

62from redis.retry import Retry 

63from redis.utils import ( 

64 _set_info_logger, 

65 deprecated_args, 

66 get_lib_version, 

67 safe_str, 

68 str_if_bytes, 

69 truncate_text, 

70) 

71 

72if TYPE_CHECKING: 

73 import ssl 

74 

75 import OpenSSL 

76 

77SYM_EMPTY = b"" 

78EMPTY_RESPONSE = "EMPTY_RESPONSE" 

79 

80# some responses (ie. dump) are binary, and just meant to never be decoded 

81NEVER_DECODE = "NEVER_DECODE" 

82 

83 

84class CaseInsensitiveDict(dict): 

85 "Case insensitive dict implementation. Assumes string keys only." 

86 

87 def __init__(self, data: Dict[str, str]) -> None: 

88 for k, v in data.items(): 

89 self[k.upper()] = v 

90 

91 def __contains__(self, k): 

92 return super().__contains__(k.upper()) 

93 

94 def __delitem__(self, k): 

95 super().__delitem__(k.upper()) 

96 

97 def __getitem__(self, k): 

98 return super().__getitem__(k.upper()) 

99 

100 def get(self, k, default=None): 

101 return super().get(k.upper(), default) 

102 

103 def __setitem__(self, k, v): 

104 super().__setitem__(k.upper(), v) 

105 

106 def update(self, data): 

107 data = CaseInsensitiveDict(data) 

108 super().update(data) 

109 

110 

111class AbstractRedis: 

112 pass 

113 

114 

115class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

116 """ 

117 Implementation of the Redis protocol. 

118 

119 This abstract class provides a Python interface to all Redis commands 

120 and an implementation of the Redis protocol. 

121 

122 Pipelines derive from this, implementing how 

123 the commands are sent and received to the Redis server. Based on 

124 configuration, an instance will either use a ConnectionPool, or 

125 Connection object to talk to redis. 

126 

127 It is not safe to pass PubSub or Pipeline objects between threads. 

128 """ 

129 

130 @classmethod 

131 def from_url(cls, url: str, **kwargs) -> "Redis": 

132 """ 

133 Return a Redis client object configured from the given URL 

134 

135 For example:: 

136 

137 redis://[[username]:[password]]@localhost:6379/0 

138 rediss://[[username]:[password]]@localhost:6379/0 

139 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

140 

141 Three URL schemes are supported: 

142 

143 - `redis://` creates a TCP socket connection. See more at: 

144 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

145 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

146 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

147 - ``unix://``: creates a Unix Domain Socket connection. 

148 

149 The username, password, hostname, path and all querystring values 

150 are passed through urllib.parse.unquote in order to replace any 

151 percent-encoded values with their corresponding characters. 

152 

153 There are several ways to specify a database number. The first value 

154 found will be used: 

155 

156 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

157 2. If using the redis:// or rediss:// schemes, the path argument 

158 of the url, e.g. redis://localhost/0 

159 3. A ``db`` keyword argument to this function. 

160 

161 If none of these options are specified, the default db=0 is used. 

162 

163 All querystring options are cast to their appropriate Python types. 

164 Boolean arguments can be specified with string values "True"/"False" 

165 or "Yes"/"No". Values that cannot be properly cast cause a 

166 ``ValueError`` to be raised. Once parsed, the querystring arguments 

167 and keyword arguments are passed to the ``ConnectionPool``'s 

168 class initializer. In the case of conflicting arguments, querystring 

169 arguments always win. 

170 

171 """ 

172 single_connection_client = kwargs.pop("single_connection_client", False) 

173 connection_pool = ConnectionPool.from_url(url, **kwargs) 

174 client = cls( 

175 connection_pool=connection_pool, 

176 single_connection_client=single_connection_client, 

177 ) 

178 client.auto_close_connection_pool = True 

179 return client 

180 

181 @classmethod 

182 def from_pool( 

183 cls: Type["Redis"], 

184 connection_pool: ConnectionPool, 

185 ) -> "Redis": 

186 """ 

187 Return a Redis client from the given connection pool. 

188 The Redis client will take ownership of the connection pool and 

189 close it when the Redis client is closed. 

190 """ 

191 client = cls( 

192 connection_pool=connection_pool, 

193 ) 

194 client.auto_close_connection_pool = True 

195 return client 

196 

197 @deprecated_args( 

198 args_to_warn=["retry_on_timeout"], 

199 reason="TimeoutError is included by default.", 

200 version="6.0.0", 

201 ) 

202 def __init__( 

203 self, 

204 host: str = "localhost", 

205 port: int = 6379, 

206 db: int = 0, 

207 password: Optional[str] = None, 

208 socket_timeout: Optional[float] = None, 

209 socket_connect_timeout: Optional[float] = None, 

210 socket_keepalive: Optional[bool] = None, 

211 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

212 connection_pool: Optional[ConnectionPool] = None, 

213 unix_socket_path: Optional[str] = None, 

214 encoding: str = "utf-8", 

215 encoding_errors: str = "strict", 

216 decode_responses: bool = False, 

217 retry_on_timeout: bool = False, 

218 retry: Retry = Retry( 

219 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 

220 ), 

221 retry_on_error: Optional[List[Type[Exception]]] = None, 

222 ssl: bool = False, 

223 ssl_keyfile: Optional[str] = None, 

224 ssl_certfile: Optional[str] = None, 

225 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required", 

226 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

227 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

228 ssl_ca_certs: Optional[str] = None, 

229 ssl_ca_path: Optional[str] = None, 

230 ssl_ca_data: Optional[str] = None, 

231 ssl_check_hostname: bool = True, 

232 ssl_password: Optional[str] = None, 

233 ssl_validate_ocsp: bool = False, 

234 ssl_validate_ocsp_stapled: bool = False, 

235 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None, 

236 ssl_ocsp_expected_cert: Optional[str] = None, 

237 ssl_min_version: Optional["ssl.TLSVersion"] = None, 

238 ssl_ciphers: Optional[str] = None, 

239 max_connections: Optional[int] = None, 

240 single_connection_client: bool = False, 

241 health_check_interval: int = 0, 

242 client_name: Optional[str] = None, 

243 lib_name: Optional[str] = "redis-py", 

244 lib_version: Optional[str] = get_lib_version(), 

245 username: Optional[str] = None, 

246 redis_connect_func: Optional[Callable[[], None]] = None, 

247 credential_provider: Optional[CredentialProvider] = None, 

248 protocol: Optional[int] = 2, 

249 cache: Optional[CacheInterface] = None, 

250 cache_config: Optional[CacheConfig] = None, 

251 event_dispatcher: Optional[EventDispatcher] = None, 

252 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

253 ) -> None: 

254 """ 

255 Initialize a new Redis client. 

256 

257 To specify a retry policy for specific errors, you have two options: 

258 

259 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

260 you can also set `retry` to a valid `Retry` object(in case the default 

261 one is not appropriate) - with this approach the retries will be triggered 

262 on the default errors specified in the Retry object enriched with the 

263 errors specified in `retry_on_error`. 

264 

265 2. Define a `Retry` object with configured 'supported_errors' and set 

266 it to the `retry` parameter - with this approach you completely redefine 

267 the errors on which retries will happen. 

268 

269 `retry_on_timeout` is deprecated - please include the TimeoutError 

270 either in the Retry object or in the `retry_on_error` list. 

271 

272 When 'connection_pool' is provided - the retry configuration of the 

273 provided pool will be used. 

274 

275 Args: 

276 

277 single_connection_client: 

278 if `True`, connection pool is not used. In that case `Redis` 

279 instance use is not thread safe. 

280 decode_responses: 

281 if `True`, the response will be decoded to utf-8. 

282 Argument is ignored when connection_pool is provided. 

283 maint_notifications_config: 

284 configuration the pool to support maintenance notifications - see 

285 `redis.maint_notifications.MaintNotificationsConfig` for details. 

286 Only supported with RESP3 

287 If not provided and protocol is RESP3, the maintenance notifications 

288 will be enabled by default (logic is included in the connection pool 

289 initialization). 

290 Argument is ignored when connection_pool is provided. 

291 """ 

292 if event_dispatcher is None: 

293 self._event_dispatcher = EventDispatcher() 

294 else: 

295 self._event_dispatcher = event_dispatcher 

296 if not connection_pool: 

297 if not retry_on_error: 

298 retry_on_error = [] 

299 kwargs = { 

300 "db": db, 

301 "username": username, 

302 "password": password, 

303 "socket_timeout": socket_timeout, 

304 "encoding": encoding, 

305 "encoding_errors": encoding_errors, 

306 "decode_responses": decode_responses, 

307 "retry_on_error": retry_on_error, 

308 "retry": copy.deepcopy(retry), 

309 "max_connections": max_connections, 

310 "health_check_interval": health_check_interval, 

311 "client_name": client_name, 

312 "lib_name": lib_name, 

313 "lib_version": lib_version, 

314 "redis_connect_func": redis_connect_func, 

315 "credential_provider": credential_provider, 

316 "protocol": protocol, 

317 } 

318 # based on input, setup appropriate connection args 

319 if unix_socket_path is not None: 

320 kwargs.update( 

321 { 

322 "path": unix_socket_path, 

323 "connection_class": UnixDomainSocketConnection, 

324 } 

325 ) 

326 else: 

327 # TCP specific options 

328 kwargs.update( 

329 { 

330 "host": host, 

331 "port": port, 

332 "socket_connect_timeout": socket_connect_timeout, 

333 "socket_keepalive": socket_keepalive, 

334 "socket_keepalive_options": socket_keepalive_options, 

335 } 

336 ) 

337 

338 if ssl: 

339 kwargs.update( 

340 { 

341 "connection_class": SSLConnection, 

342 "ssl_keyfile": ssl_keyfile, 

343 "ssl_certfile": ssl_certfile, 

344 "ssl_cert_reqs": ssl_cert_reqs, 

345 "ssl_include_verify_flags": ssl_include_verify_flags, 

346 "ssl_exclude_verify_flags": ssl_exclude_verify_flags, 

347 "ssl_ca_certs": ssl_ca_certs, 

348 "ssl_ca_data": ssl_ca_data, 

349 "ssl_check_hostname": ssl_check_hostname, 

350 "ssl_password": ssl_password, 

351 "ssl_ca_path": ssl_ca_path, 

352 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

353 "ssl_validate_ocsp": ssl_validate_ocsp, 

354 "ssl_ocsp_context": ssl_ocsp_context, 

355 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

356 "ssl_min_version": ssl_min_version, 

357 "ssl_ciphers": ssl_ciphers, 

358 } 

359 ) 

360 if (cache_config or cache) and protocol in [3, "3"]: 

361 kwargs.update( 

362 { 

363 "cache": cache, 

364 "cache_config": cache_config, 

365 } 

366 ) 

367 maint_notifications_enabled = ( 

368 maint_notifications_config and maint_notifications_config.enabled 

369 ) 

370 if maint_notifications_enabled and protocol not in [ 

371 3, 

372 "3", 

373 ]: 

374 raise RedisError( 

375 "Maintenance notifications handlers on connection are only supported with RESP version 3" 

376 ) 

377 if maint_notifications_config: 

378 kwargs.update( 

379 { 

380 "maint_notifications_config": maint_notifications_config, 

381 } 

382 ) 

383 connection_pool = ConnectionPool(**kwargs) 

384 self._event_dispatcher.dispatch( 

385 AfterPooledConnectionsInstantiationEvent( 

386 [connection_pool], ClientType.SYNC, credential_provider 

387 ) 

388 ) 

389 self.auto_close_connection_pool = True 

390 else: 

391 self.auto_close_connection_pool = False 

392 self._event_dispatcher.dispatch( 

393 AfterPooledConnectionsInstantiationEvent( 

394 [connection_pool], ClientType.SYNC, credential_provider 

395 ) 

396 ) 

397 

398 self.connection_pool = connection_pool 

399 

400 if (cache_config or cache) and self.connection_pool.get_protocol() not in [ 

401 3, 

402 "3", 

403 ]: 

404 raise RedisError("Client caching is only supported with RESP version 3") 

405 

406 self.single_connection_lock = threading.RLock() 

407 self.connection = None 

408 self._single_connection_client = single_connection_client 

409 if self._single_connection_client: 

410 self.connection = self.connection_pool.get_connection() 

411 self._event_dispatcher.dispatch( 

412 AfterSingleConnectionInstantiationEvent( 

413 self.connection, ClientType.SYNC, self.single_connection_lock 

414 ) 

415 ) 

416 

417 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks) 

418 

419 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: 

420 self.response_callbacks.update(_RedisCallbacksRESP3) 

421 else: 

422 self.response_callbacks.update(_RedisCallbacksRESP2) 

423 

424 def __repr__(self) -> str: 

425 return ( 

426 f"<{type(self).__module__}.{type(self).__name__}" 

427 f"({repr(self.connection_pool)})>" 

428 ) 

429 

430 def get_encoder(self) -> "Encoder": 

431 """Get the connection pool's encoder""" 

432 return self.connection_pool.get_encoder() 

433 

434 def get_connection_kwargs(self) -> Dict: 

435 """Get the connection's key-word arguments""" 

436 return self.connection_pool.connection_kwargs 

437 

438 def get_retry(self) -> Optional[Retry]: 

439 return self.get_connection_kwargs().get("retry") 

440 

441 def set_retry(self, retry: Retry) -> None: 

442 self.get_connection_kwargs().update({"retry": retry}) 

443 self.connection_pool.set_retry(retry) 

444 

445 def set_response_callback(self, command: str, callback: Callable) -> None: 

446 """Set a custom Response Callback""" 

447 self.response_callbacks[command] = callback 

448 

449 def load_external_module(self, funcname, func) -> None: 

450 """ 

451 This function can be used to add externally defined redis modules, 

452 and their namespaces to the redis client. 

453 

454 funcname - A string containing the name of the function to create 

455 func - The function, being added to this class. 

456 

457 ex: Assume that one has a custom redis module named foomod that 

458 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

459 To load function functions into this namespace: 

460 

461 from redis import Redis 

462 from foomodule import F 

463 r = Redis() 

464 r.load_external_module("foo", F) 

465 r.foo().dothing('your', 'arguments') 

466 

467 For a concrete example see the reimport of the redisjson module in 

468 tests/test_connection.py::test_loading_external_modules 

469 """ 

470 setattr(self, funcname, func) 

471 

472 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

473 """ 

474 Return a new pipeline object that can queue multiple commands for 

475 later execution. ``transaction`` indicates whether all commands 

476 should be executed atomically. Apart from making a group of operations 

477 atomic, pipelines are useful for reducing the back-and-forth overhead 

478 between the client and server. 

479 """ 

480 return Pipeline( 

481 self.connection_pool, self.response_callbacks, transaction, shard_hint 

482 ) 

483 

484 def transaction( 

485 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

486 ) -> Union[List[Any], Any, None]: 

487 """ 

488 Convenience method for executing the callable `func` as a transaction 

489 while watching all keys specified in `watches`. The 'func' callable 

490 should expect a single argument which is a Pipeline object. 

491 """ 

492 shard_hint = kwargs.pop("shard_hint", None) 

493 value_from_callable = kwargs.pop("value_from_callable", False) 

494 watch_delay = kwargs.pop("watch_delay", None) 

495 with self.pipeline(True, shard_hint) as pipe: 

496 while True: 

497 try: 

498 if watches: 

499 pipe.watch(*watches) 

500 func_value = func(pipe) 

501 exec_value = pipe.execute() 

502 return func_value if value_from_callable else exec_value 

503 except WatchError: 

504 if watch_delay is not None and watch_delay > 0: 

505 time.sleep(watch_delay) 

506 continue 

507 

508 def lock( 

509 self, 

510 name: str, 

511 timeout: Optional[float] = None, 

512 sleep: float = 0.1, 

513 blocking: bool = True, 

514 blocking_timeout: Optional[float] = None, 

515 lock_class: Union[None, Any] = None, 

516 thread_local: bool = True, 

517 raise_on_release_error: bool = True, 

518 ): 

519 """ 

520 Return a new Lock object using key ``name`` that mimics 

521 the behavior of threading.Lock. 

522 

523 If specified, ``timeout`` indicates a maximum life for the lock. 

524 By default, it will remain locked until release() is called. 

525 

526 ``sleep`` indicates the amount of time to sleep per loop iteration 

527 when the lock is in blocking mode and another client is currently 

528 holding the lock. 

529 

530 ``blocking`` indicates whether calling ``acquire`` should block until 

531 the lock has been acquired or to fail immediately, causing ``acquire`` 

532 to return False and the lock not being acquired. Defaults to True. 

533 Note this value can be overridden by passing a ``blocking`` 

534 argument to ``acquire``. 

535 

536 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

537 spend trying to acquire the lock. A value of ``None`` indicates 

538 continue trying forever. ``blocking_timeout`` can be specified as a 

539 float or integer, both representing the number of seconds to wait. 

540 

541 ``lock_class`` forces the specified lock implementation. Note that as 

542 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

543 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

544 you have created your own custom lock class. 

545 

546 ``thread_local`` indicates whether the lock token is placed in 

547 thread-local storage. By default, the token is placed in thread local 

548 storage so that a thread only sees its token, not a token set by 

549 another thread. Consider the following timeline: 

550 

551 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

552 thread-1 sets the token to "abc" 

553 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

554 Lock instance. 

555 time: 5, thread-1 has not yet completed. redis expires the lock 

556 key. 

557 time: 5, thread-2 acquired `my-lock` now that it's available. 

558 thread-2 sets the token to "xyz" 

559 time: 6, thread-1 finishes its work and calls release(). if the 

560 token is *not* stored in thread local storage, then 

561 thread-1 would see the token value as "xyz" and would be 

562 able to successfully release the thread-2's lock. 

563 

564 ``raise_on_release_error`` indicates whether to raise an exception when 

565 the lock is no longer owned when exiting the context manager. By default, 

566 this is True, meaning an exception will be raised. If False, the warning 

567 will be logged and the exception will be suppressed. 

568 

569 In some use cases it's necessary to disable thread local storage. For 

570 example, if you have code where one thread acquires a lock and passes 

571 that lock instance to a worker thread to release later. If thread 

572 local storage isn't disabled in this case, the worker thread won't see 

573 the token set by the thread that acquired the lock. Our assumption 

574 is that these cases aren't common and as such default to using 

575 thread local storage.""" 

576 if lock_class is None: 

577 lock_class = Lock 

578 return lock_class( 

579 self, 

580 name, 

581 timeout=timeout, 

582 sleep=sleep, 

583 blocking=blocking, 

584 blocking_timeout=blocking_timeout, 

585 thread_local=thread_local, 

586 raise_on_release_error=raise_on_release_error, 

587 ) 

588 

589 def pubsub(self, **kwargs): 

590 """ 

591 Return a Publish/Subscribe object. With this object, you can 

592 subscribe to channels and listen for messages that get published to 

593 them. 

594 """ 

595 return PubSub( 

596 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

597 ) 

598 

599 def monitor(self): 

600 return Monitor(self.connection_pool) 

601 

602 def client(self): 

603 return self.__class__( 

604 connection_pool=self.connection_pool, 

605 single_connection_client=True, 

606 ) 

607 

608 def __enter__(self): 

609 return self 

610 

611 def __exit__(self, exc_type, exc_value, traceback): 

612 self.close() 

613 

614 def __del__(self): 

615 try: 

616 self.close() 

617 except Exception: 

618 pass 

619 

620 def close(self) -> None: 

621 # In case a connection property does not yet exist 

622 # (due to a crash earlier in the Redis() constructor), return 

623 # immediately as there is nothing to clean-up. 

624 if not hasattr(self, "connection"): 

625 return 

626 

627 conn = self.connection 

628 if conn: 

629 self.connection = None 

630 self.connection_pool.release(conn) 

631 

632 if self.auto_close_connection_pool: 

633 self.connection_pool.disconnect() 

634 

635 def _send_command_parse_response(self, conn, command_name, *args, **options): 

636 """ 

637 Send a command and parse the response 

638 """ 

639 conn.send_command(*args, **options) 

640 return self.parse_response(conn, command_name, **options) 

641 

642 def _close_connection(self, conn) -> None: 

643 """ 

644 Close the connection before retrying. 

645 

646 The supported exceptions are already checked in the 

647 retry object so we don't need to do it here. 

648 

649 After we disconnect the connection, it will try to reconnect and 

650 do a health check as part of the send_command logic(on connection level). 

651 """ 

652 

653 conn.disconnect() 

654 

655 # COMMAND EXECUTION AND PROTOCOL PARSING 

656 def execute_command(self, *args, **options): 

657 return self._execute_command(*args, **options) 

658 

659 def _execute_command(self, *args, **options): 

660 """Execute a command and return a parsed response""" 

661 pool = self.connection_pool 

662 command_name = args[0] 

663 conn = self.connection or pool.get_connection() 

664 

665 if self._single_connection_client: 

666 self.single_connection_lock.acquire() 

667 try: 

668 return conn.retry.call_with_retry( 

669 lambda: self._send_command_parse_response( 

670 conn, command_name, *args, **options 

671 ), 

672 lambda _: self._close_connection(conn), 

673 ) 

674 

675 finally: 

676 if conn and conn.should_reconnect(): 

677 self._close_connection(conn) 

678 conn.connect() 

679 if self._single_connection_client: 

680 self.single_connection_lock.release() 

681 if not self.connection: 

682 pool.release(conn) 

683 

684 def parse_response(self, connection, command_name, **options): 

685 """Parses a response from the Redis server""" 

686 try: 

687 if NEVER_DECODE in options: 

688 response = connection.read_response(disable_decoding=True) 

689 options.pop(NEVER_DECODE) 

690 else: 

691 response = connection.read_response() 

692 except ResponseError: 

693 if EMPTY_RESPONSE in options: 

694 return options[EMPTY_RESPONSE] 

695 raise 

696 

697 if EMPTY_RESPONSE in options: 

698 options.pop(EMPTY_RESPONSE) 

699 

700 # Remove keys entry, it needs only for cache. 

701 options.pop("keys", None) 

702 

703 if command_name in self.response_callbacks: 

704 return self.response_callbacks[command_name](response, **options) 

705 return response 

706 

707 def get_cache(self) -> Optional[CacheInterface]: 

708 return self.connection_pool.cache 

709 

710 

711StrictRedis = Redis 

712 

713 

714class Monitor: 

715 """ 

716 Monitor is useful for handling the MONITOR command to the redis server. 

717 next_command() method returns one command from monitor 

718 listen() method yields commands from monitor. 

719 """ 

720 

721 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)") 

722 command_re = re.compile(r'"(.*?)(?<!\\)"') 

723 

724 def __init__(self, connection_pool): 

725 self.connection_pool = connection_pool 

726 self.connection = self.connection_pool.get_connection() 

727 

728 def __enter__(self): 

729 self._start_monitor() 

730 return self 

731 

732 def __exit__(self, *args): 

733 self.connection.disconnect() 

734 self.connection_pool.release(self.connection) 

735 

736 def next_command(self): 

737 """Parse the response from a monitor command""" 

738 response = self.connection.read_response() 

739 

740 if response is None: 

741 return None 

742 

743 if isinstance(response, bytes): 

744 response = self.connection.encoder.decode(response, force=True) 

745 

746 command_time, command_data = response.split(" ", 1) 

747 m = self.monitor_re.match(command_data) 

748 db_id, client_info, command = m.groups() 

749 command = " ".join(self.command_re.findall(command)) 

750 # Redis escapes double quotes because each piece of the command 

751 # string is surrounded by double quotes. We don't have that 

752 # requirement so remove the escaping and leave the quote. 

753 command = command.replace('\\"', '"') 

754 

755 if client_info == "lua": 

756 client_address = "lua" 

757 client_port = "" 

758 client_type = "lua" 

759 elif client_info.startswith("unix"): 

760 client_address = "unix" 

761 client_port = client_info[5:] 

762 client_type = "unix" 

763 else: 

764 # use rsplit as ipv6 addresses contain colons 

765 client_address, client_port = client_info.rsplit(":", 1) 

766 client_type = "tcp" 

767 return { 

768 "time": float(command_time), 

769 "db": int(db_id), 

770 "client_address": client_address, 

771 "client_port": client_port, 

772 "client_type": client_type, 

773 "command": command, 

774 } 

775 

776 def listen(self): 

777 """Listen for commands coming to the server.""" 

778 while True: 

779 yield self.next_command() 

780 

781 def _start_monitor(self): 

782 self.connection.send_command("MONITOR") 

783 # check that monitor returns 'OK', but don't return it to user 

784 response = self.connection.read_response() 

785 

786 if not bool_ok(response): 

787 raise RedisError(f"MONITOR failed: {response}") 

788 

789 

790class PubSub: 

791 """ 

792 PubSub provides publish, subscribe and listen support to Redis channels. 

793 

794 After subscribing to one or more channels, the listen() method will block 

795 until a message arrives on one of the subscribed channels. That message 

796 will be returned and it's safe to start listening again. 

797 """ 

798 

799 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

800 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

801 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

802 

803 def __init__( 

804 self, 

805 connection_pool, 

806 shard_hint=None, 

807 ignore_subscribe_messages: bool = False, 

808 encoder: Optional["Encoder"] = None, 

809 push_handler_func: Union[None, Callable[[str], None]] = None, 

810 event_dispatcher: Optional["EventDispatcher"] = None, 

811 ): 

812 self.connection_pool = connection_pool 

813 self.shard_hint = shard_hint 

814 self.ignore_subscribe_messages = ignore_subscribe_messages 

815 self.connection = None 

816 self.subscribed_event = threading.Event() 

817 # we need to know the encoding options for this connection in order 

818 # to lookup channel and pattern names for callback handlers. 

819 self.encoder = encoder 

820 self.push_handler_func = push_handler_func 

821 if event_dispatcher is None: 

822 self._event_dispatcher = EventDispatcher() 

823 else: 

824 self._event_dispatcher = event_dispatcher 

825 

826 self._lock = threading.RLock() 

827 if self.encoder is None: 

828 self.encoder = self.connection_pool.get_encoder() 

829 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

830 if self.encoder.decode_responses: 

831 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

832 else: 

833 self.health_check_response = [b"pong", self.health_check_response_b] 

834 if self.push_handler_func is None: 

835 _set_info_logger() 

836 self.reset() 

837 

838 def __enter__(self) -> "PubSub": 

839 return self 

840 

841 def __exit__(self, exc_type, exc_value, traceback) -> None: 

842 self.reset() 

843 

844 def __del__(self) -> None: 

845 try: 

846 # if this object went out of scope prior to shutting down 

847 # subscriptions, close the connection manually before 

848 # returning it to the connection pool 

849 self.reset() 

850 except Exception: 

851 pass 

852 

853 def reset(self) -> None: 

854 if self.connection: 

855 self.connection.disconnect() 

856 self.connection.deregister_connect_callback(self.on_connect) 

857 self.connection_pool.release(self.connection) 

858 self.connection = None 

859 self.health_check_response_counter = 0 

860 self.channels = {} 

861 self.pending_unsubscribe_channels = set() 

862 self.shard_channels = {} 

863 self.pending_unsubscribe_shard_channels = set() 

864 self.patterns = {} 

865 self.pending_unsubscribe_patterns = set() 

866 self.subscribed_event.clear() 

867 

868 def close(self) -> None: 

869 self.reset() 

870 

871 def on_connect(self, connection) -> None: 

872 "Re-subscribe to any channels and patterns previously subscribed to" 

873 # NOTE: for python3, we can't pass bytestrings as keyword arguments 

874 # so we need to decode channel/pattern names back to unicode strings 

875 # before passing them to [p]subscribe. 

876 self.pending_unsubscribe_channels.clear() 

877 self.pending_unsubscribe_patterns.clear() 

878 self.pending_unsubscribe_shard_channels.clear() 

879 if self.channels: 

880 channels = { 

881 self.encoder.decode(k, force=True): v for k, v in self.channels.items() 

882 } 

883 self.subscribe(**channels) 

884 if self.patterns: 

885 patterns = { 

886 self.encoder.decode(k, force=True): v for k, v in self.patterns.items() 

887 } 

888 self.psubscribe(**patterns) 

889 if self.shard_channels: 

890 shard_channels = { 

891 self.encoder.decode(k, force=True): v 

892 for k, v in self.shard_channels.items() 

893 } 

894 self.ssubscribe(**shard_channels) 

895 

896 @property 

897 def subscribed(self) -> bool: 

898 """Indicates if there are subscriptions to any channels or patterns""" 

899 return self.subscribed_event.is_set() 

900 

901 def execute_command(self, *args): 

902 """Execute a publish/subscribe command""" 

903 

904 # NOTE: don't parse the response in this function -- it could pull a 

905 # legitimate message off the stack if the connection is already 

906 # subscribed to one or more channels 

907 

908 if self.connection is None: 

909 self.connection = self.connection_pool.get_connection() 

910 # register a callback that re-subscribes to any channels we 

911 # were listening to when we were disconnected 

912 self.connection.register_connect_callback(self.on_connect) 

913 if self.push_handler_func is not None: 

914 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

915 self._event_dispatcher.dispatch( 

916 AfterPubSubConnectionInstantiationEvent( 

917 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

918 ) 

919 ) 

920 connection = self.connection 

921 kwargs = {"check_health": not self.subscribed} 

922 if not self.subscribed: 

923 self.clean_health_check_responses() 

924 with self._lock: 

925 self._execute(connection, connection.send_command, *args, **kwargs) 

926 

927 def clean_health_check_responses(self) -> None: 

928 """ 

929 If any health check responses are present, clean them 

930 """ 

931 ttl = 10 

932 conn = self.connection 

933 while conn and self.health_check_response_counter > 0 and ttl > 0: 

934 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

935 response = self._execute(conn, conn.read_response) 

936 if self.is_health_check_response(response): 

937 self.health_check_response_counter -= 1 

938 else: 

939 raise PubSubError( 

940 "A non health check response was cleaned by " 

941 "execute_command: {}".format(response) 

942 ) 

943 ttl -= 1 

944 

945 def _reconnect(self, conn) -> None: 

946 """ 

947 The supported exceptions are already checked in the 

948 retry object so we don't need to do it here. 

949 

950 In this error handler we are trying to reconnect to the server. 

951 """ 

952 conn.disconnect() 

953 conn.connect() 

954 

955 def _execute(self, conn, command, *args, **kwargs): 

956 """ 

957 Connect manually upon disconnection. If the Redis server is down, 

958 this will fail and raise a ConnectionError as desired. 

959 After reconnection, the ``on_connect`` callback should have been 

960 called by the # connection to resubscribe us to any channels and 

961 patterns we were previously listening to 

962 """ 

963 

964 if conn.should_reconnect(): 

965 self._reconnect(conn) 

966 

967 response = conn.retry.call_with_retry( 

968 lambda: command(*args, **kwargs), 

969 lambda _: self._reconnect(conn), 

970 ) 

971 

972 return response 

973 

974 def parse_response(self, block=True, timeout=0): 

975 """Parse the response from a publish/subscribe command""" 

976 conn = self.connection 

977 if conn is None: 

978 raise RuntimeError( 

979 "pubsub connection not set: " 

980 "did you forget to call subscribe() or psubscribe()?" 

981 ) 

982 

983 self.check_health() 

984 

985 def try_read(): 

986 if not block: 

987 if not conn.can_read(timeout=timeout): 

988 return None 

989 else: 

990 conn.connect() 

991 return conn.read_response(disconnect_on_error=False, push_request=True) 

992 

993 response = self._execute(conn, try_read) 

994 

995 if self.is_health_check_response(response): 

996 # ignore the health check message as user might not expect it 

997 self.health_check_response_counter -= 1 

998 return None 

999 return response 

1000 

1001 def is_health_check_response(self, response) -> bool: 

1002 """ 

1003 Check if the response is a health check response. 

1004 If there are no subscriptions redis responds to PING command with a 

1005 bulk response, instead of a multi-bulk with "pong" and the response. 

1006 """ 

1007 return response in [ 

1008 self.health_check_response, # If there was a subscription 

1009 self.health_check_response_b, # If there wasn't 

1010 ] 

1011 

1012 def check_health(self) -> None: 

1013 conn = self.connection 

1014 if conn is None: 

1015 raise RuntimeError( 

1016 "pubsub connection not set: " 

1017 "did you forget to call subscribe() or psubscribe()?" 

1018 ) 

1019 

1020 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1021 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1022 self.health_check_response_counter += 1 

1023 

1024 def _normalize_keys(self, data) -> Dict: 

1025 """ 

1026 normalize channel/pattern names to be either bytes or strings 

1027 based on whether responses are automatically decoded. this saves us 

1028 from coercing the value for each message coming in. 

1029 """ 

1030 encode = self.encoder.encode 

1031 decode = self.encoder.decode 

1032 return {decode(encode(k)): v for k, v in data.items()} 

1033 

1034 def psubscribe(self, *args, **kwargs): 

1035 """ 

1036 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1037 expect a pattern name as the key and a callable as the value. A 

1038 pattern's callable will be invoked automatically when a message is 

1039 received on that pattern rather than producing a message via 

1040 ``listen()``. 

1041 """ 

1042 if args: 

1043 args = list_or_args(args[0], args[1:]) 

1044 new_patterns = dict.fromkeys(args) 

1045 new_patterns.update(kwargs) 

1046 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1047 # update the patterns dict AFTER we send the command. we don't want to 

1048 # subscribe twice to these patterns, once for the command and again 

1049 # for the reconnection. 

1050 new_patterns = self._normalize_keys(new_patterns) 

1051 self.patterns.update(new_patterns) 

1052 if not self.subscribed: 

1053 # Set the subscribed_event flag to True 

1054 self.subscribed_event.set() 

1055 # Clear the health check counter 

1056 self.health_check_response_counter = 0 

1057 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1058 return ret_val 

1059 

1060 def punsubscribe(self, *args): 

1061 """ 

1062 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1063 all patterns. 

1064 """ 

1065 if args: 

1066 args = list_or_args(args[0], args[1:]) 

1067 patterns = self._normalize_keys(dict.fromkeys(args)) 

1068 else: 

1069 patterns = self.patterns 

1070 self.pending_unsubscribe_patterns.update(patterns) 

1071 return self.execute_command("PUNSUBSCRIBE", *args) 

1072 

1073 def subscribe(self, *args, **kwargs): 

1074 """ 

1075 Subscribe to channels. Channels supplied as keyword arguments expect 

1076 a channel name as the key and a callable as the value. A channel's 

1077 callable will be invoked automatically when a message is received on 

1078 that channel rather than producing a message via ``listen()`` or 

1079 ``get_message()``. 

1080 """ 

1081 if args: 

1082 args = list_or_args(args[0], args[1:]) 

1083 new_channels = dict.fromkeys(args) 

1084 new_channels.update(kwargs) 

1085 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1086 # update the channels dict AFTER we send the command. we don't want to 

1087 # subscribe twice to these channels, once for the command and again 

1088 # for the reconnection. 

1089 new_channels = self._normalize_keys(new_channels) 

1090 self.channels.update(new_channels) 

1091 if not self.subscribed: 

1092 # Set the subscribed_event flag to True 

1093 self.subscribed_event.set() 

1094 # Clear the health check counter 

1095 self.health_check_response_counter = 0 

1096 self.pending_unsubscribe_channels.difference_update(new_channels) 

1097 return ret_val 

1098 

1099 def unsubscribe(self, *args): 

1100 """ 

1101 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1102 all channels 

1103 """ 

1104 if args: 

1105 args = list_or_args(args[0], args[1:]) 

1106 channels = self._normalize_keys(dict.fromkeys(args)) 

1107 else: 

1108 channels = self.channels 

1109 self.pending_unsubscribe_channels.update(channels) 

1110 return self.execute_command("UNSUBSCRIBE", *args) 

1111 

1112 def ssubscribe(self, *args, target_node=None, **kwargs): 

1113 """ 

1114 Subscribes the client to the specified shard channels. 

1115 Channels supplied as keyword arguments expect a channel name as the key 

1116 and a callable as the value. A channel's callable will be invoked automatically 

1117 when a message is received on that channel rather than producing a message via 

1118 ``listen()`` or ``get_sharded_message()``. 

1119 """ 

1120 if args: 

1121 args = list_or_args(args[0], args[1:]) 

1122 new_s_channels = dict.fromkeys(args) 

1123 new_s_channels.update(kwargs) 

1124 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1125 # update the s_channels dict AFTER we send the command. we don't want to 

1126 # subscribe twice to these channels, once for the command and again 

1127 # for the reconnection. 

1128 new_s_channels = self._normalize_keys(new_s_channels) 

1129 self.shard_channels.update(new_s_channels) 

1130 if not self.subscribed: 

1131 # Set the subscribed_event flag to True 

1132 self.subscribed_event.set() 

1133 # Clear the health check counter 

1134 self.health_check_response_counter = 0 

1135 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1136 return ret_val 

1137 

1138 def sunsubscribe(self, *args, target_node=None): 

1139 """ 

1140 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1141 all shard_channels 

1142 """ 

1143 if args: 

1144 args = list_or_args(args[0], args[1:]) 

1145 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1146 else: 

1147 s_channels = self.shard_channels 

1148 self.pending_unsubscribe_shard_channels.update(s_channels) 

1149 return self.execute_command("SUNSUBSCRIBE", *args) 

1150 

1151 def listen(self): 

1152 "Listen for messages on channels this client has been subscribed to" 

1153 while self.subscribed: 

1154 response = self.handle_message(self.parse_response(block=True)) 

1155 if response is not None: 

1156 yield response 

1157 

1158 def get_message( 

1159 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1160 ): 

1161 """ 

1162 Get the next message if one is available, otherwise None. 

1163 

1164 If timeout is specified, the system will wait for `timeout` seconds 

1165 before returning. Timeout should be specified as a floating point 

1166 number, or None, to wait indefinitely. 

1167 """ 

1168 if not self.subscribed: 

1169 # Wait for subscription 

1170 start_time = time.monotonic() 

1171 if self.subscribed_event.wait(timeout) is True: 

1172 # The connection was subscribed during the timeout time frame. 

1173 # The timeout should be adjusted based on the time spent 

1174 # waiting for the subscription 

1175 time_spent = time.monotonic() - start_time 

1176 timeout = max(0.0, timeout - time_spent) 

1177 else: 

1178 # The connection isn't subscribed to any channels or patterns, 

1179 # so no messages are available 

1180 return None 

1181 

1182 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1183 

1184 if response: 

1185 return self.handle_message(response, ignore_subscribe_messages) 

1186 return None 

1187 

1188 get_sharded_message = get_message 

1189 

1190 def ping(self, message: Union[str, None] = None) -> bool: 

1191 """ 

1192 Ping the Redis server to test connectivity. 

1193 

1194 Sends a PING command to the Redis server and returns True if the server 

1195 responds with "PONG". 

1196 """ 

1197 args = ["PING", message] if message is not None else ["PING"] 

1198 return self.execute_command(*args) 

1199 

1200 def handle_message(self, response, ignore_subscribe_messages=False): 

1201 """ 

1202 Parses a pub/sub message. If the channel or pattern was subscribed to 

1203 with a message handler, the handler is invoked instead of a parsed 

1204 message being returned. 

1205 """ 

1206 if response is None: 

1207 return None 

1208 if isinstance(response, bytes): 

1209 response = [b"pong", response] if response != b"PONG" else [b"pong", b""] 

1210 

1211 message_type = str_if_bytes(response[0]) 

1212 if message_type == "pmessage": 

1213 message = { 

1214 "type": message_type, 

1215 "pattern": response[1], 

1216 "channel": response[2], 

1217 "data": response[3], 

1218 } 

1219 elif message_type == "pong": 

1220 message = { 

1221 "type": message_type, 

1222 "pattern": None, 

1223 "channel": None, 

1224 "data": response[1], 

1225 } 

1226 else: 

1227 message = { 

1228 "type": message_type, 

1229 "pattern": None, 

1230 "channel": response[1], 

1231 "data": response[2], 

1232 } 

1233 

1234 # if this is an unsubscribe message, remove it from memory 

1235 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1236 if message_type == "punsubscribe": 

1237 pattern = response[1] 

1238 if pattern in self.pending_unsubscribe_patterns: 

1239 self.pending_unsubscribe_patterns.remove(pattern) 

1240 self.patterns.pop(pattern, None) 

1241 elif message_type == "sunsubscribe": 

1242 s_channel = response[1] 

1243 if s_channel in self.pending_unsubscribe_shard_channels: 

1244 self.pending_unsubscribe_shard_channels.remove(s_channel) 

1245 self.shard_channels.pop(s_channel, None) 

1246 else: 

1247 channel = response[1] 

1248 if channel in self.pending_unsubscribe_channels: 

1249 self.pending_unsubscribe_channels.remove(channel) 

1250 self.channels.pop(channel, None) 

1251 if not self.channels and not self.patterns and not self.shard_channels: 

1252 # There are no subscriptions anymore, set subscribed_event flag 

1253 # to false 

1254 self.subscribed_event.clear() 

1255 

1256 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1257 # if there's a message handler, invoke it 

1258 if message_type == "pmessage": 

1259 handler = self.patterns.get(message["pattern"], None) 

1260 elif message_type == "smessage": 

1261 handler = self.shard_channels.get(message["channel"], None) 

1262 else: 

1263 handler = self.channels.get(message["channel"], None) 

1264 if handler: 

1265 handler(message) 

1266 return None 

1267 elif message_type != "pong": 

1268 # this is a subscribe/unsubscribe message. ignore if we don't 

1269 # want them 

1270 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1271 return None 

1272 

1273 return message 

1274 

1275 def run_in_thread( 

1276 self, 

1277 sleep_time: float = 0.0, 

1278 daemon: bool = False, 

1279 exception_handler: Optional[Callable] = None, 

1280 pubsub=None, 

1281 sharded_pubsub: bool = False, 

1282 ) -> "PubSubWorkerThread": 

1283 for channel, handler in self.channels.items(): 

1284 if handler is None: 

1285 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1286 for pattern, handler in self.patterns.items(): 

1287 if handler is None: 

1288 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1289 for s_channel, handler in self.shard_channels.items(): 

1290 if handler is None: 

1291 raise PubSubError( 

1292 f"Shard Channel: '{s_channel}' has no handler registered" 

1293 ) 

1294 

1295 pubsub = self if pubsub is None else pubsub 

1296 thread = PubSubWorkerThread( 

1297 pubsub, 

1298 sleep_time, 

1299 daemon=daemon, 

1300 exception_handler=exception_handler, 

1301 sharded_pubsub=sharded_pubsub, 

1302 ) 

1303 thread.start() 

1304 return thread 

1305 

1306 

1307class PubSubWorkerThread(threading.Thread): 

1308 def __init__( 

1309 self, 

1310 pubsub, 

1311 sleep_time: float, 

1312 daemon: bool = False, 

1313 exception_handler: Union[ 

1314 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None 

1315 ] = None, 

1316 sharded_pubsub: bool = False, 

1317 ): 

1318 super().__init__() 

1319 self.daemon = daemon 

1320 self.pubsub = pubsub 

1321 self.sleep_time = sleep_time 

1322 self.exception_handler = exception_handler 

1323 self.sharded_pubsub = sharded_pubsub 

1324 self._running = threading.Event() 

1325 

1326 def run(self) -> None: 

1327 if self._running.is_set(): 

1328 return 

1329 self._running.set() 

1330 pubsub = self.pubsub 

1331 sleep_time = self.sleep_time 

1332 while self._running.is_set(): 

1333 try: 

1334 if not self.sharded_pubsub: 

1335 pubsub.get_message( 

1336 ignore_subscribe_messages=True, timeout=sleep_time 

1337 ) 

1338 else: 

1339 pubsub.get_sharded_message( 

1340 ignore_subscribe_messages=True, timeout=sleep_time 

1341 ) 

1342 except BaseException as e: 

1343 if self.exception_handler is None: 

1344 raise 

1345 self.exception_handler(e, pubsub, self) 

1346 pubsub.close() 

1347 

1348 def stop(self) -> None: 

1349 # trip the flag so the run loop exits. the run loop will 

1350 # close the pubsub connection, which disconnects the socket 

1351 # and returns the connection to the pool. 

1352 self._running.clear() 

1353 

1354 

1355class Pipeline(Redis): 

1356 """ 

1357 Pipelines provide a way to transmit multiple commands to the Redis server 

1358 in one transmission. This is convenient for batch processing, such as 

1359 saving all the values in a list to Redis. 

1360 

1361 All commands executed within a pipeline(when running in transactional mode, 

1362 which is the default behavior) are wrapped with MULTI and EXEC 

1363 calls. This guarantees all commands executed in the pipeline will be 

1364 executed atomically. 

1365 

1366 Any command raising an exception does *not* halt the execution of 

1367 subsequent commands in the pipeline. Instead, the exception is caught 

1368 and its instance is placed into the response list returned by execute(). 

1369 Code iterating over the response list should be able to deal with an 

1370 instance of an exception as a potential value. In general, these will be 

1371 ResponseError exceptions, such as those raised when issuing a command 

1372 on a key of a different datatype. 

1373 """ 

1374 

1375 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1376 

1377 def __init__( 

1378 self, 

1379 connection_pool: ConnectionPool, 

1380 response_callbacks, 

1381 transaction, 

1382 shard_hint, 

1383 ): 

1384 self.connection_pool = connection_pool 

1385 self.connection: Optional[Connection] = None 

1386 self.response_callbacks = response_callbacks 

1387 self.transaction = transaction 

1388 self.shard_hint = shard_hint 

1389 self.watching = False 

1390 self.command_stack = [] 

1391 self.scripts: Set[Script] = set() 

1392 self.explicit_transaction = False 

1393 

1394 def __enter__(self) -> "Pipeline": 

1395 return self 

1396 

1397 def __exit__(self, exc_type, exc_value, traceback): 

1398 self.reset() 

1399 

1400 def __del__(self): 

1401 try: 

1402 self.reset() 

1403 except Exception: 

1404 pass 

1405 

1406 def __len__(self) -> int: 

1407 return len(self.command_stack) 

1408 

1409 def __bool__(self) -> bool: 

1410 """Pipeline instances should always evaluate to True""" 

1411 return True 

1412 

1413 def reset(self) -> None: 

1414 self.command_stack = [] 

1415 self.scripts = set() 

1416 # make sure to reset the connection state in the event that we were 

1417 # watching something 

1418 if self.watching and self.connection: 

1419 try: 

1420 # call this manually since our unwatch or 

1421 # immediate_execute_command methods can call reset() 

1422 self.connection.send_command("UNWATCH") 

1423 self.connection.read_response() 

1424 except ConnectionError: 

1425 # disconnect will also remove any previous WATCHes 

1426 self.connection.disconnect() 

1427 # clean up the other instance attributes 

1428 self.watching = False 

1429 self.explicit_transaction = False 

1430 

1431 # we can safely return the connection to the pool here since we're 

1432 # sure we're no longer WATCHing anything 

1433 if self.connection: 

1434 self.connection_pool.release(self.connection) 

1435 self.connection = None 

1436 

1437 def close(self) -> None: 

1438 """Close the pipeline""" 

1439 self.reset() 

1440 

1441 def multi(self) -> None: 

1442 """ 

1443 Start a transactional block of the pipeline after WATCH commands 

1444 are issued. End the transactional block with `execute`. 

1445 """ 

1446 if self.explicit_transaction: 

1447 raise RedisError("Cannot issue nested calls to MULTI") 

1448 if self.command_stack: 

1449 raise RedisError( 

1450 "Commands without an initial WATCH have already been issued" 

1451 ) 

1452 self.explicit_transaction = True 

1453 

1454 def execute_command(self, *args, **kwargs): 

1455 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1456 return self.immediate_execute_command(*args, **kwargs) 

1457 return self.pipeline_execute_command(*args, **kwargs) 

1458 

1459 def _disconnect_reset_raise_on_watching( 

1460 self, 

1461 conn: AbstractConnection, 

1462 error: Exception, 

1463 ) -> None: 

1464 """ 

1465 Close the connection reset watching state and 

1466 raise an exception if we were watching. 

1467 

1468 The supported exceptions are already checked in the 

1469 retry object so we don't need to do it here. 

1470 

1471 After we disconnect the connection, it will try to reconnect and 

1472 do a health check as part of the send_command logic(on connection level). 

1473 """ 

1474 conn.disconnect() 

1475 

1476 # if we were already watching a variable, the watch is no longer 

1477 # valid since this connection has died. raise a WatchError, which 

1478 # indicates the user should retry this transaction. 

1479 if self.watching: 

1480 self.reset() 

1481 raise WatchError( 

1482 f"A {type(error).__name__} occurred while watching one or more keys" 

1483 ) 

1484 

1485 def immediate_execute_command(self, *args, **options): 

1486 """ 

1487 Execute a command immediately, but don't auto-retry on the supported 

1488 errors for retry if we're already WATCHing a variable. 

1489 Used when issuing WATCH or subsequent commands retrieving their values but before 

1490 MULTI is called. 

1491 """ 

1492 command_name = args[0] 

1493 conn = self.connection 

1494 # if this is the first call, we need a connection 

1495 if not conn: 

1496 conn = self.connection_pool.get_connection() 

1497 self.connection = conn 

1498 

1499 return conn.retry.call_with_retry( 

1500 lambda: self._send_command_parse_response( 

1501 conn, command_name, *args, **options 

1502 ), 

1503 lambda error: self._disconnect_reset_raise_on_watching(conn, error), 

1504 ) 

1505 

1506 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1507 """ 

1508 Stage a command to be executed when execute() is next called 

1509 

1510 Returns the current Pipeline object back so commands can be 

1511 chained together, such as: 

1512 

1513 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1514 

1515 At some other point, you can then run: pipe.execute(), 

1516 which will execute all commands queued in the pipe. 

1517 """ 

1518 self.command_stack.append((args, options)) 

1519 return self 

1520 

1521 def _execute_transaction( 

1522 self, connection: Connection, commands, raise_on_error 

1523 ) -> List: 

1524 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1525 all_cmds = connection.pack_commands( 

1526 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1527 ) 

1528 connection.send_packed_command(all_cmds) 

1529 errors = [] 

1530 

1531 # parse off the response for MULTI 

1532 # NOTE: we need to handle ResponseErrors here and continue 

1533 # so that we read all the additional command messages from 

1534 # the socket 

1535 try: 

1536 self.parse_response(connection, "_") 

1537 except ResponseError as e: 

1538 errors.append((0, e)) 

1539 

1540 # and all the other commands 

1541 for i, command in enumerate(commands): 

1542 if EMPTY_RESPONSE in command[1]: 

1543 errors.append((i, command[1][EMPTY_RESPONSE])) 

1544 else: 

1545 try: 

1546 self.parse_response(connection, "_") 

1547 except ResponseError as e: 

1548 self.annotate_exception(e, i + 1, command[0]) 

1549 errors.append((i, e)) 

1550 

1551 # parse the EXEC. 

1552 try: 

1553 response = self.parse_response(connection, "_") 

1554 except ExecAbortError: 

1555 if errors: 

1556 raise errors[0][1] 

1557 raise 

1558 

1559 # EXEC clears any watched keys 

1560 self.watching = False 

1561 

1562 if response is None: 

1563 raise WatchError("Watched variable changed.") 

1564 

1565 # put any parse errors into the response 

1566 for i, e in errors: 

1567 response.insert(i, e) 

1568 

1569 if len(response) != len(commands): 

1570 self.connection.disconnect() 

1571 raise ResponseError( 

1572 "Wrong number of response items from pipeline execution" 

1573 ) 

1574 

1575 # find any errors in the response and raise if necessary 

1576 if raise_on_error: 

1577 self.raise_first_error(commands, response) 

1578 

1579 # We have to run response callbacks manually 

1580 data = [] 

1581 for r, cmd in zip(response, commands): 

1582 if not isinstance(r, Exception): 

1583 args, options = cmd 

1584 # Remove keys entry, it needs only for cache. 

1585 options.pop("keys", None) 

1586 command_name = args[0] 

1587 if command_name in self.response_callbacks: 

1588 r = self.response_callbacks[command_name](r, **options) 

1589 data.append(r) 

1590 

1591 return data 

1592 

1593 def _execute_pipeline(self, connection, commands, raise_on_error): 

1594 # build up all commands into a single request to increase network perf 

1595 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1596 connection.send_packed_command(all_cmds) 

1597 

1598 responses = [] 

1599 for args, options in commands: 

1600 try: 

1601 responses.append(self.parse_response(connection, args[0], **options)) 

1602 except ResponseError as e: 

1603 responses.append(e) 

1604 

1605 if raise_on_error: 

1606 self.raise_first_error(commands, responses) 

1607 

1608 return responses 

1609 

1610 def raise_first_error(self, commands, response): 

1611 for i, r in enumerate(response): 

1612 if isinstance(r, ResponseError): 

1613 self.annotate_exception(r, i + 1, commands[i][0]) 

1614 raise r 

1615 

1616 def annotate_exception(self, exception, number, command): 

1617 cmd = " ".join(map(safe_str, command)) 

1618 msg = ( 

1619 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1620 f"caused error: {exception.args[0]}" 

1621 ) 

1622 exception.args = (msg,) + exception.args[1:] 

1623 

1624 def parse_response(self, connection, command_name, **options): 

1625 result = Redis.parse_response(self, connection, command_name, **options) 

1626 if command_name in self.UNWATCH_COMMANDS: 

1627 self.watching = False 

1628 elif command_name == "WATCH": 

1629 self.watching = True 

1630 return result 

1631 

1632 def load_scripts(self): 

1633 # make sure all scripts that are about to be run on this pipeline exist 

1634 scripts = list(self.scripts) 

1635 immediate = self.immediate_execute_command 

1636 shas = [s.sha for s in scripts] 

1637 # we can't use the normal script_* methods because they would just 

1638 # get buffered in the pipeline. 

1639 exists = immediate("SCRIPT EXISTS", *shas) 

1640 if not all(exists): 

1641 for s, exist in zip(scripts, exists): 

1642 if not exist: 

1643 s.sha = immediate("SCRIPT LOAD", s.script) 

1644 

1645 def _disconnect_raise_on_watching( 

1646 self, 

1647 conn: AbstractConnection, 

1648 error: Exception, 

1649 ) -> None: 

1650 """ 

1651 Close the connection, raise an exception if we were watching. 

1652 

1653 The supported exceptions are already checked in the 

1654 retry object so we don't need to do it here. 

1655 

1656 After we disconnect the connection, it will try to reconnect and 

1657 do a health check as part of the send_command logic(on connection level). 

1658 """ 

1659 conn.disconnect() 

1660 # if we were watching a variable, the watch is no longer valid 

1661 # since this connection has died. raise a WatchError, which 

1662 # indicates the user should retry this transaction. 

1663 if self.watching: 

1664 raise WatchError( 

1665 f"A {type(error).__name__} occurred while watching one or more keys" 

1666 ) 

1667 

1668 def execute(self, raise_on_error: bool = True) -> List[Any]: 

1669 """Execute all the commands in the current pipeline""" 

1670 stack = self.command_stack 

1671 if not stack and not self.watching: 

1672 return [] 

1673 if self.scripts: 

1674 self.load_scripts() 

1675 if self.transaction or self.explicit_transaction: 

1676 execute = self._execute_transaction 

1677 else: 

1678 execute = self._execute_pipeline 

1679 

1680 conn = self.connection 

1681 if not conn: 

1682 conn = self.connection_pool.get_connection() 

1683 # assign to self.connection so reset() releases the connection 

1684 # back to the pool after we're done 

1685 self.connection = conn 

1686 

1687 try: 

1688 return conn.retry.call_with_retry( 

1689 lambda: execute(conn, stack, raise_on_error), 

1690 lambda error: self._disconnect_raise_on_watching(conn, error), 

1691 ) 

1692 finally: 

1693 # in reset() the connection is disconnected before returned to the pool if 

1694 # it is marked for reconnect. 

1695 self.reset() 

1696 

1697 def discard(self): 

1698 """ 

1699 Flushes all previously queued commands 

1700 See: https://redis.io/commands/DISCARD 

1701 """ 

1702 self.execute_command("DISCARD") 

1703 

1704 def watch(self, *names): 

1705 """Watches the values at keys ``names``""" 

1706 if self.explicit_transaction: 

1707 raise RedisError("Cannot issue a WATCH after a MULTI") 

1708 return self.execute_command("WATCH", *names) 

1709 

1710 def unwatch(self) -> bool: 

1711 """Unwatches all previously specified keys""" 

1712 return self.watching and self.execute_command("UNWATCH") or True