Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

714 statements  

1import copy 

2import re 

3import threading 

4import time 

5from itertools import chain 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Dict, 

11 List, 

12 Mapping, 

13 Optional, 

14 Set, 

15 Type, 

16 Union, 

17) 

18 

19from redis._parsers.encoders import Encoder 

20from redis._parsers.helpers import ( 

21 _RedisCallbacks, 

22 _RedisCallbacksRESP2, 

23 _RedisCallbacksRESP3, 

24 bool_ok, 

25) 

26from redis.backoff import ExponentialWithJitterBackoff 

27from redis.cache import CacheConfig, CacheInterface 

28from redis.commands import ( 

29 CoreCommands, 

30 RedisModuleCommands, 

31 SentinelCommands, 

32 list_or_args, 

33) 

34from redis.commands.core import Script 

35from redis.connection import ( 

36 AbstractConnection, 

37 Connection, 

38 ConnectionPool, 

39 SSLConnection, 

40 UnixDomainSocketConnection, 

41) 

42from redis.credentials import CredentialProvider 

43from redis.driver_info import DriverInfo, resolve_driver_info 

44from redis.event import ( 

45 AfterPooledConnectionsInstantiationEvent, 

46 AfterPubSubConnectionInstantiationEvent, 

47 AfterSingleConnectionInstantiationEvent, 

48 ClientType, 

49 EventDispatcher, 

50) 

51from redis.exceptions import ( 

52 ConnectionError, 

53 ExecAbortError, 

54 PubSubError, 

55 RedisError, 

56 ResponseError, 

57 WatchError, 

58) 

59from redis.lock import Lock 

60from redis.maint_notifications import ( 

61 MaintNotificationsConfig, 

62 OSSMaintNotificationsHandler, 

63) 

64from redis.retry import Retry 

65from redis.utils import ( 

66 _set_info_logger, 

67 check_protocol_version, 

68 deprecated_args, 

69 safe_str, 

70 str_if_bytes, 

71 truncate_text, 

72) 

73 

74if TYPE_CHECKING: 

75 import ssl 

76 

77 import OpenSSL 

78 

# Module-level sentinels used while assembling and parsing commands.
SYM_EMPTY = b""
# Option key: when present in command options, a ResponseError is swallowed
# and the value stored under this key is returned instead (see parse_response).
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
NEVER_DECODE = "NEVER_DECODE"

84 

85 

class CaseInsensitiveDict(dict):
    """Case insensitive dict implementation. Assumes string keys only.

    Keys are normalized to upper case on every read and write, so lookups
    such as ``d["get"]`` and ``d["GET"]`` hit the same entry.
    """

    def __init__(self, data: Dict[str, str]) -> None:
        for k, v in data.items():
            self[k.upper()] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *args):
        # Fix: the inherited dict.pop was case-sensitive, inconsistent with
        # get()/__getitem__ which normalize the key.
        return super().pop(k.upper(), *args)

    def setdefault(self, k, default=None):
        # Fix: the inherited dict.setdefault was case-sensitive as well.
        return super().setdefault(k.upper(), default)

    def update(self, data):
        # Normalize incoming keys by round-tripping through this class.
        data = CaseInsensitiveDict(data)
        super().update(data)

111 

112 

class AbstractRedis:
    # Empty marker base class; defines no attributes or methods in this file.
    # NOTE(review): presumably kept so external code can isinstance/subclass
    # against a common base — confirm against the package's public API.
    pass

115 

116 

117class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

118 """ 

119 Implementation of the Redis protocol. 

120 

121 This abstract class provides a Python interface to all Redis commands 

122 and an implementation of the Redis protocol. 

123 

124 Pipelines derive from this, implementing how 

125 the commands are sent and received to the Redis server. Based on 

126 configuration, an instance will either use a ConnectionPool, or 

127 Connection object to talk to redis. 

128 

129 It is not safe to pass PubSub or Pipeline objects between threads. 

130 """ 

131 

132 @classmethod 

133 def from_url(cls, url: str, **kwargs) -> "Redis": 

134 """ 

135 Return a Redis client object configured from the given URL 

136 

137 For example:: 

138 

139 redis://[[username]:[password]]@localhost:6379/0 

140 rediss://[[username]:[password]]@localhost:6379/0 

141 unix://[username@]/path/to/socket.sock?db=0[&password=password] 

142 

143 Three URL schemes are supported: 

144 

145 - `redis://` creates a TCP socket connection. See more at: 

146 <https://www.iana.org/assignments/uri-schemes/prov/redis> 

147 - `rediss://` creates a SSL wrapped TCP socket connection. See more at: 

148 <https://www.iana.org/assignments/uri-schemes/prov/rediss> 

149 - ``unix://``: creates a Unix Domain Socket connection. 

150 

151 The username, password, hostname, path and all querystring values 

152 are passed through urllib.parse.unquote in order to replace any 

153 percent-encoded values with their corresponding characters. 

154 

155 There are several ways to specify a database number. The first value 

156 found will be used: 

157 

158 1. A ``db`` querystring option, e.g. redis://localhost?db=0 

159 2. If using the redis:// or rediss:// schemes, the path argument 

160 of the url, e.g. redis://localhost/0 

161 3. A ``db`` keyword argument to this function. 

162 

163 If none of these options are specified, the default db=0 is used. 

164 

165 All querystring options are cast to their appropriate Python types. 

166 Boolean arguments can be specified with string values "True"/"False" 

167 or "Yes"/"No". Values that cannot be properly cast cause a 

168 ``ValueError`` to be raised. Once parsed, the querystring arguments 

169 and keyword arguments are passed to the ``ConnectionPool``'s 

170 class initializer. In the case of conflicting arguments, querystring 

171 arguments always win. 

172 

173 """ 

174 single_connection_client = kwargs.pop("single_connection_client", False) 

175 connection_pool = ConnectionPool.from_url(url, **kwargs) 

176 client = cls( 

177 connection_pool=connection_pool, 

178 single_connection_client=single_connection_client, 

179 ) 

180 client.auto_close_connection_pool = True 

181 return client 

182 

183 @classmethod 

184 def from_pool( 

185 cls: Type["Redis"], 

186 connection_pool: ConnectionPool, 

187 ) -> "Redis": 

188 """ 

189 Return a Redis client from the given connection pool. 

190 The Redis client will take ownership of the connection pool and 

191 close it when the Redis client is closed. 

192 """ 

