Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

712 statements  

1import copy 

2import re 

3import threading 

4import time 

5from itertools import chain 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Dict, 

11 List, 

12 Mapping, 

13 Optional, 

14 Set, 

15 Type, 

16 Union, 

17) 

18 

19from redis._parsers.encoders import Encoder 

20from redis._parsers.helpers import ( 

21 _RedisCallbacks, 

22 _RedisCallbacksRESP2, 

23 _RedisCallbacksRESP3, 

24 bool_ok, 

25) 

26from redis.backoff import ExponentialWithJitterBackoff 

27from redis.cache import CacheConfig, CacheInterface 

28from redis.commands import ( 

29 CoreCommands, 

30 RedisModuleCommands, 

31 SentinelCommands, 

32 list_or_args, 

33) 

34from redis.commands.core import Script 

35from redis.connection import ( 

36 AbstractConnection, 

37 Connection, 

38 ConnectionPool, 

39 SSLConnection, 

40 UnixDomainSocketConnection, 

41) 

42from redis.credentials import CredentialProvider 

43from redis.driver_info import DriverInfo, resolve_driver_info 

44from redis.event import ( 

45 AfterPooledConnectionsInstantiationEvent, 

46 AfterPubSubConnectionInstantiationEvent, 

47 AfterSingleConnectionInstantiationEvent, 

48 ClientType, 

49 EventDispatcher, 

50) 

51from redis.exceptions import ( 

52 ConnectionError, 

53 ExecAbortError, 

54 PubSubError, 

55 RedisError, 

56 ResponseError, 

57 WatchError, 

58) 

59from redis.lock import Lock 

60from redis.maint_notifications import ( 

61 MaintNotificationsConfig, 

62) 

63from redis.retry import Retry 

64from redis.utils import ( 

65 _set_info_logger, 

66 deprecated_args, 

67 safe_str, 

68 str_if_bytes, 

69 truncate_text, 

70) 

71 

72if TYPE_CHECKING: 

73 import ssl 

74 

75 import OpenSSL 

76 

77SYM_EMPTY = b"" 

78EMPTY_RESPONSE = "EMPTY_RESPONSE" 

79 

80# some responses (ie. dump) are binary, and just meant to never be decoded 

81NEVER_DECODE = "NEVER_DECODE" 

82 

83 

84class CaseInsensitiveDict(dict): 

85 "Case insensitive dict implementation. Assumes string keys only." 

86 

87 def __init__(self, data: Dict[str, str]) -> None: 

88 for k, v in data.items(): 

89 self[k.upper()] = v 

90 

91 def __contains__(self, k): 

92 return super().__contains__(k.upper()) 

93 

94 def __delitem__(self, k): 

95 super().__delitem__(k.upper()) 

96 

97 def __getitem__(self, k): 

98 return super().__getitem__(k.upper()) 

99 

100 def get(self, k, default=None): 

101 return super().get(k.upper(), default) 

102 

103 def __setitem__(self, k, v): 

104 super().__setitem__(k.upper(), v) 

105 

106 def update(self, data): 

107 data = CaseInsensitiveDict(data) 

108 super().update(data) 

109 

110 

111class AbstractRedis: 

112 pass 

113 

114 

115class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

116 """ 

117 Implementation of the Redis protocol. 

118 

119 This abstract class provides a Python interface to all Redis commands 

120 and an implementation of the Redis protocol. 

121 

122 Pipelines derive from this, implementing how 

123 the commands are sent and received to the Redis server. Based on 

124 configuration, an instance will either use a ConnectionPool, or 

125 Connection object to talk to redis. 

126 

127 It is not safe to pass PubSub or Pipeline objects between threads. 

128 """ 

129 

130 @classmethod 

131 def from_url(cls, url: str, **kwargs) -> "Redis": 

132 """ 

133 Return a Redis client object configured from the given URL 

134 

135 For example:: 

136 

137 redis://[[username]:[password]]@localhost:6379/0 

138 rediss://[[username]:[password]]@localhost:6379/0 

139 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

140 

141 Three URL schemes are supported: 

142 

143 - `redis://` creates a TCP socket connection. See more at: 

144 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

145 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

146 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

147 - ``unix://``: creates a Unix Domain Socket connection. 

148 

149 The username, password, hostname, path and all querystring values 

150 are passed through urllib.parse.unquote in order to replace any 

151 percent-encoded values with their corresponding characters. 

152 

153 There are several ways to specify a database number. The first value 

154 found will be used: 

155 

156 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

157 2. If using the redis:// or rediss:// schemes, the path argument 

158 of the url, e.g. redis://localhost/0 

159 3. A ``db`` keyword argument to this function. 

160 

161 If none of these options are specified, the default db=0 is used. 

162 

163 All querystring options are cast to their appropriate Python types. 

164 Boolean arguments can be specified with string values "True"/"False" 

165 or "Yes"/"No". Values that cannot be properly cast cause a 

166 ``ValueError`` to be raised. Once parsed, the querystring arguments 

167 and keyword arguments are passed to the ``ConnectionPool``'s 

168 class initializer. In the case of conflicting arguments, querystring 

169 arguments always win. 

170 

171 """ 

172 single_connection_client = kwargs.pop("single_connection_client", False) 

173 connection_pool = ConnectionPool.from_url(url, **kwargs) 

174 client = cls( 

175 connection_pool=connection_pool, 

176 single_connection_client=single_connection_client, 

177 ) 

178 client.auto_close_connection_pool = True 

179 return client 

180 

181 @classmethod 

182 def from_pool( 

183 cls: Type["Redis"], 

184 connection_pool: ConnectionPool, 

185 ) -> "Redis": 

186 """ 

187 Return a Redis client from the given connection pool. 

188 The Redis client will take ownership of the connection pool and 

189 close it when the Redis client is closed. 

190 """ 

191 client = cls( 

192 connection_pool=connection_pool, 

193 ) 

194 client.auto_close_connection_pool = True 

195 return client 

196 

197 @deprecated_args( 

198 args_to_warn=["retry_on_timeout"], 

199 reason="TimeoutError is included by default.", 

200 version="6.0.0", 

201 ) 

202 @deprecated_args( 

203 args_to_warn=["lib_name", "lib_version"], 

204 reason="Use 'driver_info' parameter instead. " 

205 "lib_name and lib_version will be removed in a future version.", 

206 ) 

207 def __init__( 

208 self, 

209 host: str = "localhost", 

210 port: int = 6379, 

211 db: int = 0, 

212 password: Optional[str] = None, 

213 socket_timeout: Optional[float] = None, 

214 socket_connect_timeout: Optional[float] = None, 

215 socket_keepalive: Optional[bool] = None, 

216 socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, 

217 connection_pool: Optional[ConnectionPool] = None, 

218 unix_socket_path: Optional[str] = None, 

219 encoding: str = "utf-8", 

220 encoding_errors: str = "strict", 

221 decode_responses: bool = False, 

222 retry_on_timeout: bool = False, 

223 retry: Retry = Retry( 

224 backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 

225 ), 

226 retry_on_error: Optional[List[Type[Exception]]] = None, 

227 ssl: bool = False, 

228 ssl_keyfile: Optional[str] = None, 

229 ssl_certfile: Optional[str] = None, 

230 ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required", 

231 ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

232 ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None, 

233 ssl_ca_certs: Optional[str] = None, 

234 ssl_ca_path: Optional[str] = None, 

235 ssl_ca_data: Optional[str] = None, 

236 ssl_check_hostname: bool = True, 

237 ssl_password: Optional[str] = None, 

238 ssl_validate_ocsp: bool = False, 

239 ssl_validate_ocsp_stapled: bool = False, 

240 ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None, 

241 ssl_ocsp_expected_cert: Optional[str] = None, 

242 ssl_min_version: Optional["ssl.TLSVersion"] = None, 

243 ssl_ciphers: Optional[str] = None, 

244 max_connections: Optional[int] = None, 

245 single_connection_client: bool = False, 

246 health_check_interval: int = 0, 

247 client_name: Optional[str] = None, 

248 lib_name: Optional[str] = None, 

249 lib_version: Optional[str] = None, 

250 driver_info: Optional["DriverInfo"] = None, 

251 username: Optional[str] = None, 

252 redis_connect_func: Optional[Callable[[], None]] = None, 

253 credential_provider: Optional[CredentialProvider] = None, 

254 protocol: Optional[int] = 2, 

255 cache: Optional[CacheInterface] = None, 

256 cache_config: Optional[CacheConfig] = None, 

257 event_dispatcher: Optional[EventDispatcher] = None, 

258 maint_notifications_config: Optional[MaintNotificationsConfig] = None, 

259 ) -> None: 

