Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

701 statements  

1import copy 

2import re 

3import threading 

4import time 

5from itertools import chain 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Dict, 

11 List, 

12 Mapping, 

13 Optional, 

14 Set, 

15 Type, 

16 Union, 

17) 

18 

19from redis._parsers.encoders import Encoder 

20from redis._parsers.helpers import ( 

21 _RedisCallbacks, 

22 _RedisCallbacksRESP2, 

23 _RedisCallbacksRESP3, 

24 bool_ok, 

25) 

26from redis.backoff import ExponentialWithJitterBackoff 

27from redis.cache import CacheConfig, CacheInterface 

28from redis.commands import ( 

29 CoreCommands, 

30 RedisModuleCommands, 

31 SentinelCommands, 

32 list_or_args, 

33) 

34from redis.commands.core import Script 

35from redis.connection import ( 

36 AbstractConnection, 

37 Connection, 

38 ConnectionPool, 

39 SSLConnection, 

40 UnixDomainSocketConnection, 

41) 

42from redis.credentials import CredentialProvider 

43from redis.event import ( 

44 AfterPooledConnectionsInstantiationEvent, 

45 AfterPubSubConnectionInstantiationEvent, 

46 AfterSingleConnectionInstantiationEvent, 

47 ClientType, 

48 EventDispatcher, 

49) 

50from redis.exceptions import ( 

51 ConnectionError, 

52 ExecAbortError, 

53 PubSubError, 

54 RedisError, 

55 ResponseError, 

56 WatchError, 

57) 

58from redis.lock import Lock 

59from redis.maintenance_events import ( 

60 MaintenanceEventPoolHandler, 

61 MaintenanceEventsConfig, 

62) 

63from redis.retry import Retry 

64from redis.utils import ( 

65 _set_info_logger, 

66 deprecated_args, 

67 get_lib_version, 

68 safe_str, 

69 str_if_bytes, 

70 truncate_text, 

71) 

72 

73if TYPE_CHECKING: 

74 import ssl 

75 

76 import OpenSSL 

77 

# Empty bytes literal kept as a named module constant.
SYM_EMPTY = b""
# Option key understood by ``Redis.parse_response``: when it is present in a
# command's options and the server answers with a ResponseError, the value
# stored under this key is returned instead of re-raising the error.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
# (``Redis.parse_response`` reads the reply with ``disable_decoding=True``
# when this key is present in the command's options).
NEVER_DECODE = "NEVER_DECODE"

83 

84 

class CaseInsensitiveDict(dict):
    """Dict whose keys are matched case-insensitively.

    Every key is upper-cased before it is stored or looked up, so
    ``d["get"]``, ``d["GET"]`` and ``d["Get"]`` all address the same entry.
    Assumes string keys only (anything without ``.upper()`` raises
    ``AttributeError``).
    """

    def __init__(self, data: Mapping[str, Any]) -> None:
        """Populate the dict from ``data``, upper-casing every key."""
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *default):
        """Remove and return the entry for ``k`` (case-insensitive).

        ``dict.pop`` does not go through ``__getitem__``/``__delitem__``,
        so without this override a lower-case key would never be found.
        """
        return super().pop(k.upper(), *default)

    def setdefault(self, k, default=None):
        """Case-insensitive ``dict.setdefault``.

        Overridden for the same reason as ``pop``: the built-in would
        otherwise insert the key with its original casing.
        """
        return super().setdefault(k.upper(), default)

    def update(self, data):
        """Merge the mapping ``data`` into this dict, upper-casing its keys."""
        data = CaseInsensitiveDict(data)
        super().update(data)

110 

111 

class AbstractRedis:
    """Empty base class; it defines no attributes or methods of its own."""

114 

115 

116class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

117 """ 

118 Implementation of the Redis protocol. 

119 

120 This abstract class provides a Python interface to all Redis commands 

121 and an implementation of the Redis protocol. 

122 

123 Pipelines derive from this, implementing how 

124 the commands are sent and received to the Redis server. Based on 

125 configuration, an instance will either use a ConnectionPool, or 

126 Connection object to talk to redis. 

127 

128 It is not safe to pass PubSub or Pipeline objects between threads. 

129 """ 

130 

@classmethod
def from_url(cls, url: str, **kwargs) -> "Redis":
    """
    Create a Redis client whose connection pool is configured from ``url``.

    Example URLs::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[username@]/path/to/socket.sock?db=0[&password=password]

    Supported schemes:

    - ``redis://`` - plain TCP socket connection, see
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - ``rediss://`` - SSL wrapped TCP socket connection, see
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>
    - ``unix://`` - Unix Domain Socket connection.

    The username, password, hostname, path and every querystring value
    are passed through ``urllib.parse.unquote`` so that percent-encoded
    values are replaced with the characters they stand for.

    The database number is taken from the first of these that is present:

        1. a ``db`` querystring option, e.g. redis://localhost?db=0
        2. for the redis:// and rediss:// schemes, the path component of
           the url, e.g. redis://localhost/0
        3. a ``db`` keyword argument to this function.

    When none of them is given, db=0 is used.

    Querystring options are cast to their appropriate Python types.
    Booleans may be written as "True"/"False" or "Yes"/"No". A value that
    cannot be cast raises ``ValueError``. The parsed querystring arguments
    and the keyword arguments are then handed to the ``ConnectionPool``
    class initializer; when both specify the same argument, the
    querystring value wins.

    ``single_connection_client`` is removed from ``kwargs`` and given to
    the client constructor instead of the pool.
    """
    use_single_connection = kwargs.pop("single_connection_client", False)
    pool = ConnectionPool.from_url(url, **kwargs)
    instance = cls(
        connection_pool=pool,
        single_connection_client=use_single_connection,
    )
    # The pool was created here, so closing the client also disconnects it.
    instance.auto_close_connection_pool = True
    return instance

181 

@classmethod
def from_pool(
    cls: Type["Redis"],
    connection_pool: ConnectionPool,
) -> "Redis":
    """
    Build a Redis client on top of an existing connection pool.

    The returned client takes ownership of ``connection_pool``: the pool
    is closed when the client is closed.
    """
    instance = cls(connection_pool=connection_pool)
    # Passing a pool to the constructor leaves auto-close off; turn it on
    # so that ownership is transferred to the new client.
    instance.auto_close_connection_pool = True
    return instance

197 