193 client = cls( 

194 connection_pool=connection_pool, 

195 ) 

196 client.auto_close_connection_pool = True 

197 return client 

198 

    @deprecated_args(
        args_to_warn=["retry_on_timeout"],
        reason="TimeoutError is included by default.",
        version="6.0.0",
    )
    @deprecated_args(
        args_to_warn=["lib_name", "lib_version"],
        reason="Use 'driver_info' parameter instead. "
        "lib_name and lib_version will be removed in a future version.",
    )
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        socket_timeout: Optional[float] = None,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: Optional[bool] = None,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        connection_pool: Optional[ConnectionPool] = None,
        unix_socket_path: Optional[str] = None,
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        retry_on_timeout: bool = False,
        retry: Retry = Retry(
            backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
        ),
        retry_on_error: Optional[List[Type[Exception]]] = None,
        ssl: bool = False,
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_path: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_password: Optional[str] = None,
        ssl_validate_ocsp: bool = False,
        ssl_validate_ocsp_stapled: bool = False,
        ssl_ocsp_context: Optional["OpenSSL.SSL.Context"] = None,
        ssl_ocsp_expected_cert: Optional[str] = None,
        ssl_min_version: Optional["ssl.TLSVersion"] = None,
        ssl_ciphers: Optional[str] = None,
        max_connections: Optional[int] = None,
        single_connection_client: bool = False,
        health_check_interval: int = 0,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = None,
        lib_version: Optional[str] = None,
        driver_info: Optional["DriverInfo"] = None,
        username: Optional[str] = None,
        redis_connect_func: Optional[Callable[[], None]] = None,
        credential_provider: Optional[CredentialProvider] = None,
        protocol: Optional[int] = 2,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        oss_cluster_maint_notifications_handler: Optional[
            OSSMaintNotificationsHandler
        ] = None,
    ) -> None:
        """
        Initialize a new Redis client.

        To specify a retry policy for specific errors, you have two options:

        1. Set the `retry_on_error` to a list of the error/s to retry on, and
        you can also set `retry` to a valid `Retry` object(in case the default
        one is not appropriate) - with this approach the retries will be triggered
        on the default errors specified in the Retry object enriched with the
        errors specified in `retry_on_error`.

        2. Define a `Retry` object with configured 'supported_errors' and set
        it to the `retry` parameter - with this approach you completely redefine
        the errors on which retries will happen.

        `retry_on_timeout` is deprecated - please include the TimeoutError
        either in the Retry object or in the `retry_on_error` list.

        When 'connection_pool' is provided - the retry configuration of the
        provided pool will be used.

        Args:

        single_connection_client:
            if `True`, connection pool is not used. In that case `Redis`
            instance use is not thread safe.
        decode_responses:
            if `True`, the response will be decoded to utf-8.
            Argument is ignored when connection_pool is provided.
        driver_info:
            Optional DriverInfo object to identify upstream libraries.
            If provided, lib_name and lib_version are ignored.
            If not provided, a DriverInfo will be created from lib_name and lib_version.
            Argument is ignored when connection_pool is provided.
        lib_name:
            **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
        lib_version:
            **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
        maint_notifications_config:
            configures the pool to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the connection pool
            initialization).
            Argument is ignored when connection_pool is provided.
        oss_cluster_maint_notifications_handler:
            handler for OSS cluster notifications - see
            `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
            Only supported with RESP3
            Argument is ignored when connection_pool is provided.
        """
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        if not connection_pool:
            # No pool supplied: build one from the individual arguments.
            if not retry_on_error:
                retry_on_error = []

            # Handle driver_info: if provided, use it; otherwise create from
            # lib_name/lib_version
            computed_driver_info = resolve_driver_info(
                driver_info, lib_name, lib_version
            )

            # Arguments common to every connection class.
            kwargs = {
                "db": db,
                "username": username,
                "password": password,
                "socket_timeout": socket_timeout,
                "encoding": encoding,
                "encoding_errors": encoding_errors,
                "decode_responses": decode_responses,
                "retry_on_error": retry_on_error,
                # deepcopy so this instance's retry state is not shared with
                # other clients built from the same default Retry object
                "retry": copy.deepcopy(retry),
                "max_connections": max_connections,
                "health_check_interval": health_check_interval,
                "client_name": client_name,
                "driver_info": computed_driver_info,
                "redis_connect_func": redis_connect_func,
                "credential_provider": credential_provider,
                "protocol": protocol,
            }
            # based on input, setup appropriate connection args
            if unix_socket_path is not None:
                kwargs.update(
                    {
                        "path": unix_socket_path,
                        "connection_class": UnixDomainSocketConnection,
                    }
                )
            else:
                # TCP specific options
                kwargs.update(
                    {
                        "host": host,
                        "port": port,
                        "socket_connect_timeout": socket_connect_timeout,
                        "socket_keepalive": socket_keepalive,
                        "socket_keepalive_options": socket_keepalive_options,
                    }
                )

                if ssl:
                    # TLS-wrapped TCP connection plus all SSL tuning knobs.
                    kwargs.update(
                        {
                            "connection_class": SSLConnection,
                            "ssl_keyfile": ssl_keyfile,
                            "ssl_certfile": ssl_certfile,
                            "ssl_cert_reqs": ssl_cert_reqs,
                            "ssl_include_verify_flags": ssl_include_verify_flags,
                            "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                            "ssl_ca_certs": ssl_ca_certs,
                            "ssl_ca_data": ssl_ca_data,
                            "ssl_check_hostname": ssl_check_hostname,
                            "ssl_password": ssl_password,
                            "ssl_ca_path": ssl_ca_path,
                            "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                            "ssl_validate_ocsp": ssl_validate_ocsp,
                            "ssl_ocsp_context": ssl_ocsp_context,
                            "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                            "ssl_min_version": ssl_min_version,
                            "ssl_ciphers": ssl_ciphers,
                        }
                    )
            # Client-side caching requires RESP3; only forward the cache
            # options when the protocol allows them.
            if (cache_config or cache) and check_protocol_version(protocol, 3):
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            # Maintenance notifications are push messages, which only exist
            # in RESP3; reject explicitly-enabled config on RESP2.
            if maint_notifications_enabled and protocol not in [
                3,
                "3",
            ]:
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            if oss_cluster_maint_notifications_handler:
                kwargs.update(
                    {
                        "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
                    }
                )
            connection_pool = ConnectionPool(**kwargs)
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )
            # We created this pool, so we also close it with the client.
            self.auto_close_connection_pool = True
        else:
            # Caller-supplied pool: caller keeps ownership.
            self.auto_close_connection_pool = False
            self._event_dispatcher.dispatch(
                AfterPooledConnectionsInstantiationEvent(
                    [connection_pool], ClientType.SYNC, credential_provider
                )
            )

        self.connection_pool = connection_pool

        # Re-check caching support against the pool actually in use (covers
        # the caller-supplied pool branch where `protocol` was never read).
        if (cache_config or cache) and self.connection_pool.get_protocol() not in [
            3,
            "3",
        ]:
            raise RedisError("Client caching is only supported with RESP version 3")

        self.single_connection_lock = threading.RLock()
        self.connection: Optional[AbstractConnection] = None
        self._single_connection_client = single_connection_client
        if self._single_connection_client:
            # Pin one connection for the client's lifetime; access to it is
            # serialized through single_connection_lock.
            self.connection = self.connection_pool.get_connection()
            self._event_dispatcher.dispatch(
                AfterSingleConnectionInstantiationEvent(
                    self.connection, ClientType.SYNC, self.single_connection_lock
                )
            )

        self.response_callbacks = CaseInsensitiveDict(_RedisCallbacks)

        # Layer protocol-specific callbacks on top of the shared ones.
        if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
            self.response_callbacks.update(_RedisCallbacksRESP3)
        else:
            self.response_callbacks.update(_RedisCallbacksRESP2)