260 """ 

261 Initialize a new Redis client. 

262 

263 To specify a retry policy for specific errors, you have two options: 

264 

265 1. Set the `retry_on_error` to a list of the error/s to retry on, and 

266 you can also set `retry` to a valid `Retry` object(in case the default 

267 one is not appropriate) - with this approach the retries will be triggered 

268 on the default errors specified in the Retry object enriched with the 

269 errors specified in `retry_on_error`. 

270 

271 2. Define a `Retry` object with configured 'supported_errors' and set 

272 it to the `retry` parameter - with this approach you completely redefine 

273 the errors on which retries will happen. 

274 

275 `retry_on_timeout` is deprecated - please include the TimeoutError 

276 either in the Retry object or in the `retry_on_error` list. 

277 

278 When 'connection_pool' is provided - the retry configuration of the 

279 provided pool will be used. 

280 

281 Args: 

282 

283 single_connection_client: 

284 if `True`, connection pool is not used. In that case `Redis` 

285 instance use is not thread safe. 

286 decode_responses: 

287 if `True`, the response will be decoded to utf-8. 

288 Argument is ignored when connection_pool is provided. 

289 driver_info: 

290 Optional DriverInfo object to identify upstream libraries. 

291 If provided, lib_name and lib_version are ignored. 

292 If not provided, a DriverInfo will be created from lib_name and lib_version. 

293 Argument is ignored when connection_pool is provided. 

294 lib_name: 

295 **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO. 

296 lib_version: 

297 **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO. 

298 maint_notifications_config: 

299 configuration the pool to support maintenance notifications - see 

300 `redis.maint_notifications.MaintNotificationsConfig` for details. 

301 Only supported with RESP3 

302 If not provided and protocol is RESP3, the maintenance notifications 

303 will be enabled by default (logic is included in the connection pool 

304 initialization). 

305 Argument is ignored when connection_pool is provided. 

306 """ 

307 if event_dispatcher is None: 

308 self._event_dispatcher = EventDispatcher() 

309 else: 

310 self._event_dispatcher = event_dispatcher 

311 if not connection_pool: 

312 if not retry_on_error: 

313 retry_on_error = [] 

314 

315 # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version 

316 computed_driver_info = resolve_driver_info( 

317 driver_info, lib_name, lib_version 

318 ) 

319 

320 kwargs = { 

321 "db": db, 

322 "username": username, 

323 "password": password, 

324 "socket_timeout": socket_timeout, 

325 "encoding": encoding, 

326 "encoding_errors": encoding_errors, 

327 "decode_responses": decode_responses, 

328 "retry_on_error": retry_on_error, 

329 "retry": copy.deepcopy(retry), 

330 "max_connections": max_connections, 

331 "health_check_interval": health_check_interval, 

332 "client_name": client_name, 

333 "driver_info": computed_driver_info, 

334 "redis_connect_func": redis_connect_func, 

335 "credential_provider": credential_provider, 

336 "protocol": protocol, 

337 } 

338 # based on input, setup appropriate connection args 

339 if unix_socket_path is not None: 

340 kwargs.update( 

341 { 

342 "path": unix_socket_path, 

343 "connection_class": UnixDomainSocketConnection, 

344 } 

345 ) 

346 else: 

347 # TCP specific options 

348 kwargs.update( 

349 { 

350 "host": host, 

351 "port": port, 

352 "socket_connect_timeout": socket_connect_timeout, 

353 "socket_keepalive": socket_keepalive, 

354 "socket_keepalive_options": socket_keepalive_options, 

355 } 

356 ) 

357 

358 if ssl: 

359 kwargs.update( 

360 { 

361 "connection_class": SSLConnection, 

362 "ssl_keyfile": ssl_keyfile, 

363 "ssl_certfile": ssl_certfile, 

364 "ssl_cert_reqs": ssl_cert_reqs, 

365 "ssl_include_verify_flags": ssl_include_verify_flags, 

366 "ssl_exclude_verify_flags": ssl_exclude_verify_flags, 

367 "ssl_ca_certs": ssl_ca_certs, 

368 "ssl_ca_data": ssl_ca_data, 

369 "ssl_check_hostname": ssl_check_hostname, 

370 "ssl_password": ssl_password, 

371 "ssl_ca_path": ssl_ca_path, 

372 "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled, 

373 "ssl_validate_ocsp": ssl_validate_ocsp, 

374 "ssl_ocsp_context": ssl_ocsp_context, 

375 "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert, 

376 "ssl_min_version": ssl_min_version, 

377 "ssl_ciphers": ssl_ciphers, 

378 } 

379 ) 

380 if (cache_config or cache) and protocol in [3, "3"]: 

381 kwargs.update( 

382 { 

383 "cache": cache, 

384 "cache_config": cache_config, 

385 } 

386 ) 

387 maint_notifications_enabled = ( 

388 maint_notifications_config and maint_notifications_config.enabled 

389 ) 

390 if maint_notifications_enabled and protocol not in [ 

391 3, 

392 "3", 

393 ]: 

394 raise RedisError( 

395 "Maintenance notifications handlers on connection are only supported with RESP version 3" 

396 ) 

397 if maint_notifications_config: 

398 kwargs.update( 

399 { 

400 "maint_notifications_config": maint_notifications_config, 

401 } 

402 ) 

403 connection_pool = ConnectionPool(**kwargs) 

404 self._event_dispatcher.dispatch( 

405 AfterPooledConnectionsInstantiationEvent( 

406 [connection_pool], ClientType.SYNC, credential_provider 

407 ) 

408 ) 

409 self.auto_close_connection_pool = True 

410 else: 

411 self.auto_close_connection_pool = False 

412 self._event_dispatcher.dispatch( 

413 AfterPooledConnectionsInstantiationEvent( 

414 [connection_pool], ClientType.SYNC, credential_provider 

415 ) 

416 ) 

417 

418 self.connection_pool = connection_pool 

419 

420 if (cache_config or cache) and self.connection_pool.get_protocol() not in [ 

421 3, 

422 "3", 

423 ]: 

424 raise RedisError("Client caching is only supported with RESP version 3") 

425 

426 self.single_connection_lock = threading.RLock() 

427 self.connection = None 

428 self._single_connection_client = single_connection_client 

429 if self._single_connection_client: 

430 self.connection = self.connection_pool.get_connection() 

431 self._event_dispatcher.dispatch( 

432 AfterSingleConnectionInstantiationEvent( 

433 self.connection, ClientType.SYNC, self.single_connection_lock 

434 ) 

435 ) 

436 

437 self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks) 

438 

439 if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]: 

440 self.response_callbacks.update(_RedisCallbacksRESP3) 

441 else: 

442 self.response_callbacks.update(_RedisCallbacksRESP2) 

443 

444 def __repr__(self) -> str: 