@deprecated_args(
    args_to_warn=["retry_on_timeout"],
    reason="TimeoutError is included by default.",
    version="6.0.0",
)
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    socket_connect_timeout: Optional[float] = None,
    socket_keepalive: Optional[bool] = None,
    socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
    connection_pool: Optional[ConnectionPool] = None,
    unix_socket_path: Optional[str] = None,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    retry_on_timeout: bool = False,
    retry: Retry = Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
    ),
    retry_on_error: Optional[List[Type[Exception]]] = None,
    ssl: bool = False,
    ssl_keyfile: Optional[str] = None,
    ssl_certfile: Optional[str] = None,
    ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
    ssl_ca_certs: Optional[str] = None,
    ssl_ca_path: Optional[str] = None,
    ssl_ca_data: Optional[str] = None,
    ssl_check_hostname: bool = True,
    ssl_password: Optional[str] = None,
    ssl_validate_ocsp: bool = False,
    ssl_validate_ocsp_stapled: bool = False,
    ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
    ssl_ocsp_expected_cert: Optional[str] = None,
    ssl_min_version: Optional["ssl.TLSVersion"] = None,
    ssl_ciphers: Optional[str] = None,
    max_connections: Optional[int] = None,
    single_connection_client: bool = False,
    health_check_interval: int = 0,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    username: Optional[str] = None,
    redis_connect_func: Optional[Callable[[], None]] = None,
    credential_provider: Optional[CredentialProvider] = None,
    protocol: Optional[int] = 2,
    cache: Optional[CacheInterface] = None,
    cache_config: Optional[CacheConfig] = None,
    event_dispatcher: Optional[EventDispatcher] = None,
    maintenance_events_config: Optional[MaintenanceEventsConfig] = None,
) -> None:
    """
    Initialize a new Redis client.

    To specify a retry policy for specific errors, you have two options:

    1. Set the `retry_on_error` to a list of the error/s to retry on, and
    you can also set `retry` to a valid `Retry` object(in case the default
    one is not appropriate) - with this approach the retries will be triggered
    on the default errors specified in the Retry object enriched with the
    errors specified in `retry_on_error`.

    2. Define a `Retry` object with configured 'supported_errors' and set
    it to the `retry` parameter - with this approach you completely redefine
    the errors on which retries will happen.

    `retry_on_timeout` is deprecated - please include the TimeoutError
    either in the Retry object or in the `retry_on_error` list.

    When 'connection_pool' is provided - the retry configuration of the
    provided pool will be used.

    Args:

    connection_pool:
        an existing pool to use. When given, no new pool is built from the
        other connection arguments and the client does not disconnect the
        pool on ``close()``. When omitted, a ``ConnectionPool`` is created
        from the remaining arguments and is disconnected on ``close()``.
    unix_socket_path:
        when not ``None`` the pool uses ``UnixDomainSocketConnection``;
        otherwise the TCP options (``host``, ``port``, socket options) are
        used.
    cache, cache_config:
        forwarded to the new pool only when ``protocol`` is 3.
    single_connection_client:
        if `True`, connection pool is not used. In that case `Redis`
        instance use is not thread safe.

    Raises:
        RedisError: if ``cache``/``cache_config`` or
            ``maintenance_events_config`` is given but the pool's protocol
            is not RESP 3.
    """
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    if not connection_pool:
        if not retry_on_error:
            retry_on_error = []
        kwargs = {
            "db": db,
            "username": username,
            "password": password,
            "socket_timeout": socket_timeout,
            "encoding": encoding,
            "encoding_errors": encoding_errors,
            "decode_responses": decode_responses,
            "retry_on_error": retry_on_error,
            # the default ``retry`` object is created once at definition
            # time, so each client gets its own copy
            "retry": copy.deepcopy(retry),
            "max_connections": max_connections,
            "health_check_interval": health_check_interval,
            "client_name": client_name,
            "lib_name": lib_name,
            "lib_version": lib_version,
            "redis_connect_func": redis_connect_func,
            "credential_provider": credential_provider,
            "protocol": protocol,
        }
        # based on input, setup appropriate connection args
        if unix_socket_path is not None:
            kwargs.update(
                {
                    "path": unix_socket_path,
                    "connection_class": UnixDomainSocketConnection,
                }
            )
        else:
            # TCP specific options
            kwargs.update(
                {
                    "host": host,
                    "port": port,
                    "socket_connect_timeout": socket_connect_timeout,
                    "socket_keepalive": socket_keepalive,
                    "socket_keepalive_options": socket_keepalive_options,
                }
            )

            # NOTE(review): indentation was lost in this extract; the SSL
            # options are placed under the TCP branch - confirm against
            # the upstream file.
            if ssl:
                kwargs.update(
                    {
                        "connection_class": SSLConnection,
                        "ssl_keyfile": ssl_keyfile,
                        "ssl_certfile": ssl_certfile,
                        "ssl_cert_reqs": ssl_cert_reqs,
                        "ssl_ca_certs": ssl_ca_certs,
                        "ssl_ca_data": ssl_ca_data,
                        "ssl_check_hostname": ssl_check_hostname,
                        "ssl_password": ssl_password,
                        "ssl_ca_path": ssl_ca_path,
                        "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                        "ssl_validate_ocsp": ssl_validate_ocsp,
                        "ssl_ocsp_context": ssl_ocsp_context,
                        "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        "ssl_min_version": ssl_min_version,
                        "ssl_ciphers": ssl_ciphers,
                    }
                )
        # cache settings are only handed to the pool for RESP 3
        if (cache_config or cache) and protocol in [3, "3"]:
            kwargs.update(
                {
                    "cache": cache,
                    "cache_config": cache_config,
                }
            )
        connection_pool = ConnectionPool(**kwargs)
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )
        # the pool was created here, so close() also disconnects it
        self.auto_close_connection_pool = True
    else:
        # caller-supplied pool: leave its lifetime to the caller
        self.auto_close_connection_pool = False
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )

    self.connection_pool = connection_pool

    if (cache_config or cache) and self.connection_pool.get_protocol() not in [
        3,
        "3",
    ]:
        raise RedisError("Client caching is only supported with RESP version 3")

    if maintenance_events_config and self.connection_pool.get_protocol() not in [
        3,
        "3",
    ]:
        raise RedisError(
            "Push handlers on connection are only supported with RESP version 3"
        )
    if maintenance_events_config and maintenance_events_config.enabled:
        self.maintenance_events_pool_handler = MaintenanceEventPoolHandler(
            self.connection_pool, maintenance_events_config
        )
        self.connection_pool.set_maintenance_events_pool_handler(
            self.maintenance_events_pool_handler
        )
    else:
        self.maintenance_events_pool_handler = None

    self.single_connection_lock = threading.RLock()
    self.connection = None
    self._single_connection_client = single_connection_client
    if self._single_connection_client:
        # reserve one connection from the pool for this client
        self.connection = self.connection_pool.get_connection()
        self._event_dispatcher.dispatch(
            AfterSingleConnectionInstantiationEvent(
                self.connection, ClientType.SYNC, self.single_connection_lock
            )
        )

    self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

    # layer the protocol-specific callbacks over the common ones
    if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
        self.response_callbacks.update(_RedisCallbacksRESP3)
    else:
        self.response_callbacks.update(_RedisCallbacksRESP2)

410 

def __repr__(self) -> str:
    """Return ``<module.ClassName(<repr of the connection pool>)>``."""
    klass = type(self)
    return f"<{klass.__module__}.{klass.__name__}({self.connection_pool!r})>"

416 

def get_encoder(self) -> "Encoder":
    """Return the encoder provided by this client's connection pool."""
    pool = self.connection_pool
    return pool.get_encoder()

420 

def get_connection_kwargs(self) -> Dict:
    """Return the connection pool's ``connection_kwargs`` mapping itself (not a copy)."""
    pool = self.connection_pool
    return pool.connection_kwargs

424 

def get_retry(self) -> Optional[Retry]:
    """Return the ``"retry"`` entry of the connection kwargs, or ``None`` if absent."""
    connection_kwargs = self.get_connection_kwargs()
    return connection_kwargs.get("retry")