459 

460 def __repr__(self) -> str: 

461 return ( 

462 f"<{type(self).__module__}.{type(self).__name__}" 

463 f"({repr(self.connection_pool)})>" 

464 ) 

465 

466 def get_encoder(self) -> "Encoder": 

467 """Get the connection pool's encoder""" 

468 return self.connection_pool.get_encoder() 

469 

470 def get_connection_kwargs(self) -> Dict: 

471 """Get the connection's key-word arguments""" 

472 return self.connection_pool.connection_kwargs 

473 

474 def get_retry(self) -> Optional[Retry]: 

475 return self.get_connection_kwargs().get("retry") 

476 

477 def set_retry(self, retry: Retry) -> None: 

478 self.get_connection_kwargs().update({"retry": retry}) 

479 self.connection_pool.set_retry(retry) 

480 

481 def set_response_callback(self, command: str, callback: Callable) -> None: 

482 """Set a custom Response Callback""" 

483 self.response_callbacks[command] = callback 

484 

485 def load_external_module(self, funcname, func) -> None: 

486 """ 

487 This function can be used to add externally defined redis modules, 

488 and their namespaces to the redis client. 

489 

490 funcname - A string containing the name of the function to create 

491 func - The function, being added to this class. 

492 

493 ex: Assume that one has a custom redis module named foomod that 

494 creates command named 'foo.dothing' and 'foo.anotherthing' in redis. 

495 To load function functions into this namespace: 

496 

497 from redis import Redis 

498 from foomodule import F 

499 r = Redis() 

500 r.load_external_module("foo", F) 

501 r.foo().dothing('your', 'arguments') 

502 

503 For a concrete example see the reimport of the redisjson module in 

504 tests/test_connection.py::test_loading_external_modules 

505 """ 

506 setattr(self, funcname, func) 

507 

508 def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline": 

509 """ 

510 Return a new pipeline object that can queue multiple commands for 

511 later execution. ``transaction`` indicates whether all commands 

512 should be executed atomically. Apart from making a group of operations 

513 atomic, pipelines are useful for reducing the back-and-forth overhead 

514 between the client and server. 

515 """ 

516 return Pipeline( 

517 self.connection_pool, self.response_callbacks, transaction, shard_hint 

518 ) 

519 

520 def transaction( 

521 self, func: Callable[["Pipeline"], None], *watches, **kwargs 

522 ) -> Union[List[Any], Any, None]: 

523 """ 

524 Convenience method for executing the callable `func` as a transaction 

525 while watching all keys specified in `watches`. The 'func' callable 

526 should expect a single argument which is a Pipeline object. 

527 """ 

528 shard_hint = kwargs.pop("shard_hint", None) 

529 value_from_callable = kwargs.pop("value_from_callable", False) 

530 watch_delay = kwargs.pop("watch_delay", None) 

531 with self.pipeline(True, shard_hint) as pipe: 

532 while True: 

533 try: 

534 if watches: 

535 pipe.watch(*watches) 

536 func_value = func(pipe) 

537 exec_value = pipe.execute() 

538 return func_value if value_from_callable else exec_value 

539 except WatchError: 

540 if watch_delay is not None and watch_delay > 0: 

541 time.sleep(watch_delay) 

542 continue 

543 

544 def lock( 

545 self, 

546 name: str, 

547 timeout: Optional[float] = None, 

548 sleep: float = 0.1, 

549 blocking: bool = True, 

550 blocking_timeout: Optional[float] = None, 

551 lock_class: Union[None, Any] = None, 

552 thread_local: bool = True, 

553 raise_on_release_error: bool = True, 

554 ): 

555 """ 

556 Return a new Lock object using key ``name`` that mimics 

557 the behavior of threading.Lock. 

558 

559 If specified, ``timeout`` indicates a maximum life for the lock. 

560 By default, it will remain locked until release() is called. 

561 

562 ``sleep`` indicates the amount of time to sleep per loop iteration 

563 when the lock is in blocking mode and another client is currently 

564 holding the lock. 

565 

566 ``blocking`` indicates whether calling ``acquire`` should block until 

567 the lock has been acquired or to fail immediately, causing ``acquire`` 

568 to return False and the lock not being acquired. Defaults to True. 

569 Note this value can be overridden by passing a ``blocking`` 

570 argument to ``acquire``. 

571 

572 ``blocking_timeout`` indicates the maximum amount of time in seconds to 

573 spend trying to acquire the lock. A value of ``None`` indicates 

574 continue trying forever. ``blocking_timeout`` can be specified as a 

575 float or integer, both representing the number of seconds to wait. 

576 

577 ``lock_class`` forces the specified lock implementation. Note that as 

578 of redis-py 3.0, the only lock class we implement is ``Lock`` (which is 

579 a Lua-based lock). So, it's unlikely you'll need this parameter, unless 

580 you have created your own custom lock class. 

581 

582 ``thread_local`` indicates whether the lock token is placed in 

583 thread-local storage. By default, the token is placed in thread local 

584 storage so that a thread only sees its token, not a token set by 

585 another thread. Consider the following timeline: 

586 

587 time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds. 

588 thread-1 sets the token to "abc" 

589 time: 1, thread-2 blocks trying to acquire `my-lock` using the 

590 Lock instance. 

591 time: 5, thread-1 has not yet completed. redis expires the lock 

592 key. 

593 time: 5, thread-2 acquired `my-lock` now that it's available. 

594 thread-2 sets the token to "xyz" 

595 time: 6, thread-1 finishes its work and calls release(). if the 

596 token is *not* stored in thread local storage, then 

597 thread-1 would see the token value as "xyz" and would be 

598 able to successfully release the thread-2's lock. 

599 

600 ``raise_on_release_error`` indicates whether to raise an exception when 

601 the lock is no longer owned when exiting the context manager. By default, 

602 this is True, meaning an exception will be raised. If False, the warning 

603 will be logged and the exception will be suppressed. 

604 

605 In some use cases it's necessary to disable thread local storage. For 

606 example, if you have code where one thread acquires a lock and passes 

607 that lock instance to a worker thread to release later. If thread 

608 local storage isn't disabled in this case, the worker thread won't see 

609 the token set by the thread that acquired the lock. Our assumption 

610 is that these cases aren't common and as such default to using 

611 thread local storage.""" 