445 return ( 

446 f"<{type(self).__module__}.{type(self).__name__}" 

447 f"({repr(self.connection_pool)})>" 

448 ) 

449 

450 def get_encoder(self) -> "Encoder": 

451 """Get the connection pool's encoder""" 

452 return self.connection_pool.get_encoder() 

453 

454 def get_connection_kwargs(self) -> Dict: 

455 """Get the connection's key-word arguments""" 

456 return self.connection_pool.connection_kwargs 

457 

458 def get_retry(self) -> Optional[Retry]: 

459 return self.get_connection_kwargs().get("retry") 

460 

461 def set_retry(self, retry: Retry) -> None: 

462 self.get_connection_kwargs().update({"retry": retry}) 

463 self.connection_pool.set_retry(retry) 

464 

465 def set_response_callback(self, command: str, callback: Callable) -> None: 

466 """Set a custom Response Callback""" 

467 self.response_callbacks[command] = callback 

468 

469 def load_external_module(self, funcname, func) -> None: 

470 """ 

471 This function can be used to add externally defined redis modules, 

472 and their namespaces to the redis client. 

473 

474 funcname - A string containing the name of the function to create 

475 func - The function, being added to this class. 

476 

477 ex: Assume that one has a custom redis module named foomod that 

478 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

479 To load function functions into this namespace: 

480 

481 from redis import Redis 

482 from foomodule import F 

483 r = Redis() 

484 r.load_external_module("foo", F) 

485 r.foo().dothing('your', 'arguments') 

486 

487 For a concrete example see the reimport of the redisjson module in 

488 tests/test_connection.py::test_loading_external_modules 

489 """ 

490 setattr(self, funcname, func) 

491 

492 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

493 """ 

494 Return a new pipeline object that can queue multiple commands for 

495 later execution. ``transaction`` indicates whether all commands 

496 should be executed atomically. Apart from making a group of operations 

497 atomic, pipelines are useful for reducing the back-and-forth overhead 

498 between the client and server. 

499 """ 

500 return Pipeline( 

501 self.connection_pool, self.response_callbacks, transaction, shard_hint 

502 ) 

503 

504 def transaction( 

505 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

506 ) -> Union[List[Any], Any, None]: 

507 """ 

508 Convenience method for executing the callable `func` as a transaction 

509 while watching all keys specified in `watches`. The 'func' callable 

510 should expect a single argument which is a Pipeline object. 

511 """ 

512 shard_hint = kwargs.pop("shard_hint", None) 

513 value_from_callable = kwargs.pop("value_from_callable", False) 

514 watch_delay = kwargs.pop("watch_delay", None) 

515 with self.pipeline(True, shard_hint) as pipe: 

516 while True: 

517 try: 

518 if watches: 

519 pipe.watch(*watches) 

520 func_value = func(pipe) 

521 exec_value = pipe.execute() 

522 return func_value if value_from_callable else exec_value 

523 except WatchError: 

524 if watch_delay is not None and watch_delay > 0: 

525 time.sleep(watch_delay) 

526 continue 

527 

528 def lock( 

529 self, 

530 name: str, 

531 timeout: Optional[float] = None, 

532 sleep: float = 0.1, 

533 blocking: bool = True, 

534 blocking_timeout: Optional[float] = None, 

535 lock_class: Union[None, Any] = None, 

536 thread_local: bool = True, 

537 raise_on_release_error: bool = True, 

538 ): 

539 """ 

540 Return a new Lock object using key ``name`` that mimics 

541 the behavior of threading.Lock. 

542 

543 If specified, ``timeout`` indicates a maximum life for the lock. 

544 By default, it will remain locked until release() is called. 

545 

546 ``sleep`` indicates the amount of time to sleep per loop iteration 

547 when the lock is in blocking mode and another client is currently 

548 holding the lock. 

549 

550 ``blocking`` indicates whether calling ``acquire`` should block until 

551 the lock has been acquired or to fail immediately, causing ``acquire`` 

552 to return False and the lock not being acquired. Defaults to True. 

553 Note this value can be overridden by passing a ``blocking`` 

554 argument to ``acquire``. 

555 

556 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

557 spend trying to acquire the lock. A value of ``None`` indicates 

558 continue trying forever. ``blocking_timeout`` can be specified as a 

559 float or integer, both representing the number of seconds to wait. 

560 

561 ``lock_class`` forces the specified lock implementation. Note that as 

562 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

563 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

564 you have created your own custom lock class. 

565 

566 ``thread_local`` indicates whether the lock token is placed in 

567 thread-local storage. By default, the token is placed in thread local 

568 storage so that a thread only sees its token, not a token set by 

569 another thread. Consider the following timeline: 

570 

571 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

572 thread-1 sets the token to "abc" 

573 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

574 Lock instance. 

575 time: 5, thread-1 has not yet completed. redis expires the lock 

576 key. 

577 time: 5, thread-2 acquired `my-lock` now that it's available. 

578 thread-2 sets the token to "xyz" 

579 time: 6, thread-1 finishes its work and calls release(). if the 

580 token is *not* stored in thread local storage, then 

581 thread-1 would see the token value as "xyz" and would be 

582 able to successfully release the thread-2's lock. 

583 

584 ``raise_on_release_error`` indicates whether to raise an exception when 

585 the lock is no longer owned when exiting the context manager. By default, 

586 this is True, meaning an exception will be raised. If False, the warning 

587 will be logged and the exception will be suppressed. 

588 

589 In some use cases it's necessary to disable thread local storage. For 

590 example, if you have code where one thread acquires a lock and passes 

591 that lock instance to a worker thread to release later. If thread 

592 local storage isn't disabled in this case, the worker thread won't see 

593 the token set by the thread that acquired the lock. Our assumption 

594 is that these cases aren't common and as such default to using 

595 thread local storage.""" 

596 if lock_class is None: 

597 lock_class = Lock 

598 return lock_class( 

599 self, 

600 name, 

601 timeout=timeout, 

602 sleep=sleep, 

603 blocking=blocking, 

604 blocking_timeout=blocking_timeout, 

605 thread_local=thread_local, 

606 raise_on_release_error=raise_on_release_error, 

607 ) 

608 

609 def pubsub(self, **kwargs): 

610 """ 

611 Return a Publish/Subscribe object. With this object, you can 

612 subscribe to channels and listen for messages that get published to 

613 them. 

614 """ 

615 return PubSub( 

616 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

617 ) 

618 

619 def monitor(self): 

620 return Monitor(self.connection_pool) 

621 

622 def client(self): 

623 return self.__class__( 

624 connection_pool=self.connection_pool, 

625 single_connection_client=True, 

626 ) 

627 

628 def __enter__(self): 

629 return self 

630 

631 def __exit__(self, exc_type, exc_value, traceback): 

632 self.close() 

633 

634 def __del__(self): 

635 try: 

636 self.close() 

637 except Exception: 

638 pass 

639 

640 def close(self) -> None: 

641 # In case a connection property does not yet exist 

642 # (due to a crash earlier in the Redis() constructor), return 

643 # immediately as there is nothing to clean-up. 

644 if not hasattr(self, "connection"): 

645 return 

646 

647 conn = self.connection 

648 if conn: 

649 self.connection = None 

650 self.connection_pool.release(conn) 

651 

652 if self.auto_close_connection_pool: 

653 self.connection_pool.disconnect() 

654 

655 def _send_command_parse_response(self, conn, command_name, *args, **options): 

656 """ 

657 Send a command and parse the response 

658 """ 

659 conn.send_command(*args, **options) 

660 return self.parse_response(conn, command_name, **options) 

661 

662 def _close_connection(self, conn) -> None: 