427 

def set_retry(self, retry: Retry) -> None:
    """Store ``retry`` in the connection kwargs and pass it to the pool's ``set_retry``."""
    connection_kwargs = self.get_connection_kwargs()
    connection_kwargs.update(retry=retry)
    self.connection_pool.set_retry(retry)

431 

def set_response_callback(self, command: str, callback: Callable) -> None:
    """Register ``callback`` as the response callback for ``command``."""
    registry = self.response_callbacks
    registry[command] = callback

435 

def load_external_module(self, funcname, func) -> None:
    """
    Attach an externally defined redis module (and its namespace) to
    this client instance.

    funcname - name of the attribute to create on the client
    func - the object stored under that name

    Example: given a custom redis module ``foomod`` that adds the
    commands 'foo.dothing' and 'foo.anotherthing' to redis, its
    functions can be loaded into this namespace with:

    from redis import Redis
    from foomodule import F
    r = Redis()
    r.load_external_module("foo", F)
    r.foo().dothing('your', 'arguments')

    For a concrete example see the reimport of the redisjson module in
    tests/test_connection.py::test_loading_external_modules
    """
    # Set on the instance only; other clients are not affected.
    setattr(self, funcname, func)

458 

def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
    """
    Create a pipeline that queues several commands and runs them later.

    ``transaction`` says whether the queued commands should be executed
    atomically. Besides atomicity, pipelines cut down the round trips
    between client and server.
    """
    pool = self.connection_pool
    callbacks = self.response_callbacks
    return Pipeline(pool, callbacks, transaction, shard_hint)

470 

def transaction(
    self, func: Callable[["Pipeline"], None], *watches, **kwargs
) -> Union[List[Any], Any, None]:
    """
    Run the callable ``func`` as a transaction while watching every key
    in ``watches``. ``func`` receives one argument, a Pipeline object.

    The attempt is repeated for as long as executing it raises
    ``WatchError``.

    Recognised keyword arguments:

    - ``shard_hint``: passed on to ``self.pipeline``.
    - ``value_from_callable``: when true, return what ``func`` returned
      instead of the result of ``pipe.execute()``.
    - ``watch_delay``: when positive, seconds to sleep before retrying
      after a ``WatchError``.
    """
    shard_hint = kwargs.pop("shard_hint", None)
    return_func_value = kwargs.pop("value_from_callable", False)
    retry_pause = kwargs.pop("watch_delay", None)
    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                execute_result = pipe.execute()
            except WatchError:
                # a watched key changed: optionally wait, then try again
                if retry_pause is not None and retry_pause > 0:
                    time.sleep(retry_pause)
            else:
                return callable_result if return_func_value else execute_result

494 

def lock(
    self,
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    lock_class: Union[None, Any] = None,
    thread_local: bool = True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object on key ``name`` that behaves like
    threading.Lock.

    ``timeout``, when given, is the maximum life of the lock. Without
    it the lock stays held until release() is called.

    ``sleep`` is how long to sleep per loop iteration while the lock is
    in blocking mode and another client holds it.

    ``blocking`` decides whether ``acquire`` waits until the lock is
    obtained or fails at once, in which case ``acquire`` returns False
    and the lock is not acquired. Defaults to True. A ``blocking``
    argument passed to ``acquire`` overrides this value.

    ``blocking_timeout`` is the maximum number of seconds to spend
    trying to acquire the lock. ``None`` means keep trying forever. It
    may be a float or an integer.

    ``lock_class`` forces a particular lock implementation. As of
    redis-py 3.0 the only lock class implemented is ``Lock`` (a
    Lua-based lock), so this is only needed for a custom lock class.

    ``thread_local`` says whether the lock token is kept in thread-local
    storage. By default it is, so a thread sees only its own token and
    never one set by another thread. Consider this timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    ``raise_on_release_error`` says whether to raise an exception when
    the lock is no longer owned on leaving the context manager. The
    default True raises; with False a warning is logged and the
    exception is suppressed.

    Thread-local storage sometimes has to be switched off, for example
    when one thread acquires a lock and hands the lock instance to a
    worker thread that releases it later. With thread-local storage left
    on, the worker would not see the token set by the acquiring thread.
    Such cases are assumed to be rare, hence the default."""
    factory = Lock if lock_class is None else lock_class
    return factory(
        self,
        name,
        timeout=timeout,
        sleep=sleep,
        blocking=blocking,
        blocking_timeout=blocking_timeout,
        thread_local=thread_local,
        raise_on_release_error=raise_on_release_error,
    )

575 

def pubsub(self, **kwargs):
    """
    Create a Publish/Subscribe object for subscribing to channels and
    listening for the messages published to them.

    The object shares this client's connection pool and event
    dispatcher; ``kwargs`` are forwarded to ``PubSub``.
    """
    pool = self.connection_pool
    dispatcher = self._event_dispatcher
    return PubSub(pool, event_dispatcher=dispatcher, **kwargs)

585 

def monitor(self):
    """Return a ``Monitor`` built on this client's connection pool."""
    pool = self.connection_pool
    return Monitor(pool)

588 

def client(self):
    """
    Return a new instance of this class that shares the connection pool
    and is created with ``single_connection_client=True``.

    The maintenance events config of the current pool handler, if there
    is one, is passed on to the new instance.
    """
    handler = self.maintenance_events_pool_handler
    if handler is None:
        events_config = None
    else:
        events_config = handler.config
    return self.__class__(
        connection_pool=self.connection_pool,
        single_connection_client=True,
        maintenance_events_config=events_config,
    )

600 

def __enter__(self):
    """Enter the ``with`` block; the client itself is the context value."""
    return self

603 

def __exit__(self, exc_type, exc_value, traceback):
    """Close the client on leaving the ``with`` block; exceptions are not suppressed."""
    self.close()

606 

def __del__(self):
    """Best-effort ``close()`` at finalization; any ``Exception`` is ignored."""
    try:
        self.close()
    except Exception:
        # deliberately silent: nothing useful can be done in a finalizer
        pass

612 

def close(self) -> None:
    """
    Return the dedicated connection (if any) to the pool and, when this
    client owns the pool, disconnect the pool.
    """
    # The ``connection`` attribute may be missing when the constructor
    # failed early; there is nothing to clean up in that case.
    if hasattr(self, "connection"):
        held = self.connection
        if held:
            self.connection = None
            self.connection_pool.release(held)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()

627 

def _send_command_parse_response(self, conn, command_name, *args, **options):
    """
    Send the command given by ``args`` on ``conn``, then read and parse
    the reply as ``command_name``.
    """
    conn.send_command(*args, **options)
    parsed = self.parse_response(conn, command_name, **options)
    return parsed

634 

def _close_connection(self, conn) -> None:
    """
    Disconnect ``conn`` so that a retry starts from a fresh connection.

    No exception filtering happens here: the retry object has already
    checked which exceptions are supported.

    Once disconnected, the connection reconnects and performs a health
    check as part of the send_command logic (on connection level).
    """
    conn.disconnect()

647 

648 # COMMAND EXECUTION AND PROTOCOL PARSING 

def execute_command(self, *args, **options):
    """Public entry point for running a command; delegates to ``_execute_command``."""
    result = self._execute_command(*args, **options)
    return result

651 

def _execute_command(self, *args, **options):
    """
    Execute a command and return a parsed response.

    ``args[0]`` is the command name. The command runs on the client's
    dedicated connection when there is one, otherwise on a connection
    borrowed from the pool. Sending and parsing go through the
    connection's retry object, which closes the connection between
    attempts.

    In single-connection mode the call is serialized with
    ``single_connection_lock``. The lock is always released and a
    borrowed connection is always returned to the pool, even when the
    post-command reconnect fails.
    """
    pool = self.connection_pool
    command_name = args[0]
    conn = self.connection or pool.get_connection()

    if self._single_connection_client:
        self.single_connection_lock.acquire()
    try:
        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            lambda _: self._close_connection(conn),
        )

    finally:
        try:
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
        finally:
            # Must run even if the reconnect above raised; otherwise the
            # lock would stay held and the connection would never be
            # returned to the pool.
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

