import copy
import re
import threading
import time
from itertools import chain
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Set,
    Type,
    Union,
)

from redis._parsers.encoders import Encoder
from redis._parsers.helpers import (
    _RedisCallbacks,
    _RedisCallbacksRESP2,
    _RedisCallbacksRESP3,
    bool_ok,
)
from redis.backoff import ExponentialWithJitterBackoff
from redis.cache import CacheConfig, CacheInterface
from redis.commands import (
    CoreCommands,
    RedisModuleCommands,
    SentinelCommands,
    list_or_args,
)
from redis.commands.core import Script
from redis.connection import (
    AbstractConnection,
    Connection,
    ConnectionPool,
    SSLConnection,
    UnixDomainSocketConnection,
)
from redis.credentials import CredentialProvider
from redis.event import (
    AfterPooledConnectionsInstantiationEvent,
    AfterPubSubConnectionInstantiationEvent,
    AfterSingleConnectionInstantiationEvent,
    ClientType,
    EventDispatcher,
)
from redis.exceptions import (
    ConnectionError,
    ExecAbortError,
    PubSubError,
    RedisError,
    ResponseError,
    WatchError,
)
from redis.lock import Lock
from redis.retry import Retry
from redis.utils import (
    _set_info_logger,
    deprecated_args,
    get_lib_version,
    safe_str,
    str_if_bytes,
    truncate_text,
)

if TYPE_CHECKING:
    import ssl

    import OpenSSL

SYM_EMPTY = b""
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (e.g. DUMP) are binary and are never meant to be decoded
NEVER_DECODE = "NEVER_DECODE"


class CaseInsensitiveDict(dict):
    "Case insensitive dict implementation. Assumes string keys only."

    def __init__(self, data: Dict[str, str]) -> None:
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def update(self, data):
        data = CaseInsensitiveDict(data)
        super().update(data)
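

# Usage sketch (illustrative, not part of the library): keys are uppercased
# on every access, so response-callback lookups are case-insensitive:
#
#   d = CaseInsensitiveDict({"get": bool})
#   assert "GET" in d and d["get"] is d["GET"]
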

class AbstractRedis:
    pass


class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Pipelines derive from this, implementing how commands are sent to and
    received from the Redis server. Based on configuration, an instance
    will use either a ConnectionPool or a Connection object to talk to
    redis.

    It is not safe to pass PubSub or Pipeline objects between threads.
    """

    @classmethod
    def from_url(cls, url: str, **kwargs) -> "Redis":
        """
        Return a Redis client object configured from the given URL

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates an SSL-wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        single_connection_client = kwargs.pop("single_connection_client", False)
        connection_pool = ConnectionPool.from_url(url, **kwargs)
        client = cls(
            connection_pool=connection_pool,
            single_connection_client=single_connection_client,
        )
        client.auto_close_connection_pool = True
        return client

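    # Usage sketch (illustrative, not part of the library). Assumes a Redis
    # server listening on localhost:6379:
    #
    #   r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
    #   r.set("greeting", "hello")
    #   r.get("greeting")  # -> "hello"
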

    @classmethod
    def from_pool(
        cls: Type["Redis"],
        connection_pool: ConnectionPool,
    ) -> "Redis":
        """
        Return a Redis client from the given connection pool.
        The Redis client will take ownership of the connection pool and
        close it when the Redis client is closed.
        """
        client = cls(
            connection_pool=connection_pool,
        )
        client.auto_close_connection_pool = True
        return client


    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = get_lib_version(),
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
    ) -> None:
        """
        Initialize a new Redis client.

        To specify a retry policy for specific errors, you have two options:

        1. Set ``retry_on_error`` to a list of the errors to retry on, and
           optionally set ``retry`` to a valid ``Retry`` object (in case the
           default one is not appropriate). With this approach, retries are
           triggered on the default errors specified in the ``Retry`` object,
           enriched with the errors specified in ``retry_on_error``.

        2. Define a ``Retry`` object with a configured ``supported_errors``
           and pass it as the ``retry`` parameter. With this approach you
           completely redefine the errors on which retries happen.

        ``retry_on_timeout`` is deprecated - please include the TimeoutError
        either in the ``Retry`` object or in the ``retry_on_error`` list.

        When ``connection_pool`` is provided, the retry configuration of the
        provided pool is used.

        Args:

        single_connection_client:
            if ``True``, a connection pool is not used. In that case, the
            ``Redis`` instance is not thread-safe.
        """

        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            if not retry_on_error:
                retry_on_error = []
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "lib_name": lib_name,
                "lib_version": lib_version,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, set up the appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            if (cache_config or cache) and protocol in [3, "3"]:
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            self.auto_close_connection_pool = True
        else:
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)


    def __repr__(self) -> str:
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"({repr(self.connection_pool)})>"
        )

    def get_encoder(self) -> "Encoder":
        """Get the connection pool's encoder"""
        return self.connection_pool.get_encoder()

    def get_connection_kwargs(self) -> Dict:
        """Get the connection's keyword arguments"""
        return self.connection_pool.connection_kwargs

    def get_retry(self) -> Optional[Retry]:
        return self.get_connection_kwargs().get("retry")

    def set_retry(self, retry: Retry) -> None:
        self.get_connection_kwargs().update({"retry": retry})
        self.connection_pool.set_retry(retry)

    def set_response_callback(self, command: str, callback: Callable) -> None:
        """Set a custom response callback"""
        self.response_callbacks[command] = callback

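    # Usage sketch (illustrative, not part of the library): override how a
    # command's raw reply is post-processed before being returned:
    #
    #   r = Redis()
    #   r.set_response_callback("GET", lambda resp, **opts: resp.upper())
    #   r.set("k", b"v")
    #   r.get("k")  # -> b"V"
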

    def load_external_module(self, funcname, func) -> None:
        """
        This function can be used to add externally defined redis modules,
        and their namespaces to the redis client.

        funcname - A string containing the name of the function to create
        func - The function, being added to this class.

        ex: Assume that one has a custom redis module named foomod that
        creates commands named 'foo.dothing' and 'foo.anotherthing' in redis.
        To load those functions into this namespace:

        from redis import Redis
        from foomodule import F
        r = Redis()
        r.load_external_module("foo", F)
        r.foo().dothing('your', 'arguments')

        For a concrete example see the reimport of the redisjson module in
        tests/test_connection.py::test_loading_external_modules
        """
        setattr(self, funcname, func)


    def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
        """
        Return a new pipeline object that can queue multiple commands for
        later execution. ``transaction`` indicates whether all commands
        should be executed atomically. Apart from making a group of operations
        atomic, pipelines are useful for reducing the back-and-forth overhead
        between the client and server.
        """
        return Pipeline(
            self.connection_pool, self.response_callbacks, transaction, shard_hint
        )

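    # Usage sketch (illustrative, not part of the library): queue several
    # commands and send them to the server in a single round trip:
    #
    #   r = Redis()
    #   with r.pipeline() as pipe:
    #       pipe.set("a", 1).incr("a").get("a")
    #       results = pipe.execute()  # -> [True, 2, b"2"]
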

    def transaction(
        self, func: Callable[["Pipeline"], None], *watches, **kwargs
    ) -> Union[List[Any], Any, None]:
        """
        Convenience method for executing the callable ``func`` as a
        transaction while watching all keys specified in ``watches``.
        The ``func`` callable should expect a single argument which is a
        Pipeline object.
        """
        shard_hint = kwargs.pop("shard_hint", None)
        value_from_callable = kwargs.pop("value_from_callable", False)
        watch_delay = kwargs.pop("watch_delay", None)
        with self.pipeline(True, shard_hint) as pipe:
            while True:
                try:
                    if watches:
                        pipe.watch(*watches)
                    func_value = func(pipe)
                    exec_value = pipe.execute()
                    return func_value if value_from_callable else exec_value
                except WatchError:
                    if watch_delay is not None and watch_delay > 0:
                        time.sleep(watch_delay)
                    continue

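    # Usage sketch (illustrative, not part of the library): atomically double
    # a counter; the callable is re-run whenever a WatchError occurs because
    # "counter" changed between WATCH and EXEC:
    #
    #   r = Redis(decode_responses=True)
    #
    #   def double(pipe):
    #       current = int(pipe.get("counter") or 0)  # immediate, pre-MULTI
    #       pipe.multi()
    #       pipe.set("counter", current * 2)
    #
    #   r.transaction(double, "counter")
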

    def lock(
        self,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        lock_class: Union[None, Any] = None,
        thread_local: bool = True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )

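    # Usage sketch (illustrative, not part of the library): hold a distributed
    # lock while doing exclusive work; the key auto-expires after 10 seconds
    # if this process dies before releasing it:
    #
    #   r = Redis()
    #   with r.lock("resource-lock", timeout=10, blocking_timeout=5):
    #       ...  # only one client at a time runs this block
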

    def pubsub(self, **kwargs):
        """
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        """
        return PubSub(
            self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
        )

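    # Usage sketch (illustrative, not part of the library): subscribe to a
    # channel and poll for a message:
    #
    #   r = Redis()
    #   p = r.pubsub(ignore_subscribe_messages=True)
    #   p.subscribe("news")
    #   r.publish("news", "hello")
    #   msg = p.get_message(timeout=1.0)  # -> {"type": "message", ...} or None
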

    def monitor(self):
        return Monitor(self.connection_pool)

    def client(self):
        return self.__class__(
            connection_pool=self.connection_pool, single_connection_client=True
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass


    def close(self) -> None:
        # In case a connection property does not yet exist
        # (due to a crash earlier in the Redis() constructor), return
        # immediately as there is nothing to clean up.
        if not hasattr(self, "connection"):
            return

        conn = self.connection
        if conn:
            self.connection = None
            self.connection_pool.release(conn)

        if self.auto_close_connection_pool:
            self.connection_pool.disconnect()


    def _send_command_parse_response(self, conn, command_name, *args, **options):
        """
        Send a command and parse the response
        """
        conn.send_command(*args, **options)
        return self.parse_response(conn, command_name, **options)

    def _close_connection(self, conn) -> None:
        """
        Close the connection before retrying.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (on the
        connection level).
        """

        conn.disconnect()


    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        return self._execute_command(*args, **options)

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        conn = self.connection or pool.get_connection()

        if self._single_connection_client:
            self.single_connection_lock.acquire()
        try:
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda _: self._close_connection(conn),
            )
        finally:
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)


    def parse_response(self, connection, command_name, **options):
        """Parses a response from the Redis server"""
        try:
            if NEVER_DECODE in options:
                response = connection.read_response(disable_decoding=True)
                options.pop(NEVER_DECODE)
            else:
                response = connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in options:
                return options[EMPTY_RESPONSE]
            raise

        if EMPTY_RESPONSE in options:
            options.pop(EMPTY_RESPONSE)

        # Remove the keys entry; it is only needed for caching.
        options.pop("keys", None)

        if command_name in self.response_callbacks:
            return self.response_callbacks[command_name](response, **options)
        return response

    def get_cache(self) -> Optional[CacheInterface]:
        return self.connection_pool.cache


StrictRedis = Redis


class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    The next_command() method returns one command from monitor, and the
    listen() method yields commands from monitor.
    """

    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()
        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Parse the response from a monitor command"""
        response = self.connection.read_response()
        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)
        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            client_port = client_info[5:]
            client_type = "unix"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

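# Usage sketch (illustrative, not part of the library): stream every command
# the server processes. MONITOR adds server overhead, so prefer it for
# debugging only:
#
#   r = Redis()
#   with r.monitor() as m:
#       for cmd in m.listen():
#           print(cmd["time"], cmd["client_address"], cmd["command"])

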

class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to look up channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()


    def __enter__(self) -> "PubSub":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.reset()

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            pass

    def reset(self) -> None:
        if self.connection:
            self.connection.disconnect()
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self) -> None:
        self.reset()


    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)

    @property
    def subscribed(self) -> bool:
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()


    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)


    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them
        """
        ttl = 10
        conn = self.connection
        while self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {}".format(response)
                    )
            ttl -= 1


    def _reconnect(self, conn) -> None:
        """
        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        In this error handler we are trying to reconnect to the server.
        """
        conn.disconnect()
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.
        """
        return conn.retry.call_with_retry(
            lambda: command(*args, **kwargs),
            lambda _: self._reconnect(conn),
        )


    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False, push_request=True)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as the user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response) -> bool:
        """
        Check if the response is a health check response.
        If there are no subscriptions, redis responds to the PING command
        with a bulk response, instead of a multi-bulk with "pong" and the
        response.
        """
        return response in [
            self.health_check_response,  # If there was a subscription
            self.health_check_response_b,  # If there wasn't
        ]


    def check_health(self) -> None:
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            self.health_check_response_counter += 1

    def _normalize_keys(self, data) -> Dict:
        """
        Normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. This saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}


    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)


    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)


    def ssubscribe(self, *args, target_node=None, **kwargs):
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        new_s_channels = dict.fromkeys(args)
        new_s_channels.update(kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

    def sunsubscribe(self, *args, target_node=None):
        """
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        """
        if args:
            args = list_or_args(args[0], args[1:])
            s_channels = self._normalize_keys(dict.fromkeys(args))
        else:
            s_channels = self.shard_channels
        self.pending_unsubscribe_shard_channels.update(s_channels)
        return self.execute_command("SUNSUBSCRIBE", *args)


    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
    ):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.monotonic()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription
                time_spent = time.monotonic() - start_time
                timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)
        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None

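    # Usage sketch (illustrative, not part of the library): a simple polling
    # loop that waits up to one second per iteration:
    #
    #   p = Redis().pubsub()
    #   p.subscribe("events")
    #   while True:
    #       msg = p.get_message(ignore_subscribe_messages=True, timeout=1.0)
    #       if msg is not None:
    #           print(msg["channel"], msg["data"])
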

    get_sharded_message = get_message

    def ping(self, message: Union[str, None] = None) -> bool:
        """
        Ping the Redis server
        """
        args = ["PING", message] if message is not None else ["PING"]
        return self.execute_command(*args)


    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message


    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
    ) -> "PubSubWorkerThread":
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        thread = PubSubWorkerThread(
            self, sleep_time, daemon=daemon, exception_handler=exception_handler
        )
        thread.start()
        return thread

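# Usage sketch (illustrative, not part of the library): every subscription
# needs a handler before run_in_thread() will start the worker:
#
#   p = Redis().pubsub()
#   p.subscribe(events=lambda msg: print(msg["data"]))
#   worker = p.run_in_thread(sleep_time=0.1, daemon=True)
#   ...
#   worker.stop()

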

class PubSubWorkerThread(threading.Thread):
    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self._running = threading.Event()

    def run(self) -> None:
        if self._running.is_set():
            return
        self._running.set()
        pubsub = self.pubsub
        sleep_time = self.sleep_time
        while self._running.is_set():
            try:
                pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
            except BaseException as e:
                if self.exception_handler is None:
                    raise
                self.exception_handler(e, pubsub, self)
        pubsub.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()


class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline (when running in transactional
    mode, which is the default behavior) are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        self.connection_pool = connection_pool
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        self.watching = False
        self.command_stack = []
        self.scripts: Set[Script] = set()
        self.explicit_transaction = False


    def __enter__(self) -> "Pipeline":
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.reset()

    def __del__(self):
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self) -> int:
        return len(self.command_stack)

    def __bool__(self) -> bool:
        """Pipeline instances should always evaluate to True"""
        return True

    def reset(self) -> None:
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False
        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None


    def close(self) -> None:
        """Close the pipeline"""
        self.reset()

    def multi(self) -> None:
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        if self.explicit_transaction:
            raise RedisError("Cannot issue nested calls to MULTI")
        if self.command_stack:
            raise RedisError(
                "Commands without an initial WATCH have already been issued"
            )
        self.explicit_transaction = True

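    # Usage sketch (illustrative, not part of the library): the explicit
    # WATCH -> MULTI -> EXECUTE flow that this method enables:
    #
    #   r = Redis(decode_responses=True)
    #   with r.pipeline() as pipe:
    #       pipe.watch("balance")             # executed immediately
    #       balance = int(pipe.get("balance") or 0)
    #       pipe.multi()                      # start buffering commands
    #       pipe.set("balance", balance - 10)
    #       pipe.execute()  # raises WatchError if "balance" changed meanwhile
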

    def execute_command(self, *args, **kwargs):
        if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

    def _disconnect_reset_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection, reset the watching state and
        raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (on the
        connection level).
        """
        conn.disconnect()

        # if we were already watching a variable, the watch is no longer
        # valid since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            self.reset()
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )


    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on the supported
        errors for retry if we're already WATCHing a variable.
        Used when issuing WATCH, or subsequent commands that retrieve their
        values, before MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection()
            self.connection = conn

        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            lambda error: self._disconnect_reset_raise_on_watching(conn, error),
        )


    def pipeline_execute_command(self, *args, **options) -> "Pipeline":
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        self.command_stack.append((args, options))
        return self


    def _execute_transaction(
        self, connection: Connection, commands, raise_on_error
    ) -> List:
        cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
        all_cmds = connection.pack_commands(
            [args for args, options in cmds if EMPTY_RESPONSE not in options]
        )
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, "_")
        except ResponseError as e:
            errors.append((0, e))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    self.parse_response(connection, "_")
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command[0])
                    errors.append((i, e))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, "_")
        except ExecAbortError:
            if errors:
                raise errors[0][1]
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            )

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                # Remove the keys entry; it is only needed for caching.
                options.pop("keys", None)
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)
        return data


    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)

        response = []
        for args, options in commands:
            try:
                response.append(self.parse_response(connection, args[0], **options))
            except ResponseError as e:
                response.append(e)

        if raise_on_error:
            self.raise_first_error(commands, response)
        return response


    def raise_first_error(self, commands, response):
        for i, r in enumerate(response):
            if isinstance(r, ResponseError):
                self.annotate_exception(r, i + 1, commands[i][0])
                raise r

    def annotate_exception(self, exception, number, command):
        cmd = " ".join(map(safe_str, command))
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]


    def parse_response(self, connection, command_name, **options):
        result = Redis.parse_response(self, connection, command_name, **options)
        if command_name in self.UNWATCH_COMMANDS:
            self.watching = False
        elif command_name == "WATCH":
            self.watching = True
        return result

    def load_scripts(self):
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate("SCRIPT EXISTS", *shas)
        if not all(exists):
            for s, exist in zip(scripts, exists):
                if not exist:
                    s.sha = immediate("SCRIPT LOAD", s.script)


    def _disconnect_raise_on_watching(
        self,
        conn: AbstractConnection,
        error: Exception,
    ) -> None:
        """
        Close the connection and raise an exception if we were watching.

        The supported exceptions are already checked in the
        retry object, so we don't need to do it here.

        After we disconnect the connection, it will try to reconnect and
        do a health check as part of the send_command logic (on the
        connection level).
        """
        conn.disconnect()
        # if we were watching a variable, the watch is no longer valid
        # since this connection has died. raise a WatchError, which
        # indicates the user should retry this transaction.
        if self.watching:
            raise WatchError(
                f"A {type(error).__name__} occurred while watching one or more keys"
            )


    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all the commands in the current pipeline"""
        stack = self.command_stack
        if not stack and not self.watching:
            return []
        if self.scripts:
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return conn.retry.call_with_retry(
                lambda: execute(conn, stack, raise_on_error),
                lambda error: self._disconnect_raise_on_watching(conn, error),
            )
        finally:
            self.reset()


    def discard(self):
        """
        Flushes all previously queued commands
        See: https://redis.io/commands/DISCARD
        """
        self.execute_command("DISCARD")

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        if self.explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")
        return self.execute_command("WATCH", *names)

    def unwatch(self) -> bool:
        """Unwatches all previously specified keys"""
        return self.watching and self.execute_command("UNWATCH") or True