663 """ 

664 Close the connection before retrying. 

665 

666 The supported exceptions are already checked in the 

667 retry object so we don't need to do it here. 

668 

669 After we disconnect the connection, it will try to reconnect and 

670 do a health check as part of the send_command logic(on connection level). 

671 """ 

672 

673 conn.disconnect() 

674 

675 # COMMAND EXECUTION AND PROTOCOL PARSING 

676 def execute_command(self, *args, **options): 

677 return self._execute_command(*args, **options) 

678 

679 def _execute_command(self, *args, **options): 

680 """Execute a command and return a parsed response""" 

681 pool = self.connection_pool 

682 command_name = args[0] 

683 conn = self.connection or pool.get_connection() 

684 

685 if self._single_connection_client: 

686 self.single_connection_lock.acquire() 

687 try: 

688 return conn.retry.call_with_retry( 

689 lambda: self._send_command_parse_response( 

690 conn, command_name, *args, **options 

691 ), 

692 lambda _: self._close_connection(conn), 

693 ) 

694 

695 finally: 

696 if conn and conn.should_reconnect(): 

697 self._close_connection(conn) 

698 conn.connect() 

699 if self._single_connection_client: 

700 self.single_connection_lock.release() 

701 if not self.connection: 

702 pool.release(conn) 

703 

704 def parse_response(self, connection, command_name, **options): 

705 """Parses a response from the Redis server""" 

706 try: 

707 if NEVER_DECODE in options: 

708 response = connection.read_response(disable_decoding=True) 

709 options.pop(NEVER_DECODE) 

710 else: 

711 response = connection.read_response() 

712 except ResponseError: 

713 if EMPTY_RESPONSE in options: 

714 return options[EMPTY_RESPONSE] 

715 raise 

716 

717 if EMPTY_RESPONSE in options: 

718 options.pop(EMPTY_RESPONSE) 

719 

720 # Remove keys entry, it needs only for cache. 

721 options.pop("keys", None) 

722 

723 if command_name in self.response_callbacks: 

724 return self.response_callbacks[command_name](response, **options) 

725 return response 

726 

727 def get_cache(self) -> Optional[CacheInterface]: 

728 return self.connection_pool.cache 

729 

730 

731StrictRedis = Redis 

732 

733 

734class Monitor: 

735 """ 

736 Monitor is useful for handling the MONITOR command to the redis server. 

737 next_command() method returns one command from monitor 

738 listen() method yields commands from monitor. 

739 """ 

740 

741 monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)") 

742 command_re = re.compile(r'"(.*?)(?<!\\)"') 

743 

744 def __init__(self, connection_pool): 

745 self.connection_pool = connection_pool 

746 self.connection = self.connection_pool.get_connection() 

747 

748 def __enter__(self): 

749 self._start_monitor() 

750 return self 

751 

752 def __exit__(self, *args): 

753 self.connection.disconnect() 

754 self.connection_pool.release(self.connection) 

755 

756 def next_command(self): 

757 """Parse the response from a monitor command""" 

758 response = self.connection.read_response() 

759 

760 if response is None: 

761 return None 

762 

763 if isinstance(response, bytes): 

764 response = self.connection.encoder.decode(response, force=True) 

765 

766 command_time, command_data = response.split(" ", 1) 

767 m = self.monitor_re.match(command_data) 

768 db_id, client_info, command = m.groups() 

769 command = " ".join(self.command_re.findall(command)) 

770 # Redis escapes double quotes because each piece of the command 

771 # string is surrounded by double quotes. We don't have that 

772 # requirement so remove the escaping and leave the quote. 

773 command = command.replace('\\"', '"') 

774 

775 if client_info == "lua": 

776 client_address = "lua" 

777 client_port = "" 

778 client_type = "lua" 

779 elif client_info.startswith("unix"): 

780 client_address = "unix" 

781 client_port = client_info[5:] 

782 client_type = "unix" 

783 else: 

784 if client_info == "": 

785 client_address = "" 

786 client_port = "" 

787 client_type = "unknown" 

788 else: 

789 # use rsplit as ipv6 addresses contain colons 

790 client_address, client_port = client_info.rsplit(":", 1) 

791 client_type = "tcp" 

792 return { 

793 "time": float(command_time), 

794 "db": int(db_id), 

795 "client_address": client_address, 

796 "client_port": client_port, 

797 "client_type": client_type, 

798 "command": command, 

799 } 

800 

801 def listen(self): 

802 """Listen for commands coming to the server.""" 

803 while True: 

804 yield self.next_command() 

805 

806 def _start_monitor(self): 

807 self.connection.send_command("MONITOR") 

808 # check that monitor returns 'OK', but don't return it to user 

809 response = self.connection.read_response() 

810 

811 if not bool_ok(response): 

812 raise RedisError(f"MONITOR failed: {response}") 

813 

814 

815class PubSub: 

816 """ 

817 PubSub provides publish, subscribe and listen support to Redis channels. 

818 

819 After subscribing to one or more channels, the listen() method will block 

820 until a message arrives on one of the subscribed channels. That message 

821 will be returned and it's safe to start listening again. 

822 """ 

823 

824 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

825 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

826 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

827 

828 def __init__( 

829 self, 

830 connection_pool, 

831 shard_hint=None, 

832 ignore_subscribe_messages: bool = False, 

833 encoder: Optional["Encoder"] = None, 

834 push_handler_func: Union[None, Callable[[str], None]] = None, 

835 event_dispatcher: Optional["EventDispatcher"] = None, 

836 ): 

837 self.connection_pool = connection_pool 

838 self.shard_hint = shard_hint 

839 self.ignore_subscribe_messages = ignore_subscribe_messages 

840 self.connection = None 

841 self.subscribed_event = threading.Event() 

842 # we need to know the encoding options for this connection in order 

843 # to lookup channel and pattern names for callback handlers. 

844 self.encoder = encoder 

845 self.push_handler_func = push_handler_func 

846 if event_dispatcher is None: 

847 self._event_dispatcher = EventDispatcher() 

848 else: 

849 self._event_dispatcher = event_dispatcher 

850 

851 self._lock = threading.RLock() 

852 if self.encoder is None: 

853 self.encoder = self.connection_pool.get_encoder() 

854 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) 

855 if self.encoder.decode_responses: 

856 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] 

857 else: 

858 self.health_check_response = [b"pong", self.health_check_response_b] 

859 if self.push_handler_func is None: 

860 _set_info_logger() 

861 self.reset() 

862 

863 def __enter__(self) -> "PubSub": 

864 return self 

865 

866 def __exit__(self, exc_type, exc_value, traceback) -> None: 

867 self.reset() 

868 

869 def __del__(self) -> None: 

870 try: 

871 # if this object went out of scope prior to shutting down 

872 # subscriptions, close the connection manually before 

873 # returning it to the connection pool 

874 self.reset() 

875 except Exception: 

876 pass 

877 

878 def reset(self) -> None: 

879 if self.connection: 

880 self.connection.disconnect() 

881 self.connection.deregister_connect_callback(self.on_connect) 

882 self.connection_pool.release(self.connection) 

883 self.connection = None 

884 self.health_check_response_counter = 0 

885 self.channels = {} 

886 self.pending_unsubscribe_channels = set() 

887 self.shard_channels = {} 

888 self.pending_unsubscribe_shard_channels = set() 

889 self.patterns = {} 

890 self.pending_unsubscribe_patterns = set() 

891 self.subscribed_event.clear() 

892 

893 def close(self) -> None: 

894 self.reset() 

895 

896 def on_connect(self, connection) -> None: 

897 "Re-subscribe to any channels and patterns previously subscribed to" 

898 # NOTE: for python3, we can't pass bytestrings as keyword arguments 

899 # so we need to decode channel/pattern names back to unicode strings 

900 # before passing them to [p]subscribe. 

901 self.pending_unsubscribe_channels.clear() 