676 

def parse_response(self, connection, command_name, **options):
    """
    Read one reply from ``connection`` and post-process it.

    Options handled here (and removed before the callback runs):

    - ``NEVER_DECODE``: read the reply with decoding disabled.
    - ``EMPTY_RESPONSE``: value returned when the server answers with a
      ``ResponseError``; without it the error propagates.
    - ``keys``: dropped, it is only needed for the cache.

    If a response callback is registered for ``command_name`` its result
    is returned, otherwise the raw reply.
    """
    skip_decoding = NEVER_DECODE in options
    try:
        if skip_decoding:
            response = connection.read_response(disable_decoding=True)
        else:
            response = connection.read_response()
    except ResponseError:
        if EMPTY_RESPONSE in options:
            return options[EMPTY_RESPONSE]
        raise

    if skip_decoding:
        options.pop(NEVER_DECODE)
    options.pop(EMPTY_RESPONSE, None)

    # Remove keys entry, it needs only for cache.
    options.pop("keys", None)

    callbacks = self.response_callbacks
    if command_name not in callbacks:
        return response
    return callbacks[command_name](response, **options)

699 

def get_cache(self) -> Optional[CacheInterface]:
    """Return the ``cache`` attribute of this client's connection pool."""
    pool = self.connection_pool
    return pool.cache

702 

703 

# ``StrictRedis`` is a second name bound to the very same ``Redis`` class.
StrictRedis = Redis

705 

706 

class Monitor:
    """
    Helper around the redis MONITOR command.

    next_command() returns a single monitored command;
    listen() yields monitored commands one after another.
    """

    # "[<db> <client info>] <quoted command parts>"
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # one double-quoted piece, ending at a quote not preceded by a backslash
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        """Take a connection from ``connection_pool`` for monitoring."""
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        """Send MONITOR and return this object."""
        self._start_monitor()
        return self

    def __exit__(self, *args):
        """Disconnect the connection and give it back to the pool."""
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """
        Read one monitor line and return it as a dict with the keys
        ``time``, ``db``, ``client_address``, ``client_port``,
        ``client_type`` and ``command``; ``None`` when nothing was read.
        """
        raw = self.connection.read_response()
        if raw is None:
            return None
        if isinstance(raw, bytes):
            raw = self.connection.encoder.decode(raw, force=True)

        timestamp, remainder = raw.split(" ", 1)
        db_id, client_info, quoted = self.monitor_re.match(remainder).groups()
        pieces = self.command_re.findall(quoted)
        # Redis escapes double quotes because every piece of the command
        # is wrapped in double quotes. That is not needed here, so drop
        # the escaping and keep the quote.
        command = " ".join(pieces).replace('\\"', '"')

        if client_info == "lua":
            address, port, kind = "lua", "", "lua"
        elif client_info.startswith("unix"):
            address, port, kind = "unix", client_info[5:], "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            address, port = client_info.rsplit(":", 1)
            kind = "tcp"
        return {
            "time": float(timestamp),
            "db": int(db_id),
            "client_address": address,
            "client_port": port,
            "client_type": kind,
            "command": command,
        }

    def listen(self):
        """Yield parsed monitor commands forever."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        """Send MONITOR and raise ``RedisError`` unless the reply is OK."""
        self.connection.send_command("MONITOR")
        # the OK reply is verified here and not handed to the user
        reply = self.connection.read_response()
        if bool_ok(reply):
            return
        raise RedisError(f"MONITOR failed: {reply}")

781 

782 

783class PubSub: 

784 """ 

785 PubSub provides publish, subscribe and listen support to Redis channels. 

786 

787 After subscribing to one or more channels, the listen() method will block 

788 until a message arrives on one of the subscribed channels. That message 

789 will be returned and it's safe to start listening again. 