612 if lock_class is None: 

613 lock_class = Lock 

614 return lock_class( 

615 self, 

616 name, 

617 timeout=timeout, 

618 sleep=sleep, 

619 blocking=blocking, 

620 blocking_timeout=blocking_timeout, 

621 thread_local=thread_local, 

622 raise_on_release_error=raise_on_release_error, 

623 ) 

624 

625 def pubsub(self, **kwargs): 

626 """ 

627 Return a Publish/Subscribe object. With this object, you can 

628 subscribe to channels and listen for messages that get published to 

629 them. 

630 """ 

631 return PubSub( 

632 self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs 

633 ) 

634 

635 def monitor(self): 

636 return Monitor(self.connection_pool) 

637 

638 def client(self): 

639 return self.__class__( 

640 connection_pool=self.connection_pool, 

641 single_connection_client=True, 

642 ) 

643 

644 def __enter__(self): 

645 return self 

646 

647 def __exit__(self, exc_type, exc_value, traceback): 

648 self.close() 

649 

650 def __del__(self): 

651 try: 

652 self.close() 

653 except Exception: 

654 pass 

655 

656 def close(self) -> None: 

657 # In case a connection property does not yet exist 

658 # (due to a crash earlier in the Redis() constructor), return 

659 # immediately as there is nothing to clean-up. 

660 if not hasattr(self, "connection"): 

661 return 

662 

663 conn = self.connection 

664 if conn: 

665 self.connection = None 

666 self.connection_pool.release(conn) 

667 

668 if self.auto_close_connection_pool: 

669 self.connection_pool.disconnect() 

670 

671 def _send_command_parse_response(self, conn, command_name, *args, **options): 

672 """ 

673 Send a command and parse the response 

674 """ 

675 conn.send_command(*args, **options) 

676 return self.parse_response(conn, command_name, **options) 

677 

678 def _close_connection(self, conn) -> None: 

679 """ 

680 Close the connection before retrying. 

681 

682 The supported exceptions are already checked in the 

683 retry object so we don't need to do it here. 

684 

685 After we disconnect the connection, it will try to reconnect and 

686 do a health check as part of the send_command logic(on connection level). 

687 """ 

688 

689 conn.disconnect() 

690 

691 # COMMAND EXECUTION AND PROTOCOL PARSING 

692 def execute_command(self, *args, **options): 

693 return self._execute_command(*args, **options) 

694 

    def _execute_command(self, *args, **options):
        """Execute a command and return a parsed response"""
        pool = self.connection_pool
        command_name = args[0]
        # Single-connection clients reuse self.connection; otherwise a
        # connection is checked out from the pool for this call only.
        conn = self.connection or pool.get_connection()

        if self._single_connection_client:
            # Serialize access to the shared connection across threads.
            self.single_connection_lock.acquire()
        try:
            # Retries per the connection's Retry policy; on a retryable
            # failure the connection is dropped first so the next attempt
            # reconnects cleanly.
            return conn.retry.call_with_retry(
                lambda: self._send_command_parse_response(
                    conn, command_name, *args, **options
                ),
                lambda _: self._close_connection(conn),
            )

        finally:
            # Order matters here: reconnect-if-asked first, then release the
            # single-connection lock, then return a pooled connection.
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                # Pooled (non-pinned) connection: hand it back.
                pool.release(conn)

719 

720 def parse_response(self, connection, command_name, **options): 

721 """Parses a response from the Redis server""" 

722 try: 

723 if NEVER_DECODE in options: 

724 response = connection.read_response(disable_decoding=True) 

725 options.pop(NEVER_DECODE) 

726 else: 

727 response = connection.read_response() 

728 except ResponseError: 

729 if EMPTY_RESPONSE in options: 

730 return options[EMPTY_RESPONSE] 

731 raise 

732 

733 if EMPTY_RESPONSE in options: 

734 options.pop(EMPTY_RESPONSE) 

735 

736 # Remove keys entry, it needs only for cache. 

737 options.pop("keys", None) 

738 

739 if command_name in self.response_callbacks: 

740 return self.response_callbacks[command_name](response, **options) 

741 return response 

742 

743 def get_cache(self) -> Optional[CacheInterface]: 

744 return self.connection_pool.cache 

745 

746 

# Backwards-compatible alias: StrictRedis refers to the same class as Redis.
StrictRedis = Redis

748 

749 