902 self.pending_unsubscribe_patterns.clear() 

903 self.pending_unsubscribe_shard_channels.clear() 

904 if self.channels: 

905 channels = { 

906 self.encoder.decode(k, force=True): v for k, v in self.channels.items() 

907 } 

908 self.subscribe(**channels) 

909 if self.patterns: 

910 patterns = { 

911 self.encoder.decode(k, force=True): v for k, v in self.patterns.items() 

912 } 

913 self.psubscribe(**patterns) 

914 if self.shard_channels: 

915 shard_channels = { 

916 self.encoder.decode(k, force=True): v 

917 for k, v in self.shard_channels.items() 

918 } 

919 self.ssubscribe(**shard_channels) 

920 

921 @property 

922 def subscribed(self) -> bool: 

923 """Indicates if there are subscriptions to any channels or patterns""" 

924 return self.subscribed_event.is_set() 

925 

926 def execute_command(self, *args): 

927 """Execute a publish/subscribe command""" 

928 

929 # NOTE: don't parse the response in this function -- it could pull a 

930 # legitimate message off the stack if the connection is already 

931 # subscribed to one or more channels 

932 

933 if self.connection is None: 

934 self.connection = self.connection_pool.get_connection() 

935 # register a callback that re-subscribes to any channels we 

936 # were listening to when we were disconnected 

937 self.connection.register_connect_callback(self.on_connect) 

938 if self.push_handler_func is not None: 

939 self.connection._parser.set_pubsub_push_handler(self.push_handler_func) 

940 self._event_dispatcher.dispatch( 

941 AfterPubSubConnectionInstantiationEvent( 

942 self.connection, self.connection_pool, ClientType.SYNC, self._lock 

943 ) 

944 ) 

945 connection = self.connection 

946 kwargs = {"check_health": not self.subscribed} 

947 if not self.subscribed: 

948 self.clean_health_check_responses() 

949 with self._lock: 

950 self._execute(connection, connection.send_command, *args, **kwargs) 

951 

952 def clean_health_check_responses(self) -> None: 

953 """ 

954 If any health check responses are present, clean them 

955 """ 

956 ttl = 10 

957 conn = self.connection 

958 while conn and self.health_check_response_counter > 0 and ttl > 0: 

959 if self._execute(conn, conn.can_read, timeout=conn.socket_timeout): 

960 response = self._execute(conn, conn.read_response) 

961 if self.is_health_check_response(response): 

962 self.health_check_response_counter -= 1 

963 else: 

964 raise PubSubError( 

965 "A non health check response was cleaned by " 

966 "execute_command: {}".format(response) 

967 ) 

968 ttl -= 1 

969 

970 def _reconnect(self, conn) -> None: 

971 """ 

972 The supported exceptions are already checked in the 

973 retry object so we don't need to do it here. 

974 

975 In this error handler we are trying to reconnect to the server. 

976 """ 

977 conn.disconnect() 

978 conn.connect() 

979 

980 def _execute(self, conn, command, *args, **kwargs): 

981 """ 

982 Connect manually upon disconnection. If the Redis server is down, 

983 this will fail and raise a ConnectionError as desired. 

984 After reconnection, the ``on_connect`` callback should have been 

985 called by the # connection to resubscribe us to any channels and 

986 patterns we were previously listening to 

987 """ 

988 

989 if conn.should_reconnect(): 

990 self._reconnect(conn) 

991 

992 response = conn.retry.call_with_retry( 

993 lambda: command(*args, **kwargs), 

994 lambda _: self._reconnect(conn), 

995 ) 

996 

997 return response 

998 

999 def parse_response(self, block=True, timeout=0): 

1000 """Parse the response from a publish/subscribe command""" 

1001 conn = self.connection 

1002 if conn is None: 

1003 raise RuntimeError( 

1004 "pubsub connection not set: " 

1005 "did you forget to call subscribe() or psubscribe()?" 

1006 ) 

1007 

1008 self.check_health() 

1009 

1010 def try_read(): 

1011 if not block: 

1012 if not conn.can_read(timeout=timeout): 

1013 return None 

1014 else: 

1015 conn.connect() 

1016 return conn.read_response(disconnect_on_error=False, push_request=True) 

1017 

1018 response = self._execute(conn, try_read) 

1019 

1020 if self.is_health_check_response(response): 

1021 # ignore the health check message as user might not expect it 

1022 self.health_check_response_counter -= 1 

1023 return None 

1024 return response 

1025 

1026 def is_health_check_response(self, response) -> bool: 

1027 """ 

1028 Check if the response is a health check response. 

1029 If there are no subscriptions redis responds to PING command with a 

1030 bulk response, instead of a multi-bulk with "pong" and the response. 

1031 """ 

1032 if self.encoder.decode_responses: 

1033 return ( 

1034 response 

1035 in [ 

1036 self.health_check_response, # If there is a subscription 

1037 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True 

1038 ] 

1039 ) 

1040 else: 

1041 return ( 

1042 response 

1043 in [ 

1044 self.health_check_response, # If there is a subscription 

1045 self.health_check_response_b, # If there isn't a subscription and decode_responses=False 

1046 ] 

1047 ) 

1048 

1049 def check_health(self) -> None: 

1050 conn = self.connection 

1051 if conn is None: 

1052 raise RuntimeError( 

1053 "pubsub connection not set: " 

1054 "did you forget to call subscribe() or psubscribe()?" 

1055 ) 

1056 

1057 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1058 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1059 self.health_check_response_counter += 1 

1060 

1061 def _normalize_keys(self, data) -> Dict: 

1062 """ 

1063 normalize channel/pattern names to be either bytes or strings 

1064 based on whether responses are automatically decoded. this saves us 

1065 from coercing the value for each message coming in. 

1066 """ 

1067 encode = self.encoder.encode 

1068 decode = self.encoder.decode 

1069 return {decode(encode(k)): v for k, v in data.items()} 

1070 

1071 def psubscribe(self, *args, **kwargs): 

1072 """ 

1073 Subscribe to channel patterns. Patterns supplied as keyword arguments 

1074 expect a pattern name as the key and a callable as the value. A 

1075 pattern's callable will be invoked automatically when a message is 

1076 received on that pattern rather than producing a message via 

1077 ``listen()``. 

1078 """ 

1079 if args: 

1080 args = list_or_args(args[0], args[1:]) 

1081 new_patterns = dict.fromkeys(args) 

1082 new_patterns.update(kwargs) 

1083 ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys()) 

1084 # update the patterns dict AFTER we send the command. we don't want to 

1085 # subscribe twice to these patterns, once for the command and again 

1086 # for the reconnection. 

1087 new_patterns = self._normalize_keys(new_patterns) 

1088 self.patterns.update(new_patterns) 

1089 if not self.subscribed: 

1090 # Set the subscribed_event flag to True 

1091 self.subscribed_event.set() 

1092 # Clear the health check counter 

1093 self.health_check_response_counter = 0 

1094 self.pending_unsubscribe_patterns.difference_update(new_patterns) 

1095 return ret_val 

1096 

1097 def punsubscribe(self, *args): 

1098 """ 

1099 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1100 all patterns. 

1101 """ 

1102 if args: 

1103 args = list_or_args(args[0], args[1:]) 

1104 patterns = self._normalize_keys(dict.fromkeys(args)) 

1105 else: 

1106 patterns = self.patterns 

1107 self.pending_unsubscribe_patterns.update(patterns) 

1108 return self.execute_command("PUNSUBSCRIBE", *args) 

1109 

1110 def subscribe(self, *args, **kwargs): 

1111 """ 

1112 Subscribe to channels. Channels supplied as keyword arguments expect 

1113 a channel name as the key and a callable as the value. A channel's 

1114 callable will be invoked automatically when a message is received on 

1115 that channel rather than producing a message via ``listen()`` or 

1116 ``get_message()``. 

1117 """ 