790 """ 

791 

792 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

793 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

794 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

795 

def __init__(
    self,
    connection_pool,
    shard_hint=None,
    ignore_subscribe_messages: bool = False,
    encoder: Optional["Encoder"] = None,
    push_handler_func: Union[None, Callable[[str], None]] = None,
    event_dispatcher: Optional["EventDispatcher"] = None,
):
    """
    Set up a PubSub object on ``connection_pool``.

    No connection is taken from the pool here. ``encoder`` defaults to
    the pool's encoder and ``event_dispatcher`` to a new
    ``EventDispatcher``. The expected health-check reply is precomputed
    as str or bytes depending on the encoder's ``decode_responses``.
    """
    self.connection_pool = connection_pool
    self.shard_hint = shard_hint
    self.ignore_subscribe_messages = ignore_subscribe_messages
    self.connection = None
    self.subscribed_event = threading.Event()
    # the encoding options of this connection are needed to look up
    # channel and pattern names for callback handlers.
    self.encoder = encoder
    self.push_handler_func = push_handler_func
    self._event_dispatcher = (
        EventDispatcher() if event_dispatcher is None else event_dispatcher
    )

    self._lock = threading.RLock()
    if self.encoder is None:
        self.encoder = self.connection_pool.get_encoder()
    encoded_message = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
    self.health_check_response_b = encoded_message
    self.health_check_response = (
        ["pong", self.HEALTH_CHECK_MESSAGE]
        if self.encoder.decode_responses
        else [b"pong", encoded_message]
    )
    if self.push_handler_func is None:
        _set_info_logger()
    self.reset()

830 

def __enter__(self) -> "PubSub":
    """Enter the ``with`` block; the PubSub object itself is the context value."""
    return self

833 

def __exit__(self, exc_type, exc_value, traceback) -> None:
    """Reset the object on leaving the ``with`` block; exceptions are not suppressed."""
    self.reset()

836 

def __del__(self) -> None:
    """Best-effort ``reset()`` at finalization; any ``Exception`` is ignored."""
    try:
        # the object may go out of scope before its subscriptions were
        # shut down; close the connection by hand before it goes back
        # to the connection pool
        self.reset()
    except Exception:
        pass

845 

def reset(self) -> None:
    """
    Drop the connection (if any) and forget every subscription.

    A held connection is disconnected, its connect callback is
    deregistered and it is released to the pool. All channel, pattern
    and shard-channel registries are emptied, the health-check counter
    is zeroed and the subscribed flag is cleared.
    """
    conn = self.connection
    if conn:
        conn.disconnect()
        conn.deregister_connect_callback(self.on_connect)
        self.connection_pool.release(conn)
        self.connection = None
    self.health_check_response_counter = 0
    self.channels, self.pending_unsubscribe_channels = {}, set()
    self.shard_channels, self.pending_unsubscribe_shard_channels = {}, set()
    self.patterns, self.pending_unsubscribe_patterns = {}, set()
    self.subscribed_event.clear()

860 

def close(self) -> None:
    """Close the PubSub object; this is the same as ``reset()``."""
    self.reset()

863 

def on_connect(self, connection) -> None:
    "Re-subscribe to every channel, pattern and shard channel held before"
    # NOTE: for python3, bytestrings cannot be passed as keyword
    # arguments, so channel/pattern names are decoded back to unicode
    # strings before they are handed to [p|s]subscribe.
    for pending in (
        self.pending_unsubscribe_channels,
        self.pending_unsubscribe_patterns,
        self.pending_unsubscribe_shard_channels,
    ):
        pending.clear()
    # Attributes are looked up by name inside the loop so that each
    # registry is read only after the previous re-subscription ran.
    for registry_name, method_name in (
        ("channels", "subscribe"),
        ("patterns", "psubscribe"),
        ("shard_channels", "ssubscribe"),
    ):
        registry = getattr(self, registry_name)
        if not registry:
            continue
        decoded = {
            self.encoder.decode(name, force=True): handler
            for name, handler in registry.items()
        }
        getattr(self, method_name)(**decoded)

888 

@property
def subscribed(self) -> bool:
    """True while ``subscribed_event`` is set, i.e. some channel or pattern is subscribed."""
    flag = self.subscribed_event
    return flag.is_set()

893 

def execute_command(self, *args):
    """Execute a publish/subscribe command

    On first use a connection is taken from the pool, ``on_connect`` is
    registered as its connect callback, the push handler (if any) is
    installed on its parser and an
    ``AfterPubSubConnectionInstantiationEvent`` is dispatched. The command
    is then sent under ``self._lock``; its reply is not read here.
    """

    # NOTE: don't parse the response in this function -- it could pull a
    # legitimate message off the stack if the connection is already
    # subscribed to one or more channels

    # NOTE(review): indentation was lost in this extract; the event
    # dispatch is placed inside the first-use branch - confirm against
    # the upstream file.
    if self.connection is None:
        self.connection = self.connection_pool.get_connection()
        # register a callback that re-subscribes to any channels we
        # were listening to when we were disconnected
        self.connection.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
        self._event_dispatcher.dispatch(
            AfterPubSubConnectionInstantiationEvent(
                self.connection, self.connection_pool, ClientType.SYNC, self._lock
            )
        )
    connection = self.connection
    # health checks are only requested while nothing is subscribed
    kwargs = {"check_health": not self.subscribed}
    if not self.subscribed:
        # drain outstanding health-check replies before sending
        self.clean_health_check_responses()
    with self._lock:
        self._execute(connection, connection.send_command, *args, **kwargs)

919 

def clean_health_check_responses(self) -> None:
    """
    Read and discard any outstanding health check responses.

    Loops at most 10 times while a connection exists and
    ``health_check_response_counter`` is positive. Raises ``PubSubError``
    if a reply that is read turns out not to be a health check response.
    """
    attempts_left = 10
    conn = self.connection
    while conn and self.health_check_response_counter > 0 and attempts_left > 0:
        readable = self._execute(conn, conn.can_read, timeout=conn.socket_timeout)
        if readable:
            response = self._execute(conn, conn.read_response)
            if not self.is_health_check_response(response):
                raise PubSubError(
                    "A non health check response was cleaned by "
                    "execute_command: {}".format(response)
                )
            self.health_check_response_counter -= 1
        # An attempt is consumed whether or not anything was readable.
        attempts_left -= 1

937 

938 def _reconnect(self, conn) -> None: 

939 """ 

940 The supported exceptions are already checked in the 

941 retry object so we don't need to do it here. 

942 

943 In this error handler we are trying to reconnect to the server. 

944 """ 

945 conn.disconnect() 

946 conn.connect() 

947 

948 def _execute(self, conn, command, *args, **kwargs): 

949 """ 

950 Connect manually upon disconnection. If the Redis server is down, 

951 this will fail and raise a ConnectionError as desired. 

952 After reconnection, the ``on_connect`` callback should have been 

953 called by the # connection to resubscribe us to any channels and 

954 patterns we were previously listening to 

955 """ 

956 

957 if conn.should_reconnect(): 

958 self._reconnect(conn) 

959 

960 response = conn.retry.call_with_retry( 

961 lambda: command(*args, **kwargs), 

962 lambda _: self._reconnect(conn), 

963 ) 

964 

965 return response 

966 

def parse_response(self, block=True, timeout=0):
    """
    Read one reply from the pubsub connection.

    With ``block`` false, ``None`` is returned when nothing can be read
    within ``timeout``. Health check replies are counted off and reported
    as ``None`` so that callers never see them. Raises ``RuntimeError``
    when no connection has been set yet.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    self.check_health()

    def _read_once():
        if block:
            conn.connect()
        elif not conn.can_read(timeout=timeout):
            return None
        return conn.read_response(disconnect_on_error=False, push_request=True)

    reply = self._execute(conn, _read_once)

    if not self.is_health_check_response(reply):
        return reply
    # Ignore the health check message as the user might not expect it.
    self.health_check_response_counter -= 1
    return None

993 

def is_health_check_response(self, response) -> bool:
    """
    Tell whether ``response`` is one of the two known health check replies.

    If there are no subscriptions redis responds to the PING command with
    a bulk response, instead of a multi-bulk with "pong" and the response,
    so both forms are accepted.
    """
    expected = (
        self.health_check_response,  # reply form when there was a subscription
        self.health_check_response_b,  # reply form when there wasn't
    )
    return response in expected

1004 

def check_health(self) -> None:
    """
    Send a health check PING when the connection's next check is overdue.

    Nothing is sent when the connection has no ``health_check_interval``.
    Each PING sent increments ``health_check_response_counter`` so the
    reply can be recognised and dropped later. Raises ``RuntimeError``
    when no connection has been set yet.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            "pubsub connection not set: "
            "did you forget to call subscribe() or psubscribe()?"
        )

    if not conn.health_check_interval:
        return
    if time.monotonic() > conn.next_health_check:
        conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
        self.health_check_response_counter += 1

1016 

1017 def _normalize_keys(self, data) -> Dict: 

1018 """ 

1019 normalize channel/pattern names to be either bytes or strings 

1020 based on whether responses are automatically decoded. this saves us 

1021 from coercing the value for each message coming in. 