class Monitor:
    """
    Drives the MONITOR command against the redis server.

    next_command() returns a single parsed command dict; listen() yields
    them forever.
    """

    # "[<db> <client info>] <command>" portion of a MONITOR line
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # one double-quoted argument, ignoring escaped quotes
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        self._start_monitor()
        return self

    def __exit__(self, *args):
        self.connection.disconnect()
        self.connection_pool.release(self.connection)

    def next_command(self):
        """Read one MONITOR line and parse it into a dict (or None)."""
        raw = self.connection.read_response()

        if raw is None:
            return None

        if isinstance(raw, bytes):
            raw = self.connection.encoder.decode(raw, force=True)

        timestamp, remainder = raw.split(" ", 1)
        db_id, client_info, command = self.monitor_re.match(remainder).groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is wrapped in double quotes; we don't need that, so drop
        # the escaping but keep the quote itself.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address, client_port, client_type = "lua", "", "lua"
        elif client_info.startswith("unix"):
            client_address, client_port, client_type = (
                "unix",
                client_info[5:],
                "unix",
            )
        elif client_info == "":
            client_address, client_port, client_type = "", "", "unknown"
        else:
            # rsplit because IPv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"

        return {
            "time": float(timestamp),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Yield parsed commands from the server indefinitely."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        self.connection.send_command("MONITOR")
        # MONITOR acknowledges with 'OK'; consume it without returning it.
        reply = self.connection.read_response()

        if not bool_ok(reply):
            raise RedisError(f"MONITOR failed: {reply}")

829 

830 

831class PubSub: 

832 """ 

833 PubSub provides publish, subscribe and listen support to Redis channels. 

834 

835 After subscribing to one or more channels, the listen() method will block 

836 until a message arrives on one of the subscribed channels. That message 

837 will be returned and it's safe to start listening again. 

838 """ 

839 

840 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage") 

841 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe") 

842 HEALTH_CHECK_MESSAGE = "redis-py-health-check" 

843 

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        """Set up pub/sub state; no connection is taken until first use."""
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        # Set while at least one subscription is active (see `subscribed`).
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            # Fall back to the pool's encoder when none was supplied.
            self.encoder = self.connection_pool.get_encoder()
        # Pre-compute the expected PING reply used for health checks, in
        # whichever form (str or bytes) this encoder produces.
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        # Initialize all subscription bookkeeping to the empty state.
        self.reset()

878 

879 def __enter__(self) -> "PubSub": 

880 return self 

881 

882 def __exit__(self, exc_type, exc_value, traceback) -> None: 

883 self.reset() 

884 

885 def __del__(self) -> None: 

886 try: 

887 # if this object went out of scope prior to shutting down 

888 # subscriptions, close the connection manually before 

889 # returning it to the connection pool 

890 self.reset() 

891 except Exception: 

892 pass 

893 

    def reset(self) -> None:
        """Release the connection and clear all subscription bookkeeping."""
        if self.connection:
            self.connection.disconnect()
            # stop re-subscribing when this connection reconnects later
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        # no subscriptions remain, so `subscribed` must report False
        self.subscribed_event.clear()

908 

909 def close(self) -> None: 

910 self.reset() 

911 

    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        # NOTE: for python3, we can't pass bytestrings as keyword arguments
        # so we need to decode channel/pattern names back to unicode strings
        # before passing them to [p]subscribe.
        # Any unsubscribe that was in flight is moot after a reconnect:
        # the new connection starts with no subscriptions at all.
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            channels = {
                self.encoder.decode(k, force=True): v for k, v in self.channels.items()
            }
            self.subscribe(**channels)
        if self.patterns:
            patterns = {
                self.encoder.decode(k, force=True): v for k, v in self.patterns.items()
            }
            self.psubscribe(**patterns)
        if self.shard_channels:
            shard_channels = {
                self.encoder.decode(k, force=True): v
                for k, v in self.shard_channels.items()
            }
            self.ssubscribe(**shard_channels)

936 

937 @property 

938 def subscribed(self) -> bool: 

939 """Indicates if there are subscriptions to any channels or patterns""" 

940 return self.subscribed_event.is_set() 

941 

    def execute_command(self, *args):
        """Execute a publish/subscribe command"""

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            # Lazily acquire a dedicated connection from the pool.
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        # Only run the connection-level health check while unsubscribed;
        # once subscribed, a PING reply could interleave with messages.
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

967 

    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them
        """
        # Bounded sweep (at most 10 reads) so a busy socket can never make
        # this loop spin indefinitely.
        ttl = 10
        conn = self.connection
        while conn and self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    # Anything else here would have been a real message
                    # silently dropped from the caller; fail loudly instead.
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        "execute_command: {}".format(response)
                    )
            ttl -= 1

985 

986 def _reconnect(self, conn) -> None: 

987 """ 

988 The supported exceptions are already checked in the 

989 retry object so we don't need to do it here. 

990 

991 In this error handler we are trying to reconnect to the server. 

992 """ 

993 conn.disconnect() 

994 conn.connect() 

995 

996 def _execute(self, conn, command, *args, **kwargs): 

997 """ 

998 Connect manually upon disconnection. If the Redis server is down, 

999 this will fail and raise a ConnectionError as desired. 

1000 After reconnection, the ``on_connect`` callback should have been 

1001 called by the # connection to resubscribe us to any channels and 

1002 patterns we were previously listening to 

1003 """ 

1004 

1005 if conn.should_reconnect(): 

1006 self._reconnect(conn) 

1007 

1008 response = conn.retry.call_with_retry( 

1009 lambda: command(*args, **kwargs), 

1010 lambda _: self._reconnect(conn), 

1011 ) 

1012 

1013 return response 

1014 

    def parse_response(self, block=True, timeout=0):
        """Parse the response from a publish/subscribe command"""
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        # Send a PING first if the health-check interval has elapsed.
        self.check_health()

        def try_read():
            if not block:
                # Non-blocking: bail out unless data is already buffered
                # or arrives within `timeout`.
                if not conn.can_read(timeout=timeout):
                    return None
            else:
                conn.connect()
            return conn.read_response(disconnect_on_error=False, push_request=True)

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

1041 

1042 def is_health_check_response(self, response) -> bool: 

1043 """ 

1044 Check if the response is a health check response. 

1045 If there are no subscriptions redis responds to PING command with a 

1046 bulk response, instead of a multi-bulk with "pong" and the response. 

1047 """ 

1048 if self.encoder.decode_responses: 

1049 return ( 

1050 response 

1051 in [ 

1052 self.health_check_response, # If there is a subscription 

1053 self.HEALTH_CHECK_MESSAGE, # If there are no subscriptions and decode_responses=True 

1054 ] 

1055 ) 

1056 else: 

1057 return ( 

1058 response 

1059 in [ 

1060 self.health_check_response, # If there is a subscription 

1061 self.health_check_response_b, # If there isn't a subscription and decode_responses=False 

1062 ] 

1063 ) 

1064 

1065 def check_health(self) -> None: 

1066 conn = self.connection 

1067 if conn is None: 

1068 raise RuntimeError( 

1069 "pubsub connection not set: " 

1070 "did you forget to call subscribe() or psubscribe()?" 

1071 ) 

1072 

1073 if conn.health_check_interval and time.monotonic() > conn.next_health_check: 

1074 conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False) 

1075 self.health_check_response_counter += 1 

1076 

1077 def _normalize_keys(self, data) -> Dict: 

1078 """ 

1079 normalize channel/pattern names to be either bytes or strings 

1080 based on whether responses are automatically decoded. this saves us 

1081 from coercing the value for each message coming in. 

1082 """ 

1083 encode = self.encoder.encode 

1084 decode = self.encoder.decode 

1085 return {decode(encode(k)): v for k, v in data.items()} 

1086 

    def psubscribe(self, *args, **kwargs):
        """
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        # Positional patterns get no handler (None); keyword handlers win.
        new_patterns = dict.fromkeys(args)
        new_patterns.update(kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        # These patterns are active again; cancel any pending unsubscribes.
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

1112 

1113 def punsubscribe(self, *args): 

1114 """ 

1115 Unsubscribe from the supplied patterns. If empty, unsubscribe from 

1116 all patterns. 

1117 """ 

1118 if args: 

1119 args = list_or_args(args[0], args[1:]) 

1120 patterns = self._normalize_keys(dict.fromkeys(args)) 

1121 else: 

1122 patterns = self.patterns 

1123 self.pending_unsubscribe_patterns.update(patterns) 

1124 return self.execute_command("PUNSUBSCRIBE", *args) 

1125 

    def subscribe(self, *args, **kwargs):
        """
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        # Positional channels get no handler (None); keyword handlers win.
        new_channels = dict.fromkeys(args)
        new_channels.update(kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        # These channels are active again; cancel any pending unsubscribes.
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

1151 

1152 def unsubscribe(self, *args): 

1153 """ 

1154 Unsubscribe from the supplied channels. If empty, unsubscribe from 

1155 all channels 

1156 """ 

1157 if args: 

1158 args = list_or_args(args[0], args[1:]) 

1159 channels = self._normalize_keys(dict.fromkeys(args)) 

1160 else: 

1161 channels = self.channels 

1162 self.pending_unsubscribe_channels.update(channels) 

1163 return self.execute_command("UNSUBSCRIBE", *args) 

1164 

    def ssubscribe(self, *args, target_node=None, **kwargs):
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        """
        if args:
            args = list_or_args(args[0], args[1:])
        # Positional channels get no handler (None); keyword handlers win.
        new_s_channels = dict.fromkeys(args)
        new_s_channels.update(kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        # These channels are active again; cancel any pending unsubscribes.
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

1190 

1191 def sunsubscribe(self, *args, target_node=None): 

1192 """ 

1193 Unsubscribe from the supplied shard_channels. If empty, unsubscribe from 

1194 all shard_channels 

1195 """ 

1196 if args: 

1197 args = list_or_args(args[0], args[1:]) 

1198 s_channels = self._normalize_keys(dict.fromkeys(args)) 

1199 else: 

1200 s_channels = self.shard_channels 

1201 self.pending_unsubscribe_shard_channels.update(s_channels) 

1202 return self.execute_command("SUNSUBSCRIBE", *args) 

1203 

1204 def listen(self): 

1205 "Listen for messages on channels this client has been subscribed to" 

1206 while self.subscribed: 

1207 response = self.handle_message(self.parse_response(block=True)) 

1208 if response is not None: 

1209 yield response 

1210 

1211 def get_message( 

1212 self, ignore_subscribe_messages: bool = False, timeout: float = 0.0 

1213 ): 

1214 """ 

1215 Get the next message if one is available, otherwise None. 

1216 

1217 If timeout is specified, the system will wait for `timeout` seconds 

1218 before returning. Timeout should be specified as a floating point 

1219 number, or None, to wait indefinitely. 

1220 """ 

1221 if not self.subscribed: 

1222 # Wait for subscription 

1223 start_time = time.monotonic() 

1224 if self.subscribed_event.wait(timeout) is True: 

1225 # The connection was subscribed during the timeout time frame. 

1226 # The timeout should be adjusted based on the time spent 

1227 # waiting for the subscription 

1228 time_spent = time.monotonic() - start_time 

1229 timeout = max(0.0, timeout - time_spent) 

1230 else: 

1231 # The connection isn't subscribed to any channels or patterns, 

1232 # so no messages are available 

1233 return None 

1234 

1235 response = self.parse_response(block=(timeout is None), timeout=timeout) 

1236 

1237 if response: 

1238 return self.handle_message(response, ignore_subscribe_messages) 

1239 return None 

1240 

1241 get_sharded_message = get_message 

1242 

1243 def ping(self, message: Union[str, None] = None) -> bool: 

1244 """ 

1245 Ping the Redis server to test connectivity. 

1246 

1247 Sends a PING command to the Redis server and returns True if the server 

1248 responds with "PONG". 

1249 """ 

1250 args = ["PING", message] if message is not None else ["PING"] 

1251 return self.execute_command(*args) 

1252 

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.

        Returns a dict with ``type``/``pattern``/``channel``/``data`` keys,
        or None when the message was consumed by a handler or ignored.
        """
        if response is None:
            return None
        if isinstance(response, bytes):
            # A bare bulk reply is a PING response received while not
            # subscribed; normalize it into the ["pong", data] shape.
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                # Handled messages are consumed here and not returned.
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

1327 

    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
        pubsub=None,
        sharded_pubsub: bool = False,
    ) -> "PubSubWorkerThread":
        """
        Start a PubSubWorkerThread that polls for messages and dispatches
        them to the handlers registered at subscribe time.

        Every current channel/pattern/shard-channel subscription must have
        a handler, because the worker thread discards returned messages.

        :param sleep_time: poll timeout passed to get_message().
        :param daemon: run the worker as a daemon thread.
        :param exception_handler: ``callback(error, pubsub, thread)`` invoked
            on errors; without one, exceptions propagate and kill the thread.
        :param pubsub: optional alternative PubSub object to poll.
        :param sharded_pubsub: poll via get_sharded_message() instead.
        :raises PubSubError: if any subscription lacks a handler.
        """
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        pubsub = self if pubsub is None else pubsub
        thread = PubSubWorkerThread(
            pubsub,
            sleep_time,
            daemon=daemon,
            exception_handler=exception_handler,
            sharded_pubsub=sharded_pubsub,
        )
        thread.start()
        return thread

1358 

1359 

class PubSubWorkerThread(threading.Thread):
    """Background thread that polls a PubSub object, letting registered
    handlers consume messages, until :meth:`stop` is called."""

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.exception_handler = exception_handler
        self.sharded_pubsub = sharded_pubsub
        # Cleared by stop() to make the run loop exit.
        self._running = threading.Event()

    def run(self) -> None:
        if self._running.is_set():
            # Already running; a second invocation is a no-op.
            return
        self._running.set()
        ps = self.pubsub
        wait = self.sleep_time
        poll = ps.get_sharded_message if self.sharded_pubsub else ps.get_message
        while self._running.is_set():
            try:
                poll(ignore_subscribe_messages=True, timeout=wait)
            except BaseException as error:
                if self.exception_handler is None:
                    raise
                self.exception_handler(error, ps, self)
        # Loop exited via stop(): release the pub/sub connection.
        ps.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()

1406 

1407 

class Pipeline(Redis):
    """
    Pipelines provide a way to transmit multiple commands to the Redis server
    in one transmission. This is convenient for batch processing, such as
    saving all the values in a list to Redis.

    All commands executed within a pipeline(when running in transactional mode,
    which is the default behavior) are wrapped with MULTI and EXEC
    calls. This guarantees all commands executed in the pipeline will be
    executed atomically.

    Any command raising an exception does *not* halt the execution of
    subsequent commands in the pipeline. Instead, the exception is caught
    and its instance is placed into the response list returned by execute().
    Code iterating over the response list should be able to deal with an
    instance of an exception as a potential value. In general, these will be
    ResponseError exceptions, such as those raised when issuing a command
    on a key of a different datatype.
    """

    # Commands after which the server forgets watched keys; parse_response
    # uses this to reset the local ``watching`` flag.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

1429 

    def __init__(
        self,
        connection_pool: ConnectionPool,
        response_callbacks,
        transaction,
        shard_hint,
    ):
        """
        :param connection_pool: pool a connection is drawn from lazily on
            first use and released back to by ``reset()``.
        :param response_callbacks: per-command reply post-processors.
        :param transaction: when truthy, execute() wraps the buffered
            commands in MULTI/EXEC.
        """
        self.connection_pool = connection_pool
        # Acquired lazily by execute()/immediate_execute_command().
        self.connection: Optional[Connection] = None
        self.response_callbacks = response_callbacks
        self.transaction = transaction
        self.shard_hint = shard_hint
        # True between a successful WATCH and the EXEC/UNWATCH/DISCARD.
        self.watching = False
        # Buffered (args, options) pairs awaiting execute().
        self.command_stack = []
        self.scripts: Set[Script] = set()
        # Set by multi() to force transactional execution.
        self.explicit_transaction = False

1446 

1447 def __enter__(self) -> "Pipeline": 

1448 return self 

1449 

1450 def __exit__(self, exc_type, exc_value, traceback): 

1451 self.reset() 

1452 

1453 def __del__(self): 

1454 try: 

1455 self.reset() 

1456 except Exception: 

1457 pass 

1458 

1459 def __len__(self) -> int: 

1460 return len(self.command_stack) 

1461 

1462 def __bool__(self) -> bool: 

1463 """Pipeline instances should always evaluate to True""" 

1464 return True 

1465 

    def reset(self) -> None:
        """
        Discard buffered commands, clear WATCH state (issuing UNWATCH to
        the server if needed) and release the connection back to the pool.
        """
        self.command_stack = []
        self.scripts = set()
        # make sure to reset the connection state in the event that we were
        # watching something
        if self.watching and self.connection:
            try:
                # call this manually since our unwatch or
                # immediate_execute_command methods can call reset()
                self.connection.send_command("UNWATCH")
                self.connection.read_response()
            except ConnectionError:
                # disconnect will also remove any previous WATCHes
                self.connection.disconnect()
        # clean up the other instance attributes
        self.watching = False
        self.explicit_transaction = False

        # we can safely return the connection to the pool here since we're
        # sure we're no longer WATCHing anything
        if self.connection:
            self.connection_pool.release(self.connection)
            self.connection = None

1489 

1490 def close(self) -> None: 

1491 """Close the pipeline""" 

1492 self.reset() 

1493 

1494 def multi(self) -> None: 

1495 """ 

1496 Start a transactional block of the pipeline after WATCH commands 

1497 are issued. End the transactional block with `execute`. 

1498 """ 

1499 if self.explicit_transaction: 

1500 raise RedisError("Cannot issue nested calls to MULTI") 

1501 if self.command_stack: 

1502 raise RedisError( 

1503 "Commands without an initial WATCH have already been issued" 

1504 ) 

1505 self.explicit_transaction = True 

1506 

1507 def execute_command(self, *args, **kwargs): 

1508 if (self.watching or args[0] == "WATCH") and not self.explicit_transaction: 

1509 return self.immediate_execute_command(*args, **kwargs) 

1510 return self.pipeline_execute_command(*args, **kwargs) 

1511 

1512 def _disconnect_reset_raise_on_watching( 

1513 self, 

1514 conn: AbstractConnection, 

1515 error: Exception, 

1516 ) -> None: 

1517 """ 

1518 Close the connection reset watching state and 

1519 raise an exception if we were watching. 

1520 

1521 The supported exceptions are already checked in the 

1522 retry object so we don't need to do it here. 

1523 

1524 After we disconnect the connection, it will try to reconnect and 

1525 do a health check as part of the send_command logic(on connection level). 

1526 """ 

1527 conn.disconnect() 

1528 

1529 # if we were already watching a variable, the watch is no longer 

1530 # valid since this connection has died. raise a WatchError, which 

1531 # indicates the user should retry this transaction. 

1532 if self.watching: 

1533 self.reset() 

1534 raise WatchError( 

1535 f"A {type(error).__name__} occurred while watching one or more keys" 

1536 ) 

1537 

    def immediate_execute_command(self, *args, **options):
        """
        Execute a command immediately, but don't auto-retry on the supported
        errors for retry if we're already WATCHing a variable.
        Used when issuing WATCH or subsequent commands retrieving their values but before
        MULTI is called.
        """
        command_name = args[0]
        conn = self.connection
        # if this is the first call, we need a connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # Keep it on self so reset() can release it later.
            self.connection = conn

        return conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            # On failure while watching, this raises WatchError instead of
            # retrying (the WATCH is invalid once the connection died).
            lambda error: self._disconnect_reset_raise_on_watching(conn, error),
        )

1558 

1559 def pipeline_execute_command(self, *args, **options) -> "Pipeline": 

1560 """ 

1561 Stage a command to be executed when execute() is next called 

1562 

1563 Returns the current Pipeline object back so commands can be 

1564 chained together, such as: 

1565 

1566 pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') 

1567 

1568 At some other point, you can then run: pipe.execute(), 

1569 which will execute all commands queued in the pipe. 

1570 """ 

1571 self.command_stack.append((args, options)) 

1572 return self 

1573 

    def _execute_transaction(
        self, connection: Connection, commands, raise_on_error
    ) -> List:
        """
        Send the buffered commands wrapped in MULTI/EXEC as one packed
        payload, then parse: the MULTI reply, one queued-ack per command,
        and finally the EXEC reply carrying the real results.

        :raises WatchError: when EXEC returns nil (a watched key changed).
        :raises ResponseError: on reply-count mismatch, or the first
            command error when ``raise_on_error`` is set.
        """
        cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
        # Commands flagged EMPTY_RESPONSE are not sent at all; their
        # canned response is injected further down.
        all_cmds = connection.pack_commands(
            [args for args, options in cmds if EMPTY_RESPONSE not in options]
        )
        connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            self.parse_response(connection, "_")
        except ResponseError as e:
            errors.append((0, e))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    self.parse_response(connection, "_")
                except ResponseError as e:
                    errors.append((i, e))

        # parse the EXEC.
        try:
            response = self.parse_response(connection, "_")
        except ExecAbortError:
            # EXEC aborted: surface the first queuing error if we saw one.
            if errors:
                raise errors[0][1]
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            # Nil EXEC reply: a watched key was modified by another client.
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            # Out of sync with the server; the connection is unusable.
            self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            )

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, commands):
            if not isinstance(r, Exception):
                args, options = cmd
                # Remove keys entry, it needs only for cache.
                options.pop("keys", None)
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
            data.append(r)

        return data

1645 

1646 def _execute_pipeline(self, connection, commands, raise_on_error): 

1647 # build up all commands into a single request to increase network perf 

1648 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1649 connection.send_packed_command(all_cmds) 

1650 

1651 responses = [] 

1652 for args, options in commands: 

1653 try: 

1654 responses.append(self.parse_response(connection, args[0], **options)) 

1655 except ResponseError as e: 

1656 responses.append(e) 

1657 

1658 if raise_on_error: 

1659 self.raise_first_error(commands, responses) 

1660 

1661 return responses 

1662 

1663 def raise_first_error(self, commands, response): 

1664 for i, r in enumerate(response): 

1665 if isinstance(r, ResponseError): 

1666 self.annotate_exception(r, i + 1, commands[i][0]) 

1667 raise r 

1668 

1669 def annotate_exception(self, exception, number, command): 

1670 cmd = " ".join(map(safe_str, command)) 

1671 msg = ( 

1672 f"Command # {number} ({truncate_text(cmd)}) of pipeline " 

1673 f"caused error: {exception.args[0]}" 

1674 ) 

1675 exception.args = (msg,) + exception.args[1:] 

1676 

1677 def parse_response(self, connection, command_name, **options): 

1678 result = Redis.parse_response(self, connection, command_name, **options) 

1679 if command_name in self.UNWATCH_COMMANDS: 

1680 self.watching = False 

1681 elif command_name == "WATCH": 

1682 self.watching = True 

1683 return result 

1684 

    def load_scripts(self):
        """Ensure every registered Script's body is cached server-side
        before the pipeline runs EVALSHA against it."""
        # make sure all scripts that are about to be run on this pipeline exist
        scripts = list(self.scripts)
        immediate = self.immediate_execute_command
        shas = [s.sha for s in scripts]
        # we can't use the normal script_* methods because they would just
        # get buffered in the pipeline.
        exists = immediate("SCRIPT EXISTS", *shas)
        if not all(exists):
            for s, exist in zip(scripts, exists):
                if not exist:
                    # Upload the missing script and record its new SHA.
                    s.sha = immediate("SCRIPT LOAD", s.script)

1697 

1698 def _disconnect_raise_on_watching( 

1699 self, 

1700 conn: AbstractConnection, 

1701 error: Exception, 

1702 ) -> None: 

1703 """ 

1704 Close the connection, raise an exception if we were watching. 

1705 

1706 The supported exceptions are already checked in the 

1707 retry object so we don't need to do it here. 

1708 

1709 After we disconnect the connection, it will try to reconnect and 

1710 do a health check as part of the send_command logic(on connection level). 

1711 """ 

1712 conn.disconnect() 

1713 # if we were watching a variable, the watch is no longer valid 

1714 # since this connection has died. raise a WatchError, which 

1715 # indicates the user should retry this transaction. 

1716 if self.watching: 

1717 raise WatchError( 

1718 f"A {type(error).__name__} occurred while watching one or more keys" 

1719 ) 

1720 

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all the commands in the current pipeline"""
        stack = self.command_stack
        # Nothing buffered and nothing watched: no server round trip needed.
        if not stack and not self.watching:
            return []
        if self.scripts:
            # Make sure every Script object's body is loaded server-side.
            self.load_scripts()
        if self.transaction or self.explicit_transaction:
            execute = self._execute_transaction
        else:
            execute = self._execute_pipeline

        conn = self.connection
        if not conn:
            conn = self.connection_pool.get_connection()
            # assign to self.connection so reset() releases the connection
            # back to the pool after we're done
            self.connection = conn

        try:
            return conn.retry.call_with_retry(
                lambda: execute(conn, stack, raise_on_error),
                lambda error: self._disconnect_raise_on_watching(conn, error),
            )
        finally:
            # in reset() the connection is disconnected before returned to the pool if
            # it is marked for reconnect.
            self.reset()

1749 

1750 def discard(self): 

1751 """ 

1752 Flushes all previously queued commands 

1753 See: https://redis.io/commands/DISCARD 

1754 """ 

1755 self.execute_command("DISCARD") 

1756 

1757 def watch(self, *names): 

1758 """Watches the values at keys ``names``""" 

1759 if self.explicit_transaction: 

1760 raise RedisError("Cannot issue a WATCH after a MULTI") 

1761 return self.execute_command("WATCH", *names) 

1762 

1763 def unwatch(self) -> bool: 

1764 """Unwatches all previously specified keys""" 

1765 return self.watching and self.execute_command("UNWATCH") or True