1118 if args: 

1119 args = list_or_args(args[0], args[1:]) 

1120 new_channels = dict.fromkeys(args) 

1121 new_channels.update(kwargs) 

1122 ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys()) 

1123 # update the channels dict AFTER we send the command. we don't want to 

1124 # subscribe twice to these channels, once for the command and again 

1125 # for the reconnection. 

1126 new_channels = self._normalize_keys(new_channels) 

1127 self.channels.update(new_channels) 

1128 if not self.subscribed: 

1129 # Set the subscribed_event flag to True 

1130 self.subscribed_event.set() 

1131 # Clear the health check counter 

1132 self.health_check_response_counter = 0 

1133 self.pending_unsubscribe_channels.difference_update(new_channels) 

1134 return ret_val 

1135 

1136 def unsubscribe(self, *args): 

1137 """ 

1138 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1139 all channels 

1140 """ 

1141 if args: 

1142 args = list_or_args(args[0], args[1:]) 

1143 channels = self._normalize_keys(dict.fromkeys(args)) 

1144 else: 

1145 channels = self.channels 

1146 self.pending_unsubscribe_channels.update(channels) 

1147 return self.execute_command("UNSUBSCRIBE", *args) 

1148 

1149 def ssubscribe(self, *args, target_node=None, **kwargs): 

1150 """ 

1151 Subscribes the client to the specified shard channels. 

1152 Channels supplied as keyword arguments expect a channel name as the key 

1153 and a callable as the value. A channel's callable will be invoked automatically 

1154 when a message is received on that channel rather than producing a message via 

1155 ``listen()`` or ``get_sharded_message()``. 

1156 """ 

1157 if args: 

1158 args = list_or_args(args[0], args[1:]) 

1159 new_s_channels = dict.fromkeys(args) 

1160 new_s_channels.update(kwargs) 

1161 ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys()) 

1162 # update the s_channels dict AFTER we send the command. we don't want to 

1163 # subscribe twice to these channels, once for the command and again 

1164 # for the reconnection. 

1165 new_s_channels = self._normalize_keys(new_s_channels) 

1166 self.shard_channels.update(new_s_channels) 

1167 if not self.subscribed: 

1168 # Set the subscribed_event flag to True 

1169 self.subscribed_event.set() 

1170 # Clear the health check counter 

1171 self.health_check_response_counter = 0 

1172 self.pending_unsubscribe_shard_channels.difference_update(new_s_channels) 

1173 return ret_val 

1174 

1175 def sunsubscribe(self, *args, target_node=None): 

1176 """ 

1177 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1178 all shard_channels 

1179 """ 

1180 if args: 

1181 args = list_or_args(args[0], args[1:]) 

1182 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1183 else: 

1184 s_channels = self.shard_channels 

1185 self.pending_unsubscribe_shard_channels.update(s_channels) 

1186 return self.execute_command("SUNSUBSCRIBE", *args) 

1187 

1188 def listen(self): 

1189 "Listen for messages on channels this client has been subscribed to" 

1190 while self.subscribed: 

1191 response = self.handle_message(self.parse_response(block=True)) 

1192 if response is not None: 

1193 yield response 

1194 

1195 def get_message( 

1196 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1197 ): 

1198 """ 

1199 Get the next message if one is available, otherwise None. 

1200 

1201 If timeout is specified, the system will wait for `timeout` seconds 

1202 before returning. Timeout should be specified as a floating point 

1203 number, or None, to wait indefinitely. 

1204 """ 

1205 if not self.subscribed: 

1206 # Wait for subscription 

1207 start_time = time.monotonic() 

1208 if self.subscribed_event.wait(timeout) is True: 

1209 # The connection was subscribed during the timeout time frame. 

1210 # The timeout should be adjusted based on the time spent 

1211 # waiting for the subscription 

1212 time_spent = time.monotonic() - start_time 

1213 timeout = max(0.0, timeout - time_spent) 

1214 else: 

1215 # The connection isn't subscribed to any channels or patterns, 

1216 # so no messages are available 

1217 return None 

1218 

1219 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1220 

1221 if response: 

1222 return self.handle_message(response, ignore_subscribe_messages) 

1223 return None 

1224 

1225 get_sharded_message = get_message 

1226 

1227 def ping(self, message: Union[str, None] = None) -> bool: 

1228 """ 

1229 Ping the Redis server to test connectivity. 

1230 

1231 Sends a PING command to the Redis server and returns True if the server 

1232 responds with "PONG". 

1233 """ 

1234 args = ["PING", message] if message is not None else ["PING"] 

1235 return self.execute_command(*args) 

1236 

1237 def handle_message(self, response, ignore_subscribe_messages=False): 

1238 """ 

1239 Parses a pub/sub message. If the channel or pattern was subscribed to 

1240 with a message handler, the handler is invoked instead of a parsed 

1241 message being returned. 

1242 """ 

1243 if response is None: 

1244 return None 

1245 if isinstance(response, bytes): 

1246 response = [b"pong", response] if response != b"PONG" else [b"pong", b""] 

1247 

1248 message_type = str_if_bytes(response[0]) 

1249 if message_type == "pmessage": 

1250 message = { 

1251 "type": message_type, 

1252 "pattern": response[1], 

1253 "channel": response[2], 

1254 "data": response[3], 

1255 } 

1256 elif message_type == "pong": 

1257 message = { 

1258 "type": message_type, 

1259 "pattern": None, 

1260 "channel": None, 

1261 "data": response[1], 

1262 } 

1263 else: 

1264 message = { 

1265 "type": message_type, 

1266 "pattern": None, 

1267 "channel": response[1], 

1268 "data": response[2], 

1269 } 

1270 

1271 # if this is an unsubscribe message, remove it from memory 

1272 if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: 

1273 if message_type == "punsubscribe": 

1274 pattern = response[1] 

1275 if pattern in self.pending_unsubscribe_patterns: 

1276 self.pending_unsubscribe_patterns.remove(pattern) 

1277 self.patterns.pop(pattern, None) 

1278 elif message_type == "sunsubscribe": 

1279 s_channel = response[1] 

1280 if s_channel in self.pending_unsubscribe_shard_channels: 

1281 self.pending_unsubscribe_shard_channels.remove(s_channel) 

1282 self.shard_channels.pop(s_channel, None) 

1283 else: 

1284 channel = response[1] 

1285 if channel in self.pending_unsubscribe_channels: 

1286 self.pending_unsubscribe_channels.remove(channel) 

1287 self.channels.pop(channel, None) 

1288 if not self.channels and not self.patterns and not self.shard_channels: 

1289 # There are no subscriptions anymore, set subscribed_event flag 

1290 # to false 

1291 self.subscribed_event.clear() 

1292 

1293 if message_type in self.PUBLISH_MESSAGE_TYPES: 

1294 # if there's a message handler, invoke it 

1295 if message_type == "pmessage": 

1296 handler = self.patterns.get(message["pattern"], None) 

1297 elif message_type == "smessage": 

1298 handler = self.shard_channels.get(message["channel"], None) 

1299 else: 

1300 handler = self.channels.get(message["channel"], None) 

1301 if handler: 

1302 handler(message) 

1303 return None 

1304 elif message_type != "pong": 

1305 # this is a subscribe/unsubscribe message. ignore if we don't 

1306 # want them 

1307 if ignore_subscribe_messages or self.ignore_subscribe_messages: 

1308 return None 

1309 

1310 return message 

1311 

1312 def run_in_thread( 

1313 self, 

1314 sleep_time: float = 0.0, 

1315 daemon: bool = False, 

1316 exception_handler: Optional[Callable] = None, 

1317 pubsub=None, 

1318 sharded_pubsub: bool = False, 

1319 ) -> "PubSubWorkerThread": 