1022 """ 

1023 encode = self.encoder.encode 

1024 decode = self.encoder.decode 

1025 return {decode(encode(k)): v for k, v in data.items()} 

1026 

def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channel patterns.

    Positional arguments are pattern names without a handler. Keyword
    arguments map a pattern name to a callable; a pattern's callable is
    invoked automatically when a message is received on that pattern
    rather than producing a message via ``listen()``.

    Returns whatever ``execute_command`` returns.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {**dict.fromkeys(args), **kwargs}
    result = self.execute_command("PSUBSCRIBE", *requested)
    # Record the patterns only AFTER the command was sent: we don't want
    # to subscribe twice to these patterns, once for the command and
    # again for the reconnection.
    normalized = self._normalize_keys(requested)
    self.patterns.update(normalized)
    if not self.subscribed:
        # First subscription: raise the flag and restart the health
        # check bookkeeping.
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_patterns.difference_update(normalized)
    return result

1052 

def punsubscribe(self, *args):
    """
    Unsubscribe from the supplied patterns, or from all patterns when
    none are given.

    The affected patterns are only marked as pending here; returns
    whatever ``execute_command`` returns.
    """
    if not args:
        targets = self.patterns
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_patterns.update(targets)
    return self.execute_command("PUNSUBSCRIBE", *args)

1065 

def subscribe(self, *args, **kwargs):
    """
    Subscribe to channels.

    Positional arguments are channel names without a handler. Keyword
    arguments map a channel name to a callable; a channel's callable is
    invoked automatically when a message is received on that channel
    rather than producing a message via ``listen()`` or ``get_message()``.

    Returns whatever ``execute_command`` returns.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {**dict.fromkeys(args), **kwargs}
    result = self.execute_command("SUBSCRIBE", *requested)
    # Record the channels only AFTER the command was sent: we don't want
    # to subscribe twice to these channels, once for the command and
    # again for the reconnection.
    normalized = self._normalize_keys(requested)
    self.channels.update(normalized)
    if not self.subscribed:
        # First subscription: raise the flag and restart the health
        # check bookkeeping.
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_channels.difference_update(normalized)
    return result

1091 

def unsubscribe(self, *args):
    """
    Unsubscribe from the supplied channels, or from all channels when
    none are given.

    The affected channels are only marked as pending here; returns
    whatever ``execute_command`` returns.
    """
    if not args:
        targets = self.channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_channels.update(targets)
    return self.execute_command("UNSUBSCRIBE", *args)

1104 

def ssubscribe(self, *args, target_node=None, **kwargs):
    """
    Subscribe the client to the specified shard channels.

    Positional arguments are shard channel names without a handler.
    Keyword arguments map a channel name to a callable; a channel's
    callable is invoked automatically when a message is received on that
    channel rather than producing a message via ``listen()`` or
    ``get_sharded_message()``.

    ``target_node`` is accepted but not used by this implementation.
    Returns whatever ``execute_command`` returns.
    """
    if args:
        args = list_or_args(args[0], args[1:])
    requested = {**dict.fromkeys(args), **kwargs}
    result = self.execute_command("SSUBSCRIBE", *requested)
    # Record the shard channels only AFTER the command was sent: we don't
    # want to subscribe twice to these channels, once for the command and
    # again for the reconnection.
    normalized = self._normalize_keys(requested)
    self.shard_channels.update(normalized)
    if not self.subscribed:
        # First subscription: raise the flag and restart the health
        # check bookkeeping.
        self.subscribed_event.set()
        self.health_check_response_counter = 0
    self.pending_unsubscribe_shard_channels.difference_update(normalized)
    return result

1130 

def sunsubscribe(self, *args, target_node=None):
    """
    Unsubscribe from the supplied shard channels, or from all shard
    channels when none are given.

    The affected shard channels are only marked as pending here.
    ``target_node`` is accepted but not used by this implementation.
    Returns whatever ``execute_command`` returns.
    """
    if not args:
        targets = self.shard_channels
    else:
        args = list_or_args(args[0], args[1:])
        targets = self._normalize_keys(dict.fromkeys(args))
    self.pending_unsubscribe_shard_channels.update(targets)
    return self.execute_command("SUNSUBSCRIBE", *args)

1143 

def listen(self):
    """
    Yield messages for as long as this object is subscribed.

    Each iteration does a blocking read and passes the reply through
    ``handle_message``; replies for which that returns ``None`` are
    skipped rather than yielded.
    """
    while self.subscribed:
        raw = self.parse_response(block=True)
        message = self.handle_message(raw)
        if message is None:
            continue
        yield message

1150 

def get_message(
    self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
):
    """
    Get the next message if one is available, otherwise None.

    If timeout is specified, the system will wait for `timeout` seconds
    before returning. Timeout should be specified as a floating point
    number, or None, to wait indefinitely.

    While this object is not subscribed, the call first waits (for up to
    ``timeout``) for a subscription to appear and returns ``None`` if
    none does. Time spent waiting is deducted from a numeric timeout.
    """
    if not self.subscribed:
        # Wait for subscription
        start_time = time.monotonic()
        if self.subscribed_event.wait(timeout) is not True:
            # The connection isn't subscribed to any channels or patterns,
            # so no messages are available
            return None
        if timeout is not None:
            # The connection was subscribed during the timeout time frame.
            # The timeout should be adjusted based on the time spent
            # waiting for the subscription. ``None`` means "wait
            # indefinitely" and has nothing to deduct from; doing the
            # arithmetic on it would raise TypeError.
            time_spent = time.monotonic() - start_time
            timeout = max(0.0, timeout - time_spent)

    response = self.parse_response(block=(timeout is None), timeout=timeout)

    if response:
        return self.handle_message(response, ignore_subscribe_messages)
    return None


# The same implementation serves sharded pub/sub under a second name.
get_sharded_message = get_message

1182 

def ping(self, message: Union[str, None] = None) -> bool:
    """
    Ping the Redis server, optionally with ``message`` as the argument.

    Returns whatever ``execute_command`` returns.
    """
    command = ["PING"]
    if message is not None:
        command.append(message)
    return self.execute_command(*command)

1189 

def handle_message(self, response, ignore_subscribe_messages=False):
    """
    Parse a pub/sub reply into a message dict.

    If the channel or pattern was subscribed to with a message handler,
    the handler is invoked with the message and ``None`` is returned
    instead of the message. Unsubscribe confirmations update the stored
    subscriptions, and subscribe/unsubscribe messages are dropped
    (``None``) when either ``ignore_subscribe_messages`` argument or
    attribute is truthy.
    """
    if response is None:
        return None
    if isinstance(response, bytes):
        # A bare bytes reply is presented as a pong: the literal b"PONG"
        # carries an empty payload, anything else is the payload itself.
        payload = b"" if response == b"PONG" else response
        response = [b"pong", payload]

    message_type = str_if_bytes(response[0])
    if message_type == "pmessage":
        pattern, channel, data = response[1], response[2], response[3]
    elif message_type == "pong":
        pattern, channel, data = None, None, response[1]
    else:
        pattern, channel, data = None, response[1], response[2]
    message = {
        "type": message_type,
        "pattern": pattern,
        "channel": channel,
        "data": data,
    }

    # if this is an unsubscribe message, remove it from memory
    if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
        if message_type == "punsubscribe":
            pending, registry = self.pending_unsubscribe_patterns, self.patterns
        elif message_type == "sunsubscribe":
            pending = self.pending_unsubscribe_shard_channels
            registry = self.shard_channels
        else:
            pending, registry = self.pending_unsubscribe_channels, self.channels
        name = response[1]
        # Only names we actually asked to unsubscribe from are removed.
        if name in pending:
            pending.remove(name)
            registry.pop(name, None)
        if not (self.channels or self.patterns or self.shard_channels):
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()

    if message_type in self.PUBLISH_MESSAGE_TYPES:
        # if there's a message handler, invoke it
        if message_type == "pmessage":
            handler = self.patterns.get(pattern)
        elif message_type == "smessage":
            handler = self.shard_channels.get(channel)
        else:
            handler = self.channels.get(channel)
        if handler:
            handler(message)
            return None
    elif message_type != "pong" and (
        ignore_subscribe_messages or self.ignore_subscribe_messages
    ):
        # this is a subscribe/unsubscribe message. ignore if we don't
        # want them
        return None

    return message

1264 

def run_in_thread(
    self,
    sleep_time: float = 0.0,
    daemon: bool = False,
    exception_handler: Optional[Callable] = None,
) -> "PubSubWorkerThread":
    """
    Start a ``PubSubWorkerThread`` for this object and return it.

    Every stored channel, pattern and shard channel must have a handler;
    ``PubSubError`` is raised for the first one found without. The
    arguments are passed on to the worker thread unchanged.
    """
    for channel, handler in self.channels.items():
        if handler is not None:
            continue
        raise PubSubError(f"Channel: '{channel}' has no handler registered")
    for pattern, handler in self.patterns.items():
        if handler is not None:
            continue
        raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
    for s_channel, handler in self.shard_channels.items():
        if handler is not None:
            continue
        raise PubSubError(
            f"Shard Channel: '{s_channel}' has no handler registered"
        )

    worker = PubSubWorkerThread(
        self, sleep_time, daemon=daemon, exception_handler=exception_handler
    )
    worker.start()
    return worker

1288 

1289 

class PubSubWorkerThread(threading.Thread):
    """
    Thread that keeps calling ``pubsub.get_message()`` until stopped,
    then closes the pubsub object.
    """

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        # Passed to ``get_message`` as its timeout on every iteration.
        self.sleep_time = sleep_time
        # Called as ``handler(exc, pubsub, thread)``; when ``None``,
        # exceptions propagate out of ``run``.
        self.exception_handler = exception_handler
        # Set while the loop in ``run`` should keep going.
        self._running = threading.Event()

    def run(self) -> None:
        """Poll the pubsub object until ``stop()`` clears the running flag."""
        if self._running.is_set():
            # Already running; do not start a second loop.
            return
        self._running.set()
        pubsub, sleep_time = self.pubsub, self.sleep_time
        while self._running.is_set():
            try:
                pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
            except BaseException as exc:
                handler = self.exception_handler
                if handler is None:
                    raise
                handler(exc, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        """Ask the run loop to exit; the loop itself closes the pubsub object."""
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()

1327 

1328 

1329class Pipeline(Redis): 

1330 """ 

1331 Pipelines provide a way to transmit multiple commands to the Redis server 

1332 in one transmission. This is convenient for batch processing, such as 

1333 saving all the values in a list to Redis. 

1334 

1335 All commands executed within a pipeline(when running in transactional mode, 

1336 which is the default behavior) are wrapped with MULTI and EXEC 

1337 calls. This guarantees all commands executed in the pipeline will be 

1338 executed atomically. 

1339 

1340 Any command raising an exception does *not* halt the execution of 

1341 subsequent commands in the pipeline. Instead, the exception is caught 

1342 and its instance is placed into the response list returned by execute(). 

1343 Code iterating over the response list should be able to deal with an 

1344 instance of an exception as a potential value. In general, these will be 

1345 ResponseError exceptions, such as those raised when issuing a command 

1346 on a key of a different datatype. 

1347 """ 

1348 

1349 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1350 

def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """
    Create an empty pipeline.

    The four arguments are stored as given. The pipeline starts with no
    connection, no queued commands, no registered scripts, and is neither
    watching nor inside an explicit transaction.
    """
    # Values supplied by the caller.
    self.connection_pool = connection_pool
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint
    # Per-pipeline state.
    self.connection: Optional[Connection] = None
    self.command_stack = []
    self.scripts: Set[Script] = set()
    self.watching = False
    self.explicit_transaction = False

1367 

1368 def __enter__(self) -> "Pipeline": 

1369 return self 

1370 

1371 def __exit__(self, exc_type, exc_value, traceback): 

1372 self.reset() 

1373 

1374 def __del__(self): 

1375 try: 

1376 self.reset() 

1377 except Exception: 

1378 pass 

1379 

1380 def __len__(self) -> int: 

1381 return len(self.command_stack) 

1382 

1383 def __bool__(self) -> bool: 

1384 """Pipeline instances should always evaluate to True""" 

1385 return True 

1386 

def reset(self) -> None:
    """
    Return the pipeline to its idle state.

    Clears the queued commands and scripts, issues UNWATCH if keys were
    being watched, clears the watching/explicit-transaction flags and
    releases the connection (if any) back to the pool.
    """
    self.command_stack = []
    self.scripts = set()
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and self.connection:
        try:
            # sent directly on the connection since our unwatch or
            # immediate_execute_command methods can call reset()
            self.connection.send_command("UNWATCH")
            self.connection.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            self.connection.disconnect()
    # clean up the other instance attributes
    self.watching = self.explicit_transaction = False

    if not self.connection:
        return
    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    self.connection_pool.release(self.connection)
    self.connection = None

1410 

def close(self) -> None:
    """Close the pipeline by resetting it."""
    # ``close()`` has no logic of its own; everything happens in ``reset()``.
    self.reset()

1414 

def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.

    Raises ``RedisError`` when a transactional block was already started
    or when commands have already been queued.
    """
    already_started = self.explicit_transaction
    if already_started:
        raise RedisError("Cannot issue nested calls to MULTI")
    already_queued = self.command_stack
    if already_queued:
        raise RedisError(
            "Commands without an initial WATCH have already been issued"
        )
    self.explicit_transaction = True

1427 

def execute_command(self, *args, **kwargs):
    """
    Run a command immediately or queue it, depending on pipeline state.

    A command is executed immediately when it is WATCH, or when keys are
    already being watched, provided no explicit transaction has been
    started. Otherwise it is queued for ``execute()``.
    """
    watch_phase = self.watching or args[0] == "WATCH"
    if watch_phase and not self.explicit_transaction:
        handler = self.immediate_execute_command
    else:
        handler = self.pipeline_execute_command
    return handler(*args, **kwargs)

1432 

def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Failure handler: disconnect ``conn`` and, if keys were being watched,
    reset the pipeline and raise ``WatchError``.

    The supported exceptions are already checked in the retry object so
    that is not repeated here. After the disconnect, the connection will
    try to reconnect and do a health check as part of the send_command
    logic (on connection level).
    """
    conn.disconnect()

    if not self.watching:
        return
    # The watch is no longer valid since this connection has died. The
    # WatchError indicates the user should retry this transaction.
    self.reset()
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )

1458 

def immediate_execute_command(self, *args, **options):
    """
    Execute a command right away instead of queueing it.

    Used when issuing WATCH, or commands that read values after a WATCH
    but before MULTI is called. A connection is taken from the pool on
    first use and kept on the pipeline. Failures are passed to
    ``_disconnect_reset_raise_on_watching``, so there is no automatic
    retry on the supported errors once a key is being watched.
    """
    command_name = args[0]
    conn = self.connection
    if not conn:
        # if this is the first call, we need a connection
        conn = self.connection = self.connection_pool.get_connection()

    def _attempt():
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def _on_failure(error):
        return self._disconnect_reset_raise_on_watching(conn, error)

    return conn.retry.call_with_retry(_attempt, _on_failure)