1320 for channel, handler in self.channels.items(): 

1321 if handler is None: 

1322 raise PubSubError(f"Channel: '{channel}' has no handler registered") 

1323 for pattern, handler in self.patterns.items(): 

1324 if handler is None: 

1325 raise PubSubError(f"Pattern: '{pattern}' has no handler registered") 

1326 for s_channel, handler in self.shard_channels.items(): 

1327 if handler is None: 

1328 raise PubSubError( 

1329 f"Shard Channel: '{s_channel}' has no handler registered" 

1330 ) 

1331 

1332 pubsub = self if pubsub is None else pubsub 

1333 thread = PubSubWorkerThread( 

1334 pubsub, 

1335 sleep_time, 

1336 daemon=daemon, 

1337 exception_handler=exception_handler, 

1338 sharded_pubsub=sharded_pubsub, 

1339 ) 

1340 thread.start() 

1341 return thread 

1342 

1343 

1344class PubSubWorkerThread(threading.Thread): 

1345 def __init__( 

1346 self, 

1347 pubsub, 

1348 sleep_time: float, 

1349 daemon: bool = False, 

1350 exception_handler: Union[ 

1351 Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None 

1352 ] = None, 

1353 sharded_pubsub: bool = False, 

1354 ): 

1355 super().__init__() 

1356 self.daemon = daemon 

1357 self.pubsub = pubsub 

1358 self.sleep_time = sleep_time 

1359 self.exception_handler = exception_handler 

1360 self.sharded_pubsub = sharded_pubsub 

1361 self._running = threading.Event() 

1362 

1363 def run(self) -> None: 

1364 if self._running.is_set(): 

1365 return 

1366 self._running.set() 

1367 pubsub = self.pubsub 

1368 sleep_time = self.sleep_time 

1369 while self._running.is_set(): 

1370 try: 

1371 if not self.sharded_pubsub: 

1372 pubsub.get_message( 

1373 ignore_subscribe_messages=True, timeout=sleep_time 

1374 ) 

1375 else: 

1376 pubsub.get_sharded_message( 

1377 ignore_subscribe_messages=True, timeout=sleep_time 

1378 ) 

1379 except BaseException as e: 

1380 if self.exception_handler is None: 

1381 raise 

1382 self.exception_handler(e, pubsub, self) 

1383 pubsub.close() 

1384 

1385 def stop(self) -> None: 

1386 # trip the flag so the run loop exits. the run loop will 

1387 # close the pubsub connection, which disconnects the socket 

1388 # and returns the connection to the pool. 

1389 self._running.clear() 

1390 

1391 

1392class Pipeline(Redis): 

1393 """ 

1394 Pipelines provide a way to transmit multiple commands to the Redis server 

1395 in one transmission. This is convenient for batch processing, such as 

1396 saving all the values in a list to Redis. 

1397 

1398 All commands executed within a pipeline(when running in transactional mode, 

1399 which is the default behavior) are wrapped with MULTI and EXEC 

1400 calls. This guarantees all commands executed in the pipeline will be 

1401 executed atomically. 

1402 

1403 Any command raising an exception does *not* halt the execution of 

1404 subsequent commands in the pipeline. Instead, the exception is caught 

1405 and its instance is placed into the response list returned by execute(). 

1406 Code iterating over the response list should be able to deal with an 

1407 instance of an exception as a potential value. In general, these will be 

1408 ResponseError exceptions, such as those raised when issuing a command 

1409 on a key of a different datatype. 

1410 """ 

1411 

1412 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1413 

1414 def __init__( 

1415 self, 

1416 connection_pool: ConnectionPool, 

1417 response_callbacks, 

1418 transaction, 

1419 shard_hint, 

1420 ): 

1421 self.connection_pool = connection_pool 

1422 self.connection: Optional[Connection] = None 

1423 self.response_callbacks = response_callbacks 

1424 self.transaction = transaction 

1425 self.shard_hint = shard_hint 

1426 self.watching = False 

1427 self.command_stack = [] 

1428 self.scripts: Set[Script] = set() 

1429 self.explicit_transaction = False 

1430 

1431 def __enter__(self) -> "Pipeline": 

1432 return self 

1433 

1434 def __exit__(self, exc_type, exc_value, traceback): 

1435 self.reset() 

1436 

1437 def __del__(self): 

1438 try: 

1439 self.reset() 

1440 except Exception: 

1441 pass 

1442 

1443 def __len__(self) -> int: 

1444 return len(self.command_stack) 

1445 

1446 def __bool__(self) -> bool: 

1447 """Pipeline instances should always evaluate to True""" 

1448 return True 

1449 

1450 def reset(self) -> None: 

1451 self.command_stack = [] 

1452 self.scripts = set() 

1453 # make sure to reset the connection state in the event that we were 

1454 # watching something 

1455 if self.watching and self.connection: 

1456 try: 

1457 # call this manually since our unwatch or 

1458 # immediate_execute_command methods can call reset() 

1459 self.connection.send_command("UNWATCH") 

1460 self.connection.read_response() 

1461 except ConnectionError: 

1462 # disconnect will also remove any previous WATCHes 

1463 self.connection.disconnect() 

1464 # clean up the other instance attributes 

1465 self.watching = False 

1466 self.explicit_transaction = False 

1467 

1468 # we can safely return the connection to the pool here since we're 

1469 # sure we're no longer WATCHing anything 

1470 if self.connection: 

1471 self.connection_pool.release(self.connection) 

1472 self.connection = None 

1473 

1474 def close(self) -> None: 

1475 """Close the pipeline""" 

1476 self.reset() 

1477 

1478 def multi(self) -> None: 

1479 """ 

1480 Start a transactional block of the pipeline after WATCH commands 

1481 are issued. End the transactional block with `execute`. 

1482 """ 

1483 if self.explicit_transaction: 

1484 raise RedisError("Cannot issue nested calls to MULTI") 

1485 if self.command_stack: 

1486 raise RedisError( 

1487 "Commands without an initial WATCH have already been issued" 

1488 ) 

1489 self.explicit_transaction = True 

1490 

1491 def execute_command(self, *args, **kwargs): 

1492 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1493 return self.immediate_execute_command(*args, **kwargs) 

1494 return self.pipeline_execute_command(*args, **kwargs) 

1495 

1496 def _disconnect_reset_raise_on_watching( 

1497 self, 

1498 conn: AbstractConnection, 

1499 error: Exception, 

1500 ) -> None: 

1501 """ 

1502 Close the connection reset watching state and 

1503 raise an exception if we were watching. 

1504 

1505 The supported exceptions are already checked in the 

1506 retry object so we don't need to do it here. 

1507 

1508 After we disconnect the connection, it will try to reconnect and 

1509 do a health check as part of the send_command logic(on connection level). 

1510 """ 

1511 conn.disconnect() 

1512 

1513 # if we were already watching a variable, the watch is no longer 

1514 # valid since this connection has died. raise a WatchError, which 

1515 # indicates the user should retry this transaction. 

1516 if self.watching: 

1517 self.reset() 

1518 raise WatchError( 

1519 f"A {type(error).__name__} occurred while watching one or more keys" 

1520 ) 

1521 

1522 def immediate_execute_command(self, *args, **options): 

1523 """ 

1524 Execute a command immediately, but don't auto-retry on the supported 

1525 errors for retry if we're already WATCHing a variable. 

1526 Used when issuing WATCH or subsequent commands retrieving their values but before 

1527 MULTI is called. 

1528 """ 

1529 command_name = args[0] 

1530 conn = self.connection 

1531 # if this is the first call, we need a connection 