1479 

def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Queue a command to be executed when execute() is next called.

    The pipeline itself is returned so calls can be chained, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    queued = (args, options)
    self.command_stack.append(queued)
    return self

1494 

def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Send ``commands`` wrapped in MULTI/EXEC over ``connection`` and return
    the list of results after applying the response callbacks.

    ``commands`` is a sequence of ``(args, options)`` pairs. Commands whose
    options contain ``EMPTY_RESPONSE`` are not sent; the stored value is
    placed at their position in the result instead.

    Raises ``WatchError`` when EXEC returns ``None``, ``ResponseError``
    when the number of results does not match the number of commands, and
    re-raises the first recorded error when EXEC is aborted. With
    ``raise_on_error`` true, ``raise_first_error`` is applied to the
    results.
    """
    cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    all_cmds = connection.pack_commands(
        [args for args, options in cmds if EMPTY_RESPONSE not in options]
    )
    connection.send_packed_command(all_cmds)
    errors = []

    # parse off the response for MULTI
    # NOTE: we need to handle ResponseErrors here and continue
    # so that we read all the additional command messages from
    # the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as e:
        errors.append((0, e))

    # and all the other commands
    for i, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            errors.append((i, command[1][EMPTY_RESPONSE]))
        else:
            try:
                self.parse_response(connection, "_")
            except ResponseError as e:
                self.annotate_exception(e, i + 1, command[0])
                errors.append((i, e))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        if errors:
            raise errors[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for i, e in errors:
        response.insert(i, e)

    if len(response) != len(commands):
        # Disconnect the connection this transaction was actually sent
        # on, i.e. the ``connection`` argument used everywhere else in
        # this method, rather than going through ``self.connection``.
        connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    data = []
    for r, cmd in zip(response, commands):
        if not isinstance(r, Exception):
            args, options = cmd
            # Remove the "keys" entry; it is only needed for the cache.
            options.pop("keys", None)
            command_name = args[0]
            if command_name in self.response_callbacks:
                r = self.response_callbacks[command_name](r, **options)
        data.append(r)

    return data

1566 

1567 def _execute_pipeline(self, connection, commands, raise_on_error): 

1568 # build up all commands into a single request to increase network perf 

1569 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1570 connection.send_packed_command(all_cmds) 

1571 

1572 responses = [] 

1573 for args, options in commands: 

1574 try: 

1575 responses.append(self.parse_response(connection, args[0], **options)) 

1576 except ResponseError as e: 

1577 responses.append(e) 

1578 

1579 if raise_on_error: 

1580 self.raise_first_error(commands, responses) 

1581 

1582 return responses 

1583 

def raise_first_error(self, commands, response):
    """
    Raise the first ``ResponseError`` found in ``response``, if any.

    Before raising, the error is annotated with its 1-based position and
    the args of the command at the same index in ``commands``.
    """
    for position, reply in enumerate(response, start=1):
        if isinstance(reply, ResponseError):
            self.annotate_exception(reply, position, commands[position - 1][0])
            raise reply

1589 

def annotate_exception(self, exception, number, command):
    """
    Rewrite the first argument of ``exception`` in place so that it names
    the failing pipeline command and its position ``number``.

    Any further arguments of the exception are kept as they are.
    """
    cmd = " ".join(safe_str(part) for part in command)
    msg = (
        f"Command # {number} ({truncate_text(cmd)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    exception.args = (msg, *exception.args[1:])

1597 

def parse_response(self, connection, command_name, **options):
    """
    Parse a reply via ``Redis.parse_response`` and keep ``self.watching``
    in step with the command that was answered.

    Commands listed in ``UNWATCH_COMMANDS`` clear the flag, WATCH sets
    it, and any other command leaves it untouched.
    """
    result = Redis.parse_response(self, connection, command_name, **options)
    ends_watch = command_name in self.UNWATCH_COMMANDS
    if ends_watch or command_name == "WATCH":
        # False for an unwatching command, True for WATCH itself.
        self.watching = not ends_watch
    return result

1605 

def load_scripts(self):
    """
    Make sure all scripts that are about to be run on this pipeline exist.

    Asks which of the registered scripts' SHAs exist and loads every
    script reported missing, storing the value returned by the load on
    that script's ``sha``.
    """
    pending = list(self.scripts)
    # The normal script_* methods can't be used because they would just
    # get buffered in the pipeline.
    run_now = self.immediate_execute_command
    known = run_now("SCRIPT EXISTS", *(script.sha for script in pending))
    if all(known):
        return
    for script, present in zip(pending, known):
        if not present:
            script.sha = run_now("SCRIPT LOAD", script.script)

def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
) -> None:
    """
    Failure handler: disconnect ``conn`` and raise ``WatchError`` if keys
    were being watched.

    The supported exceptions are already checked in the retry object so
    that is not repeated here. After the disconnect, the connection will
    try to reconnect and do a health check as part of the send_command
    logic (on connection level).
    """
    conn.disconnect()
    if not self.watching:
        return
    # The watch is no longer valid since this connection has died. The
    # WatchError indicates the user should retry this transaction.
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    )

1641 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute all the commands in the current pipeline and return their
    results.

    Returns an empty list straight away when nothing is queued and no
    keys are being watched. Otherwise the commands run as a transaction
    when ``transaction`` or ``explicit_transaction`` is set and as a
    plain pipeline if not. The pipeline is reset afterwards whether or
    not the run succeeded.
    """
    stack = self.command_stack
    if not (stack or self.watching):
        return []
    if self.scripts:
        self.load_scripts()

    transactional = self.transaction or self.explicit_transaction
    runner = self._execute_transaction if transactional else self._execute_pipeline

    conn = self.connection
    if not conn:
        # Store the connection on self so that reset() releases it back
        # to the pool after we're done.
        conn = self.connection = self.connection_pool.get_connection()

    def _attempt():
        return runner(conn, stack, raise_on_error)

    def _on_failure(error):
        return self._disconnect_raise_on_watching(conn, error)

    try:
        return conn.retry.call_with_retry(_attempt, _on_failure)
    finally:
        self.reset()

1670 

def discard(self):
    """
    Flush all previously queued commands by issuing DISCARD.

    See: https://redis.io/commands/DISCARD
    """
    # The result of the command is intentionally not returned.
    self.execute_command("DISCARD")

1677 

def watch(self, *names):
    """
    Watch the values at keys ``names``.

    Raises ``RedisError`` when an explicit transaction has already been
    started; otherwise returns whatever ``execute_command`` returns.
    """
    if not self.explicit_transaction:
        return self.execute_command("WATCH", *names)
    raise RedisError("Cannot issue a WATCH after a MULTI")

1683 

def unwatch(self) -> bool:
    """
    Unwatch all previously specified keys.

    UNWATCH is only issued while keys are being watched. The result of
    the command is returned when it is truthy; in every other case the
    return value is ``True``.
    """
    if self.watching:
        return self.execute_command("UNWATCH") or True
    return True