1532 if not conn: 

1533 conn = self.connection_pool.get_connection() 

1534 self.connection = conn 

1535 

1536 return conn.retry.call_with_retry( 

1537 lambda: self._send_command_parse_response( 

1538 conn, command_name, *args, **options 

1539 ), 

1540 lambda error: self._disconnect_reset_raise_on_watching(conn, error), 

1541 ) 

1542 

1543 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1544 """ 

1545 Stage a command to be executed when execute() is next called 

1546 

1547 Returns the current Pipeline object back so commands can be 

1548 chained together, such as: 

1549 

1550 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1551 

1552 At some other point, you can then run: pipe.execute(), 

1553 which will execute all commands queued in the pipe. 

1554 """ 

1555 self.command_stack.append((args, options)) 

1556 return self 

1557 

1558 def _execute_transaction( 

1559 self, connection: Connection, commands, raise_on_error 

1560 ) -> List: 

1561 cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) 

1562 all_cmds = connection.pack_commands( 

1563 [args for args, options in cmds if EMPTY_RESPONSE not in options] 

1564 ) 

1565 connection.send_packed_command(all_cmds) 

1566 errors = [] 

1567 

1568 # parse off the response for MULTI 

1569 # NOTE: we need to handle ResponseErrors here and continue 

1570 # so that we read all the additional command messages from 

1571 # the socket 

1572 try: 

1573 self.parse_response(connection, "_") 

1574 except ResponseError as e: 

1575 errors.append((0, e)) 

1576 

1577 # and all the other commands 

1578 for i, command in enumerate(commands): 

1579 if EMPTY_RESPONSE in command[1]: 

1580 errors.append((i, command[1][EMPTY_RESPONSE])) 

1581 else: 

1582 try: 

1583 self.parse_response(connection, "_") 

1584 except ResponseError as e: 

1585 self.annotate_exception(e, i + 1, command[0]) 

1586 errors.append((i, e)) 

1587 

1588 # parse the EXEC. 

1589 try: 

1590 response = self.parse_response(connection, "_") 

1591 except ExecAbortError: 

1592 if errors: 

1593 raise errors[0][1] 

1594 raise 

1595 

1596 # EXEC clears any watched keys 

1597 self.watching = False 

1598 

1599 if response is None: 

1600 raise WatchError("Watched variable changed.") 

1601 

1602 # put any parse errors into the response 

1603 for i, e in errors: 

1604 response.insert(i, e) 

1605 

1606 if len(response) != len(commands): 

1607 self.connection.disconnect() 

1608 raise ResponseError( 

1609 "Wrong number of response items from pipeline execution" 

1610 ) 

1611 

1612 # find any errors in the response and raise if necessary 

1613 if raise_on_error: 

1614 self.raise_first_error(commands, response) 

1615 

1616 # We have to run response callbacks manually 

1617 data = [] 

1618 for r, cmd in zip(response, commands): 

1619 if not isinstance(r, Exception): 

1620 args, options = cmd 

1621 # Remove keys entry, it needs only for cache. 

1622 options.pop("keys", None) 

1623 command_name = args[0] 

1624 if command_name in self.response_callbacks: 

1625 r = self.response_callbacks[command_name](r, **options) 

1626 data.append(r) 

1627 

1628 return data 

1629 

1630 def _execute_pipeline(self, connection, commands, raise_on_error): 

1631 # build up all commands into a single request to increase network perf 

1632 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1633 connection.send_packed_command(all_cmds) 

1634 

1635 responses = [] 

1636 for args, options in commands: 

1637 try: 

1638 responses.append(self.parse_response(connection, args[0], **options)) 

1639 except ResponseError as e: 

1640 responses.append(e) 

1641 

1642 if raise_on_error: 

1643 self.raise_first_error(commands, responses) 

1644 

1645 return responses 

1646 

1647 def raise_first_error(self, commands, response): 

1648 for i, r in enumerate(response): 

1649 if isinstance(r, ResponseError): 

1650 self.annotate_exception(r, i + 1, commands[i][0]) 

1651 raise r 

1652 

1653 def annotate_exception(self, exception, number, command): 

1654 cmd = " ".join(map(safe_str, command)) 

1655 msg = ( 

1656 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1657 f"caused error: {exception.args[0]}" 

1658 ) 

1659 exception.args = (msg,) + exception.args[1:] 

1660 

1661 def parse_response(self, connection, command_name, **options): 

1662 result = Redis.parse_response(self, connection, command_name, **options) 

1663 if command_name in self.UNWATCH_COMMANDS: 

1664 self.watching = False 

1665 elif command_name == "WATCH": 

1666 self.watching = True 

1667 return result 

1668 

1669 def load_scripts(self): 

1670 # make sure all scripts that are about to be run on this pipeline exist 

1671 scripts = list(self.scripts) 

1672 immediate = self.immediate_execute_command 

1673 shas = [s.sha for s in scripts] 

1674 # we can't use the normal script_* methods because they would just 

1675 # get buffered in the pipeline. 

1676 exists = immediate("SCRIPT EXISTS", *shas) 

1677 if not all(exists): 

1678 for s, exist in zip(scripts, exists): 

1679 if not exist: 

1680 s.sha = immediate("SCRIPT LOAD", s.script) 

1681 

1682 def _disconnect_raise_on_watching( 

1683 self, 

1684 conn: AbstractConnection, 

1685 error: Exception, 

1686 ) -> None: 

1687 """ 

1688 Close the connection, raise an exception if we were watching. 

1689 

1690 The supported exceptions are already checked in the 

1691 retry object so we don't need to do it here. 

1692 

1693 After we disconnect the connection, it will try to reconnect and 

1694 do a health check as part of the send_command logic(on connection level). 

1695 """ 

1696 conn.disconnect() 

1697 # if we were watching a variable, the watch is no longer valid 

1698 # since this connection has died. raise a WatchError, which 

1699 # indicates the user should retry this transaction. 

1700 if self.watching: 

1701 raise WatchError( 

1702 f"A {type(error).__name__} occurred while watching one or more keys" 

1703 ) 

1704 

1705 def execute(self, raise_on_error: bool = True) -> List[Any]: 

1706 """Execute all the commands in the current pipeline""" 

1707 stack = self.command_stack 

1708 if not stack and not self.watching: 

1709 return [] 

1710 if self.scripts: 

1711 self.load_scripts() 

1712 if self.transaction or self.explicit_transaction: 

1713 execute = self._execute_transaction 

1714 else: 

1715 execute = self._execute_pipeline 

1716 

1717 conn = self.connection 

1718 if not conn: 

1719 conn = self.connection_pool.get_connection() 

1720 # assign to self.connection so reset() releases the connection 

1721 # back to the pool after we're done 

1722 self.connection = conn 

1723 

1724 try: 

1725 return conn.retry.call_with_retry( 

1726 lambda: execute(conn, stack, raise_on_error), 

1727 lambda error: self._disconnect_raise_on_watching(conn, error), 

1728 ) 

1729 finally: 

1730 # in reset() the connection is disconnected before returned to the pool if 

1731 # it is marked for reconnect. 

1732 self.reset() 

1733 

1734 def discard(self): 

1735 """ 

1736 Flushes all previously queued commands 

1737 See: https://redis.io/commands/DISCARD 

1738 """ 

1739 self.execute_command("DISCARD") 

1740 

1741 def watch(self, *names): 

1742 """Watches the values at keys ``names``""" 

1743 if self.explicit_transaction: 

1744 raise RedisError("Cannot issue a WATCH after a MULTI") 

1745 return self.execute_command("WATCH", *names) 

1746 

1747 def unwatch(self) -> bool: 

1748 """Unwatches all previously specified keys""" 

1749 return self.watching and self.execute_command("UNWATCH") or True