Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

794 statements  

1import copy 

2import logging 

3import re 

4import threading 

5import time 

6from itertools import chain 

7from typing import ( 

8 TYPE_CHECKING, 

9 Any, 

10 Callable, 

11 Dict, 

12 List, 

13 Literal, 

14 Mapping, 

15 Optional, 

16 Set, 

17 Type, 

18 Union, 

19) 

20 

21from redis._defaults import ( 

22 DEFAULT_RETRY_BASE, 

23 DEFAULT_RETRY_CAP, 

24 DEFAULT_RETRY_COUNT, 

25 DEFAULT_SOCKET_CONNECT_TIMEOUT, 

26 DEFAULT_SOCKET_READ_SIZE, 

27 DEFAULT_SOCKET_TIMEOUT, 

28) 

29from redis._parsers.encoders import Encoder 

30from redis._parsers.helpers import bool_ok, get_response_callbacks 

31from redis.backoff import ExponentialWithJitterBackoff 

32from redis.cache import CacheConfig, CacheInterface 

33from redis.commands import ( 

34 CoreCommands, 

35 RedisModuleCommands, 

36 SentinelCommands, 

37 list_or_args, 

38) 

39from redis.commands.core import Script 

40from redis.commands.helpers import parse_pubsub_subscriptions, pubsub_subscription_args 

41from redis.connection import ( 

42 AbstractConnection, 

43 Connection, 

44 ConnectionPool, 

45 SSLConnection, 

46 UnixDomainSocketConnection, 

47) 

48from redis.credentials import CredentialProvider 

49from redis.driver_info import DriverInfo, resolve_driver_info 

50from redis.event import ( 

51 AfterPooledConnectionsInstantiationEvent, 

52 AfterPubSubConnectionInstantiationEvent, 

53 AfterSingleConnectionInstantiationEvent, 

54 ClientType, 

55 EventDispatcher, 

56) 

57from redis.exceptions import ( 

58 ConnectionError, 

59 ExecAbortError, 

60 PubSubError, 

61 RedisError, 

62 ResponseError, 

63 WatchError, 

64) 

65from redis.lock import Lock 

66from redis.maint_notifications import ( 

67 MaintNotificationsConfig, 

68 OSSMaintNotificationsHandler, 

69) 

70from redis.observability.attributes import PubSubDirection 

71from redis.observability.recorder import ( 

72 record_error_count, 

73 record_operation_duration, 

74 record_pubsub_message, 

75) 

76from redis.retry import Retry 

77from redis.typing import ChannelT, PubSubHandler, Subscription 

78from redis.utils import ( 

79 SENTINEL, 

80 _set_info_logger, 

81 check_protocol_version, 

82 deprecated_args, 

83 safe_str, 

84 str_if_bytes, 

85 truncate_text, 

86) 

87 

88if TYPE_CHECKING: 

89 import ssl 

90 

91 import OpenSSL 

92 

93 from redis.keyspace_notifications import KeyspaceNotifications 

94 

# Empty bytes literal.
# NOTE(review): not referenced in the visible part of this module - confirm
# where it is used before changing it.
SYM_EMPTY = b""
# Option key read by Redis.parse_response: when a command's options contain
# this key and the server answers with a ResponseError, the option's value is
# returned instead of raising.
EMPTY_RESPONSE = "EMPTY_RESPONSE"

# some responses (ie. dump) are binary, and just meant to never be decoded
# (Redis.parse_response reads the reply with disable_decoding=True when this
# key is present in the command's options)
NEVER_DECODE = "NEVER_DECODE"


# Module-level logger used by the debug helpers below.
logger = logging.getLogger(__name__)

103 

104 

def is_debug_log_enabled():
    """Tell whether the module logger would currently emit DEBUG records."""
    debug_active = logger.isEnabledFor(logging.DEBUG)
    return debug_active

107 

108 

def add_debug_log_for_operation_failure(connection: "AbstractConnection"):
    """
    Emit a DEBUG record describing the connection an operation failed on.

    A falsy ``connection`` is tolerated: the details part of the message then
    reads ``no connection``.
    """
    if connection:
        details = connection.extract_connection_details()
    else:
        details = "no connection"
    logger.debug(
        f"Operation failed, with connection: {connection}, details: {details}",
    )

114 

115 

class CaseInsensitiveDict(dict):
    """
    Case insensitive dict implementation. Assumes string keys only.

    Every key is upper-cased before it is stored or looked up, so ``"get"``,
    ``"Get"`` and ``"GET"`` all address the same entry. Non-string keys are
    not supported (``.upper()`` is called on every key).
    """

    def __init__(self, data: Dict[str, str]) -> None:
        """Populate the dict from ``data``, normalising each key."""
        for k, v in data.items():
            # __setitem__ performs the upper-casing.
            self[k] = v

    def __contains__(self, k):
        return super().__contains__(k.upper())

    def __delitem__(self, k):
        super().__delitem__(k.upper())

    def __getitem__(self, k):
        return super().__getitem__(k.upper())

    def get(self, k, default=None):
        return super().get(k.upper(), default)

    def __setitem__(self, k, v):
        super().__setitem__(k.upper(), v)

    def pop(self, k, *default):
        """
        Remove ``k`` (case-insensitively) and return its value.

        Mirrors ``dict.pop``: raises ``KeyError`` when the key is missing and
        no default is supplied. Without this override the inherited ``pop``
        would look up the key verbatim and miss entries stored upper-cased.
        """
        return super().pop(k.upper(), *default)

    def setdefault(self, k, default=None):
        """
        Return the value for ``k`` (case-insensitively), inserting ``default``
        under the normalised key when it is absent.
        """
        return super().setdefault(k.upper(), default)

    def update(self, data):
        """Merge ``data`` (a mapping with string keys) into this dict."""
        data = CaseInsensitiveDict(data)
        super().update(data)

141 

142 

class AbstractRedis:
    """Empty base class; it defines no attributes or behaviour of its own."""

145 

146 

147class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): 

148 """ 

149 Implementation of the Redis protocol. 

150 

151 This abstract class provides a Python interface to all Redis commands 

152 and an implementation of the Redis protocol. 

153 

154 Pipelines derive from this, implementing how 

155 the commands are sent and received to the Redis server. Based on 

156 configuration, an instance will either use a ConnectionPool, or 

157 Connection object to talk to redis. 

158 

159 It is not safe to pass PubSub or Pipeline objects between threads. 

160 """ 

161 

162 # Type discrimination marker for @overload self-type pattern 

163 _is_async_client: Literal[False] = False 

164 

@classmethod
def from_url(cls, url: str, **kwargs) -> "Redis":
    """
    Build a Redis client from a connection URL.

    For example::

        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[username@]/path/to/socket.sock?db=0[&password=password]

    Three URL schemes are supported:

    - `redis://` creates a TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/redis>
    - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
      <https://www.iana.org/assignments/uri-schemes/prov/rediss>
    - ``unix://``: creates a Unix Domain Socket connection.

    The username, password, hostname, path and all querystring values
    are passed through urllib.parse.unquote in order to replace any
    percent-encoded values with their corresponding characters.

    The database number is taken from the first of these that is present:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

    If none of these options are specified, the default db=0 is used.

    All querystring options are cast to their appropriate Python types.
    Boolean arguments can be specified with string values "True"/"False"
    or "Yes"/"No". Values that cannot be properly cast cause a
    ``ValueError`` to be raised. Once parsed, the querystring arguments
    and keyword arguments are passed to the ``ConnectionPool``'s
    class initializer. In the case of conflicting arguments, querystring
    arguments always win.

    ``single_connection_client`` is consumed here and handed to the client
    constructor rather than to the pool.
    """
    use_single_connection = kwargs.pop("single_connection_client", False)
    client = cls(
        connection_pool=ConnectionPool.from_url(url, **kwargs),
        single_connection_client=use_single_connection,
    )
    # The pool was created on the caller's behalf, so the client closes it.
    client.auto_close_connection_pool = True
    return client

215 

@classmethod
def from_pool(
    cls: Type["Redis"],
    connection_pool: ConnectionPool,
) -> "Redis":
    """
    Wrap an existing connection pool in a new Redis client.

    Ownership of the pool passes to the returned client: closing the
    client also closes the pool.
    """
    owner = cls(connection_pool=connection_pool)
    owner.auto_close_connection_pool = True
    return owner

231 

@deprecated_args(
    args_to_warn=["retry_on_timeout"],
    reason="TimeoutError is included by default.",
    version="6.0.0",
)
@deprecated_args(
    args_to_warn=["lib_name", "lib_version"],
    reason="Use 'driver_info' parameter instead. "
    "lib_name and lib_version will be removed in a future version.",
)
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    socket_timeout: float | None = DEFAULT_SOCKET_TIMEOUT,
    socket_connect_timeout: float | None = DEFAULT_SOCKET_CONNECT_TIMEOUT,
    socket_read_size: int = DEFAULT_SOCKET_READ_SIZE,
    socket_keepalive: bool | None = True,
    socket_keepalive_options: Mapping[int, int | bytes] | object | None = SENTINEL,
    connection_pool: ConnectionPool | None = None,
    unix_socket_path: str | None = None,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    retry_on_timeout: bool = False,
    retry: Retry = Retry(
        backoff=ExponentialWithJitterBackoff(
            base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP
        ),
        retries=DEFAULT_RETRY_COUNT,
    ),
    retry_on_error: List[Type[Exception]] | None = None,
    ssl: bool = False,
    ssl_keyfile: str | None = None,
    ssl_certfile: str | None = None,
    ssl_cert_reqs: "str | ssl.VerifyMode" = "required",
    ssl_include_verify_flags: List["ssl.VerifyFlags"] | None = None,
    ssl_exclude_verify_flags: List["ssl.VerifyFlags"] | None = None,
    ssl_ca_certs: str | None = None,
    ssl_ca_path: str | None = None,
    ssl_ca_data: str | None = None,
    ssl_check_hostname: bool = True,
    ssl_password: str | None = None,
    ssl_validate_ocsp: bool = False,
    ssl_validate_ocsp_stapled: bool = False,
    ssl_ocsp_context: "OpenSSL.SSL.Context | None" = None,
    ssl_ocsp_expected_cert: str | None = None,
    ssl_min_version: "ssl.TLSVersion | None" = None,
    ssl_ciphers: str | None = None,
    max_connections: int | None = None,
    single_connection_client: bool = False,
    health_check_interval: int = 0,
    client_name: str | None = None,
    lib_name: str | object | None = SENTINEL,
    lib_version: str | object | None = SENTINEL,
    driver_info: DriverInfo | object | None = SENTINEL,
    username: str | None = None,
    redis_connect_func: Callable[[], None] | None = None,
    credential_provider: CredentialProvider | None = None,
    protocol: int | None = None,
    legacy_responses: bool = True,
    cache: CacheInterface | None = None,
    cache_config: CacheConfig | None = None,
    event_dispatcher: EventDispatcher | None = None,
    maint_notifications_config: MaintNotificationsConfig | None = None,
    oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    | None = None,
) -> None:
    """
    Initialize a new Redis client.

    When no ``connection_pool`` is given, a ``ConnectionPool`` is built from
    the connection-related arguments (Unix socket, plain TCP or TLS) and is
    owned - and later closed - by this client. When a pool is given, it is
    used as is and is not closed by this client.

    To specify a retry policy for specific errors, you have two options:

    1. Set the `retry_on_error` to a list of the error/s to retry on, and
    you can also set `retry` to a valid `Retry` object(in case the default
    one is not appropriate) - with this approach the retries will be triggered
    on the default errors specified in the Retry object enriched with the
    errors specified in `retry_on_error`.

    2. Define a `Retry` object with configured 'supported_errors' and set
    it to the `retry` parameter - with this approach you completely redefine
    the errors on which retries will happen.

    `retry_on_timeout` is deprecated - please include the TimeoutError
    either in the Retry object or in the `retry_on_error` list.

    When 'connection_pool' is provided - the retry configuration of the
    provided pool will be used.

    Args:

    socket_keepalive:
        if `True`, TCP keepalive is enabled for TCP socket connections.
        Argument is ignored when connection_pool is provided.
    socket_keepalive_options:
        mapping of TCP keepalive socket option constants to values, for
        example `{socket.TCP_KEEPIDLE: 30}`. If left unspecified, redis-py
        uses TCP keepalive defaults when `socket_keepalive` is enabled:
        idle 30 seconds, interval 5 seconds, and 3 probes. Platform-specific
        options that are not available are skipped. Pass `None` or `{}` to
        avoid setting additional TCP keepalive options. Argument is ignored
        when connection_pool is provided.
    single_connection_client:
        if `True`, connection pool is not used. In that case `Redis`
        instance use is not thread safe.
    decode_responses:
        if `True`, the response will be decoded to utf-8.
        Argument is ignored when connection_pool is provided.
    driver_info:
        Optional DriverInfo object to identify upstream libraries.
        If provided, lib_name and lib_version are ignored.
        If not provided, a DriverInfo will be created from lib_name and lib_version.
        Explicit None disables CLIENT SETINFO.
        Argument is ignored when connection_pool is provided.
    lib_name:
        **Deprecated.** Use driver_info instead. Library name for CLIENT SETINFO.
    lib_version:
        **Deprecated.** Use driver_info instead. Library version for CLIENT SETINFO.
    maint_notifications_config:
        configures the pool to support maintenance notifications - see
        `redis.maint_notifications.MaintNotificationsConfig` for details.
        Only supported with RESP3
        If not provided and protocol is RESP3, the maintenance notifications
        will be enabled by default (logic is included in the connection pool
        initialization).
        Argument is ignored when connection_pool is provided.
    oss_cluster_maint_notifications_handler:
        handler for OSS cluster notifications - see
        `redis.maint_notifications.OSSMaintNotificationsHandler` for details.
        Only supported with RESP3
        Argument is ignored when connection_pool is provided.

    Raises:
        RedisError: if maintenance notifications are enabled together with a
            Unix domain socket or with a protocol other than RESP3, or if
            client caching is requested on a pool that does not use RESP3.
    """
    # NOTE(review): `retry_on_timeout` is accepted for backwards compatibility
    # but is not read anywhere in this body.
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    if not connection_pool:
        if not retry_on_error:
            retry_on_error = []

        # Handle driver_info: if provided, use it; otherwise create from lib_name/lib_version.
        computed_driver_info = resolve_driver_info(
            driver_info, lib_name, lib_version
        )

        # Arguments shared by every connection type.
        kwargs = {
            "db": db,
            "username": username,
            "password": password,
            "socket_timeout": socket_timeout,
            "socket_read_size": socket_read_size,
            "encoding": encoding,
            "encoding_errors": encoding_errors,
            "decode_responses": decode_responses,
            "retry_on_error": retry_on_error,
            # Deep-copied so the shared default Retry instance (created once,
            # at function definition time) is never handed to a pool directly.
            "retry": copy.deepcopy(retry),
            "max_connections": max_connections,
            "health_check_interval": health_check_interval,
            "client_name": client_name,
            "driver_info": computed_driver_info,
            "redis_connect_func": redis_connect_func,
            "credential_provider": credential_provider,
            "protocol": protocol,
            "legacy_responses": legacy_responses,
        }
        # based on input, setup appropriate connection args
        if unix_socket_path is not None:
            if (
                maint_notifications_config
                and maint_notifications_config.enabled is True
            ):
                raise RedisError(
                    "Maintenance notifications are not supported with Unix "
                    "domain socket connections"
                )
            kwargs.update(
                {
                    "path": unix_socket_path,
                    "connection_class": UnixDomainSocketConnection,
                    # Explicitly disabled for Unix domain sockets.
                    "maint_notifications_config": MaintNotificationsConfig(
                        enabled=False
                    ),
                }
            )
        else:
            # TCP specific options
            kwargs.update(
                {
                    "host": host,
                    "port": port,
                    "socket_connect_timeout": socket_connect_timeout,
                    "socket_keepalive": socket_keepalive,
                    "socket_keepalive_options": socket_keepalive_options,
                }
            )

            if ssl:
                kwargs.update(
                    {
                        "connection_class": SSLConnection,
                        "ssl_keyfile": ssl_keyfile,
                        "ssl_certfile": ssl_certfile,
                        "ssl_cert_reqs": ssl_cert_reqs,
                        "ssl_include_verify_flags": ssl_include_verify_flags,
                        "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                        "ssl_ca_certs": ssl_ca_certs,
                        "ssl_ca_data": ssl_ca_data,
                        "ssl_check_hostname": ssl_check_hostname,
                        "ssl_password": ssl_password,
                        "ssl_ca_path": ssl_ca_path,
                        "ssl_validate_ocsp_stapled": ssl_validate_ocsp_stapled,
                        "ssl_validate_ocsp": ssl_validate_ocsp,
                        "ssl_ocsp_context": ssl_ocsp_context,
                        "ssl_ocsp_expected_cert": ssl_ocsp_expected_cert,
                        "ssl_min_version": ssl_min_version,
                        "ssl_ciphers": ssl_ciphers,
                    }
                )
            # Cache settings are only forwarded when the requested protocol
            # passes the RESP3 check; the opposite case is rejected further
            # down, once the pool exists.
            if (cache_config or cache) and check_protocol_version(protocol, 3):
                kwargs.update(
                    {
                        "cache": cache,
                        "cache_config": cache_config,
                    }
                )
            maint_notifications_enabled = (
                maint_notifications_config and maint_notifications_config.enabled
            )
            if maint_notifications_enabled and not check_protocol_version(
                protocol, 3
            ):
                raise RedisError(
                    "Maintenance notifications handlers on connection are only supported with RESP version 3"
                )
            if maint_notifications_config:
                kwargs.update(
                    {
                        "maint_notifications_config": maint_notifications_config,
                    }
                )
            if oss_cluster_maint_notifications_handler:
                kwargs.update(
                    {
                        "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
                    }
                )
        connection_pool = ConnectionPool(**kwargs)
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )
        # The pool was created here, so close() also disconnects it.
        self.auto_close_connection_pool = True
    else:
        # Caller-supplied pool: close() leaves it connected.
        self.auto_close_connection_pool = False
        self._event_dispatcher.dispatch(
            AfterPooledConnectionsInstantiationEvent(
                [connection_pool], ClientType.SYNC, credential_provider
            )
        )

    self.connection_pool = connection_pool

    # Checked against the pool's protocol so that a caller-supplied pool is
    # validated as well.
    if (cache_config or cache) and not check_protocol_version(
        self.connection_pool.get_protocol(), 3
    ):
        raise RedisError("Client caching is only supported with RESP version 3")

    self.single_connection_lock = threading.RLock()
    self.connection = None
    self._single_connection_client = single_connection_client
    if self._single_connection_client:
        # Reserve one connection for the lifetime of this client.
        self.connection = self.connection_pool.get_connection()
        self._event_dispatcher.dispatch(
            AfterSingleConnectionInstantiationEvent(
                self.connection, ClientType.SYNC, self.single_connection_lock
            )
        )

    # Response callbacks are selected from the pool's settings, not from this
    # constructor's arguments, so they also match a caller-supplied pool.
    connection_kwargs = self.connection_pool.connection_kwargs
    self.response_callbacks = CaseInsensitiveDict(
        get_response_callbacks(
            user_protocol=connection_kwargs.get("protocol"),
            legacy_responses=connection_kwargs.get("legacy_responses", True),
        )
    )

520 

def __repr__(self) -> str:
    """Return ``<module.ClassName(<pool repr>)>`` for this client."""
    klass = type(self)
    return f"<{klass.__module__}.{klass.__name__}({self.connection_pool!r})>"

526 

def get_encoder(self) -> "Encoder":
    """Return the encoder provided by this client's connection pool."""
    pool = self.connection_pool
    return pool.get_encoder()

530 

def get_connection_kwargs(self) -> Dict:
    """Return the connection pool's keyword-argument dict (not a copy)."""
    pool = self.connection_pool
    return pool.connection_kwargs

534 

def get_retry(self) -> Optional[Retry]:
    """Return the ``retry`` entry of the connection kwargs, or ``None``."""
    connection_kwargs = self.get_connection_kwargs()
    return connection_kwargs.get("retry")

537 

def set_retry(self, retry: Retry) -> None:
    """
    Install ``retry`` as the retry policy.

    The connection kwargs are updated first, then the pool is told via its
    own ``set_retry``.
    """
    connection_kwargs = self.get_connection_kwargs()
    connection_kwargs.update(retry=retry)
    self.connection_pool.set_retry(retry)

541 

def set_response_callback(self, command: str, callback: Callable) -> None:
    """Register ``callback`` as the response handler for ``command``."""
    callbacks = self.response_callbacks
    callbacks[command] = callback

545 

def load_external_module(self, funcname, func) -> None:
    """
    Attach an externally defined redis module (and its namespace) to this
    client instance.

    funcname - A string containing the name of the function to create
    func - The function, being added to this class.

    ex: Assume that one has a custom redis module named foomod that
    creates command named 'foo.dothing' and 'foo.anotherthing' in redis.
    To load function functions into this namespace:

    from redis import Redis
    from foomodule import F
    r = Redis()
    r.load_external_module("foo", F)
    r.foo().dothing('your', 'arguments')

    For a concrete example see the reimport of the redisjson module in
    tests/test_connection.py::test_loading_external_modules
    """
    attribute_name, attribute_value = funcname, func
    setattr(self, attribute_name, attribute_value)

568 

def pipeline(self, transaction=True, shard_hint=None) -> "Pipeline":
    """
    Create a pipeline that queues commands for later execution.

    ``transaction`` indicates whether all commands should be executed
    atomically. Besides atomicity, pipelines cut down the number of
    round trips between the client and the server.
    """
    pool = self.connection_pool
    callbacks = self.response_callbacks
    return Pipeline(pool, callbacks, transaction, shard_hint)

580 

def transaction(
    self, func: Callable[["Pipeline"], None], *watches, **kwargs
) -> Union[List[Any], Any, None]:
    """
    Run ``func`` as a transaction, retrying while watched keys change.

    ``func`` receives a single argument, a Pipeline object. All keys named
    in ``watches`` are watched before each attempt; a ``WatchError`` makes
    the whole attempt start over.

    Keyword options (removed from ``kwargs``):
        shard_hint: forwarded to ``pipeline``.
        value_from_callable: when true, return what ``func`` returned
            instead of the result of ``execute``.
        watch_delay: seconds to sleep before retrying after a WatchError;
            ``None`` or a non-positive value means no sleep.
    """
    shard_hint = kwargs.pop("shard_hint", None)
    value_from_callable = kwargs.pop("value_from_callable", False)
    watch_delay = kwargs.pop("watch_delay", None)
    with self.pipeline(True, shard_hint) as pipe:
        while True:
            try:
                if watches:
                    pipe.watch(*watches)
                callable_result = func(pipe)
                execute_result = pipe.execute()
            except WatchError:
                # A watched key changed underneath us: optionally back off,
                # then start the attempt again.
                if watch_delay is not None and watch_delay > 0:
                    time.sleep(watch_delay)
                continue
            if value_from_callable:
                return callable_result
            return execute_result

604 

def lock(
    self,
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    lock_class: Union[None, Any] = None,
    thread_local: bool = True,
    raise_on_release_error: bool = True,
):
    """
    Create a Lock object for key ``name`` that behaves like
    threading.Lock.

    ``timeout`` is the maximum life of the lock. When left unset the lock
    stays held until release() is called.

    ``sleep`` is how long each loop iteration sleeps while the lock is in
    blocking mode and another client holds it.

    ``blocking`` selects whether ``acquire`` waits for the lock or fails
    immediately (returning False without acquiring). Defaults to True and
    can be overridden per call by passing ``blocking`` to ``acquire``.

    ``blocking_timeout`` is the maximum number of seconds (float or
    integer) to spend trying to acquire the lock. ``None`` means keep
    trying forever.

    ``lock_class`` forces a particular lock implementation. As of redis-py
    3.0 the only implementation shipped is ``Lock`` (a Lua-based lock), so
    this is only needed with a custom lock class.

    ``thread_local`` selects whether the lock token lives in thread-local
    storage. By default it does, so a thread only ever sees its own token.
    Consider the following timeline:

        time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                 thread-1 sets the token to "abc"
        time: 1, thread-2 blocks trying to acquire `my-lock` using the
                 Lock instance.
        time: 5, thread-1 has not yet completed. redis expires the lock
                 key.
        time: 5, thread-2 acquired `my-lock` now that it's available.
                 thread-2 sets the token to "xyz"
        time: 6, thread-1 finishes its work and calls release(). if the
                 token is *not* stored in thread local storage, then
                 thread-1 would see the token value as "xyz" and would be
                 able to successfully release the thread-2's lock.

    ``raise_on_release_error`` selects whether leaving the context manager
    raises when the lock is no longer owned. True (the default) raises;
    False logs a warning and suppresses the exception.

    Thread-local storage sometimes has to be disabled, for instance when
    one thread acquires the lock and hands the instance to a worker thread
    that releases it later: with thread-local storage the worker would not
    see the token set by the acquiring thread. Such cases are assumed to
    be uncommon, hence the default.
    """
    factory = Lock if lock_class is None else lock_class
    lock_options = {
        "timeout": timeout,
        "sleep": sleep,
        "blocking": blocking,
        "blocking_timeout": blocking_timeout,
        "thread_local": thread_local,
        "raise_on_release_error": raise_on_release_error,
    }
    return factory(self, name, **lock_options)

685 

def pubsub(self, **kwargs):
    """
    Create a Publish/Subscribe object bound to this client's pool.

    The returned object is used to subscribe to channels and to listen for
    the messages published to them. It shares this client's event
    dispatcher; extra keyword arguments are forwarded to ``PubSub``.
    """
    pool = self.connection_pool
    dispatcher = self._event_dispatcher
    return PubSub(pool, event_dispatcher=dispatcher, **kwargs)

695 

def keyspace_notifications(
    self,
    key_prefix: Union[str, bytes, None] = None,
    ignore_subscribe_messages: bool = True,
) -> "KeyspaceNotifications":
    """
    Create a :class:`~redis.keyspace_notifications.KeyspaceNotifications`
    object for subscribing to keyspace and keyevent notifications.

    Note: Keyspace notifications must be enabled on the Redis server via
    the ``notify-keyspace-events`` configuration option.

    Args:
        key_prefix: Optional prefix to filter and strip from keys in
                    notifications.
        ignore_subscribe_messages: If True, subscribe/unsubscribe
                                   confirmations are not returned by
                                   get_message/listen.
    """
    # Imported at call time rather than at module import time.
    from redis.keyspace_notifications import KeyspaceNotifications

    options = {
        "key_prefix": key_prefix,
        "ignore_subscribe_messages": ignore_subscribe_messages,
    }
    return KeyspaceNotifications(self, **options)

722 

def monitor(self):
    """Create a ``Monitor`` that uses this client's connection pool."""
    pool = self.connection_pool
    return Monitor(pool)

725 

def client(self):
    """
    Create a single-connection client of the same class that shares this
    client's connection pool.
    """
    constructor = self.__class__
    shared_pool = self.connection_pool
    return constructor(
        connection_pool=shared_pool,
        single_connection_client=True,
    )

731 

def __enter__(self):
    """Enter the ``with`` block; the client itself is the managed object."""
    return self

734 

def __exit__(self, exc_type, exc_value, traceback):
    """Leave the ``with`` block by closing the client; exceptions propagate."""
    self.close()

737 

def __del__(self):
    # Best-effort cleanup when the client is garbage collected: any
    # exception raised by close() is deliberately swallowed here.
    try:
        self.close()
    except Exception:
        pass

743 

def close(self) -> None:
    """
    Release the dedicated connection (if any) and, when this client owns
    its pool, disconnect the pool.
    """
    # The attribute is missing when the constructor crashed before setting
    # it; in that case there is nothing to clean up.
    try:
        held = self.connection
    except AttributeError:
        return

    if held:
        self.connection = None
        self.connection_pool.release(held)

    if not self.auto_close_connection_pool:
        return
    self.connection_pool.disconnect()

758 

def _send_command_parse_response(self, conn, command_name, *args, **options):
    """
    Write the command described by ``args`` to ``conn``, then read and
    parse its reply as ``command_name``.
    """
    conn.send_command(*args, **options)
    parsed = self.parse_response(conn, command_name, **options)
    return parsed

765 

def _close_connection(
    self,
    conn,
    error: Optional[Exception] = None,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Close the connection before retrying.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).

    Args:
        conn: connection to disconnect.
        error: the exception that triggered the close, if any.
        failure_count: number of failures so far, as reported by the retry
            object.
        start_time: ``time.monotonic()`` value taken when the operation
            started; used to compute the recorded duration.
        command_name: name of the command, recorded with the duration.

    The operation duration is recorded only when ``error`` is set, both
    ``failure_count`` and ``start_time`` are known, and the failure count
    has not exceeded the connection's retry limit. The connection is
    disconnected in every case.
    """
    # failure_count and start_time default to None; comparing or subtracting
    # None would raise TypeError and skip the disconnect, so the metric is
    # simply not recorded when either is missing.
    should_record = (
        error
        and failure_count is not None
        and start_time is not None
        and failure_count <= conn.retry.get_retries()
    )
    if should_record:
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )

    conn.disconnect()

795 

796 # COMMAND EXECUTION AND PROTOCOL PARSING 

def execute_command(self, *args, **options):
    """Public entry point; forwards unchanged to ``_execute_command``."""
    return self._execute_command(*args, **options)

799 

def _execute_command(self, *args, **options):
    """
    Execute a command and return a parsed response

    ``args[0]`` is the command name. The command runs on the client's
    dedicated connection when there is one, otherwise on a connection taken
    from the pool, and is retried through the connection's retry object.
    Duration and error metrics are recorded along the way.

    Any exception raised while executing the command is recorded and
    re-raised unchanged.
    """
    pool = self.connection_pool
    command_name = args[0]
    conn = self.connection or pool.get_connection()

    # Start timing for observability
    start_time = time.monotonic()
    # Track actual retry attempts for error reporting
    # (a one-element list so the nested callback can update it)
    actual_retry_attempts = [0]

    def failure_callback(error, failure_count):
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        actual_retry_attempts[0] = failure_count
        self._close_connection(conn, error, failure_count, start_time, command_name)

    if self._single_connection_client:
        self.single_connection_lock.acquire()
    try:
        result = conn.retry.call_with_retry(
            lambda: self._send_command_parse_response(
                conn, command_name, *args, **options
            ),
            failure_callback,
            with_failure_count=True,
        )

        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )
        return result
    except Exception as e:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=e,
            retry_attempts=actual_retry_attempts[0],
            is_internal=False,
        )
        raise

    finally:
        try:
            if conn and conn.should_reconnect():
                self._close_connection(conn)
                conn.connect()
        finally:
            # Reconnecting can itself fail. The lock and the pooled
            # connection must be given back regardless, otherwise other
            # threads block on the lock forever and the pool leaks a
            # connection.
            if self._single_connection_client:
                self.single_connection_lock.release()
            if not self.connection:
                pool.release(conn)

856 

def parse_response(self, connection, command_name, **options):
    """
    Read one reply from ``connection`` and post-process it.

    If ``NEVER_DECODE`` is among the options the reply is read with
    decoding disabled. If the server answers with a ``ResponseError`` and
    ``EMPTY_RESPONSE`` is among the options, that option's value is
    returned instead of raising. When a response callback is registered
    for ``command_name`` it is applied to the reply with the remaining
    options; otherwise the raw reply is returned.
    """
    skip_decoding = NEVER_DECODE in options
    try:
        if skip_decoding:
            response = connection.read_response(disable_decoding=True)
        else:
            response = connection.read_response()
    except ResponseError:
        if EMPTY_RESPONSE in options:
            return options[EMPTY_RESPONSE]
        raise

    # Strip the control options so they are not passed to the callback.
    if skip_decoding:
        options.pop(NEVER_DECODE)
    options.pop(EMPTY_RESPONSE, None)

    # Remove keys entry, it needs only for cache.
    options.pop("keys", None)

    if command_name not in self.response_callbacks:
        return response
    handler = self.response_callbacks[command_name]
    return handler(response, **options)

879 

def get_cache(self) -> Optional[CacheInterface]:
    """Return the ``cache`` attribute of this client's connection pool."""
    pool = self.connection_pool
    return pool.cache

882 

883 

# Alias: ``StrictRedis`` is bound to the very same class object as ``Redis``.
# NOTE(review): presumably retained for backward compatibility -- confirm.
StrictRedis = Redis

885 

886 

class Monitor:
    """
    Monitor is useful for handling the MONITOR command to the redis server.
    next_command() method returns one command from monitor
    listen() method yields commands from monitor.

    The connection is taken from ``connection_pool`` when the object is
    constructed and handed back when the ``with`` block exits.
    """

    # "[<db> <client info>] <quoted command parts>"
    monitor_re = re.compile(r"\[(\d+) (.*?)\] (.*)")
    # one double-quoted argument; the closing quote must not be escaped
    command_re = re.compile(r'"(.*?)(?<!\\)"')

    def __init__(self, connection_pool):
        """Remember the pool and check out the connection used for MONITOR."""
        self.connection_pool = connection_pool
        self.connection = self.connection_pool.get_connection()

    def __enter__(self):
        """Send MONITOR and return this object once the server accepted it."""
        self._start_monitor()
        return self

    def __exit__(self, *args):
        """Disconnect the connection and return it to the pool."""
        try:
            self.connection.disconnect()
        finally:
            # Give the connection back even when disconnect() raises,
            # otherwise the pool would lose it for good.
            self.connection_pool.release(self.connection)

    def next_command(self):
        """
        Parse the response from a monitor command.

        Returns ``None`` when the connection yields ``None``; otherwise a
        dict with the keys ``time`` (float), ``db`` (int),
        ``client_address``, ``client_port``, ``client_type`` and
        ``command`` (the arguments joined by single spaces).

        Raises ``RedisError`` when the line does not have the
        ``[db client] "arg" ...`` layout.
        """
        response = self.connection.read_response()

        if response is None:
            return None

        if isinstance(response, bytes):
            response = self.connection.encoder.decode(response, force=True)

        command_time, command_data = response.split(" ", 1)
        m = self.monitor_re.match(command_data)
        if m is None:
            # Report the offending line instead of failing with an opaque
            # AttributeError on ``None.groups()``.
            raise RedisError(f"Unexpected MONITOR output: {response!r}")
        db_id, client_info, command = m.groups()
        command = " ".join(self.command_re.findall(command))
        # Redis escapes double quotes because each piece of the command
        # string is surrounded by double quotes. We don't have that
        # requirement so remove the escaping and leave the quote.
        command = command.replace('\\"', '"')

        if client_info == "lua":
            client_address = "lua"
            client_port = ""
            client_type = "lua"
        elif client_info.startswith("unix"):
            client_address = "unix"
            # skip the leading "unix:" prefix
            client_port = client_info[5:]
            client_type = "unix"
        elif client_info == "":
            client_address = ""
            client_port = ""
            client_type = "unknown"
        else:
            # use rsplit as ipv6 addresses contain colons
            client_address, client_port = client_info.rsplit(":", 1)
            client_type = "tcp"
        return {
            "time": float(command_time),
            "db": int(db_id),
            "client_address": client_address,
            "client_port": client_port,
            "client_type": client_type,
            "command": command,
        }

    def listen(self):
        """Listen for commands coming to the server."""
        while True:
            yield self.next_command()

    def _start_monitor(self):
        """Send MONITOR and raise ``RedisError`` unless the reply is OK."""
        self.connection.send_command("MONITOR")
        # check that monitor returns 'OK', but don't return it to user
        response = self.connection.read_response()

        if not bool_ok(response):
            raise RedisError(f"MONITOR failed: {response}")

966 

967 

class PubSub:
    """
    PubSub provides publish, subscribe and listen support to Redis channels.

    After subscribing to one or more channels, the listen() method will block
    until a message arrives on one of the subscribed channels. That message
    will be returned and it's safe to start listening again.
    """

    PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
    UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
    HEALTH_CHECK_MESSAGE = "redis-py-health-check"

    def __init__(
        self,
        connection_pool,
        shard_hint=None,
        ignore_subscribe_messages: bool = False,
        encoder: Optional["Encoder"] = None,
        push_handler_func: Union[None, Callable[[str], None]] = None,
        event_dispatcher: Optional["EventDispatcher"] = None,
    ):
        """
        Store the configuration and start with an empty subscription state.

        No connection is checked out here; that happens lazily in
        ``execute_command``. When ``encoder`` is omitted it is obtained from
        ``connection_pool.get_encoder()``; when ``event_dispatcher`` is
        omitted a fresh ``EventDispatcher`` is created.
        """
        self.connection_pool = connection_pool
        self.shard_hint = shard_hint
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.connection = None
        self.subscribed_event = threading.Event()
        # we need to know the encoding options for this connection in order
        # to lookup channel and pattern names for callback handlers.
        self.encoder = encoder
        self.push_handler_func = push_handler_func
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

        self._lock = threading.RLock()
        if self.encoder is None:
            self.encoder = self.connection_pool.get_encoder()
        # Pre-compute both shapes a health-check reply can take so that
        # is_health_check_response() is a plain membership test.
        self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
        if self.encoder.decode_responses:
            self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
        else:
            self.health_check_response = [b"pong", self.health_check_response_b]
        if self.push_handler_func is None:
            _set_info_logger()
        self.reset()

    def __enter__(self) -> "PubSub":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.reset()

    def __del__(self) -> None:
        try:
            # if this object went out of scope prior to shutting down
            # subscriptions, close the connection manually before
            # returning it to the connection pool
            self.reset()
        except Exception:
            # best effort only: never raise from a finalizer
            pass

    def reset(self) -> None:
        """
        Drop the connection (if any) and forget every subscription.

        The connection is disconnected, the reconnect callback is removed and
        the connection is released to the pool. All channel/pattern/shard
        bookkeeping and the health-check counter are cleared.
        """
        if self.connection:
            self.connection.disconnect()
            self.connection.deregister_connect_callback(self.on_connect)
            self.connection_pool.release(self.connection)
            self.connection = None
        self.health_check_response_counter = 0
        self.channels = {}
        self.pending_unsubscribe_channels = set()
        self.shard_channels = {}
        self.pending_unsubscribe_shard_channels = set()
        self.patterns = {}
        self.pending_unsubscribe_patterns = set()
        self.subscribed_event.clear()

    def close(self) -> None:
        """Alias for ``reset()``."""
        self.reset()

    def _resubscribe(self, subscribed, subscribe_fn) -> None:
        # Replay handler-backed subscriptions as positional Subscription objects
        # so binary names never need to be decoded into keyword argument keys.
        subscriptions = pubsub_subscription_args(subscribed)
        if subscriptions:
            subscribe_fn(*subscriptions)

    def _resubscribe_shard_channels(self) -> None:
        self._resubscribe(self.shard_channels, self.ssubscribe)

    def on_connect(self, connection) -> None:
        "Re-subscribe to any channels and patterns previously subscribed to"
        self.pending_unsubscribe_channels.clear()
        self.pending_unsubscribe_patterns.clear()
        self.pending_unsubscribe_shard_channels.clear()
        if self.channels:
            self._resubscribe(self.channels, self.subscribe)
        if self.patterns:
            self._resubscribe(self.patterns, self.psubscribe)
        if self.shard_channels:
            self._resubscribe_shard_channels()

    @property
    def subscribed(self) -> bool:
        """Indicates if there are subscriptions to any channels or patterns"""
        return self.subscribed_event.is_set()

    def execute_command(self, *args):
        """
        Execute a publish/subscribe command.

        The command is only sent; nothing is read back and nothing is
        returned. On first use a connection is checked out from the pool and
        wired up with the reconnect callback and optional push handler.
        """

        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels

        if self.connection is None:
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        kwargs = {"check_health": not self.subscribed}
        if not self.subscribed:
            self.clean_health_check_responses()
        with self._lock:
            self._execute(connection, connection.send_command, *args, **kwargs)

    def clean_health_check_responses(self) -> None:
        """
        If any health check responses are present, clean them.

        At most ten read attempts are made. Raises ``PubSubError`` if a reply
        that is not a health-check response is read.
        """
        ttl = 10
        conn = self.connection
        while conn and self.health_check_response_counter > 0 and ttl > 0:
            if self._execute(conn, conn.can_read, timeout=conn.socket_timeout):
                response = self._execute(conn, conn.read_response)
                if self.is_health_check_response(response):
                    self.health_check_response_counter -= 1
                else:
                    raise PubSubError(
                        "A non health check response was cleaned by "
                        f"execute_command: {response}"
                    )
            ttl -= 1

    def _reconnect(
        self,
        conn,
        error: Optional[Exception] = None,
        failure_count: Optional[int] = None,
        start_time: Optional[float] = None,
        command_name: Optional[str] = None,
    ) -> None:
        """
        The supported exceptions are already checked in the
        retry object so we don't need to do it here.

        In this error handler we are trying to reconnect to the server.
        When called for a failed attempt (``error`` and ``failure_count``
        given) that is still within the retry budget, the duration of that
        attempt is recorded first. The disconnect/connect always happens.
        """
        # failure_count/start_time default to None; only touch them when the
        # caller actually supplied them.
        if (
            error
            and failure_count is not None
            and failure_count <= conn.retry.get_retries()
        ):
            if command_name and start_time is not None:
                record_operation_duration(
                    command_name=command_name,
                    duration_seconds=time.monotonic() - start_time,
                    server_address=getattr(conn, "host", None),
                    server_port=getattr(conn, "port", None),
                    db_namespace=str(conn.db),
                    error=error,
                    retry_attempts=failure_count,
                )
        conn.disconnect()
        conn.connect()

    def _execute(self, conn, command, *args, **kwargs):
        """
        Run ``command(*args, **kwargs)`` through the connection's retry
        object and return its result.

        Connect manually upon disconnection. If the Redis server is down,
        this will fail and raise a ConnectionError as desired.
        After reconnection, the ``on_connect`` callback should have been
        called by the connection to resubscribe us to any channels and
        patterns we were previously listening to.

        The first positional argument, when present, is treated as the
        command name for duration metrics. Any exception is counted via
        ``record_error_count`` and re-raised unchanged.
        """

        if conn.should_reconnect():
            self._reconnect(conn)

        command_name = args[0] if args else None

        # Start timing for observability
        start_time = time.monotonic()
        # Track actual retry attempts for error reporting; a one-element list
        # so the nested callback can update it.
        actual_retry_attempts = [0]

        def failure_callback(error, failure_count):
            actual_retry_attempts[0] = failure_count
            self._reconnect(conn, error, failure_count, start_time, command_name)

        try:
            response = conn.retry.call_with_retry(
                lambda: command(*args, **kwargs),
                failure_callback,
                with_failure_count=True,
            )

            if command_name:
                record_operation_duration(
                    command_name=command_name,
                    duration_seconds=time.monotonic() - start_time,
                    server_address=getattr(conn, "host", None),
                    server_port=getattr(conn, "port", None),
                    db_namespace=str(conn.db),
                )

            return response
        except Exception as e:
            record_error_count(
                server_address=getattr(conn, "host", None),
                server_port=getattr(conn, "port", None),
                network_peer_address=getattr(conn, "host", None),
                network_peer_port=getattr(conn, "port", None),
                error_type=e,
                retry_attempts=actual_retry_attempts[0],
                is_internal=False,
            )
            raise

    def parse_response(self, block=True, timeout=0):
        """
        Parse the response from a publish/subscribe command.

        Args:
            block: If True, block indefinitely until a message is available.
                If False, return immediately if no message is available.
                Default: True
            timeout: The timeout in seconds for reading a response when
                block=False. This parameter is ignored when block=True.
                Default: 0 (return immediately if no data available)

        Returns:
            The parsed response from the server, or None if no message is
            available within the timeout period (when block=False). Health
            check replies are swallowed and also yield None.

        Raises:
            RuntimeError: if no connection has been set up yet.

        Important:
            The block and timeout parameters work together:
            - When block=True: timeout is IGNORED, method blocks indefinitely
            - When block=False: timeout is USED, method returns after timeout

        Typically, you should use get_message(timeout=X) instead of calling
        parse_response() directly. The get_message() method automatically sets
        block=False when a timeout is provided, and block=True when
        timeout=None.

        Example:
            # Block indefinitely (timeout is ignored)
            response = pubsub.parse_response(block=True, timeout=0.1)

            # Non-blocking with 0.1 second timeout
            response = pubsub.parse_response(block=False, timeout=0.1)

            # Non-blocking, return immediately
            response = pubsub.parse_response(block=False, timeout=0)

            # Recommended: use get_message() instead
            msg = pubsub.get_message(timeout=0.1)  # automatically sets block=False
            msg = pubsub.get_message(timeout=None)  # automatically sets block=True
        """
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        self.check_health()

        def try_read():
            if not block:
                if not conn.can_read(timeout=timeout):
                    return None
                read_timeout = timeout
            else:
                conn.connect()
                read_timeout = SENTINEL  # Use default socket timeout for blocking
            return conn.read_response(
                disconnect_on_error=False, push_request=True, timeout=read_timeout
            )

        response = self._execute(conn, try_read)

        if self.is_health_check_response(response):
            # ignore the health check message as user might not expect it
            self.health_check_response_counter -= 1
            return None
        return response

    def is_health_check_response(self, response) -> bool:
        """
        Check if the response is a health check response.
        If there are no subscriptions redis responds to PING command with a
        bulk response, instead of a multi-bulk with "pong" and the response.
        """
        if self.encoder.decode_responses:
            return response in [
                self.health_check_response,  # If there is a subscription
                self.HEALTH_CHECK_MESSAGE,  # No subscriptions, decode_responses=True
            ]
        else:
            return response in [
                self.health_check_response,  # If there is a subscription
                self.health_check_response_b,  # No subscription, decode_responses=False
            ]

    def check_health(self) -> None:
        """
        Send a health-check PING when the connection's interval has elapsed.

        Raises ``RuntimeError`` if no connection has been set up yet.
        """
        conn = self.connection
        if conn is None:
            raise RuntimeError(
                "pubsub connection not set: "
                "did you forget to call subscribe() or psubscribe()?"
            )

        if conn.health_check_interval and time.monotonic() > conn.next_health_check:
            conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
            # remember that one more health-check reply is in flight
            self.health_check_response_counter += 1

    def _normalize_keys(self, data) -> Dict:
        """
        normalize channel/pattern names to be either bytes or strings
        based on whether responses are automatically decoded. this saves us
        from coercing the value for each message coming in.
        """
        encode = self.encoder.encode
        decode = self.encoder.decode
        return {decode(encode(k)): v for k, v in data.items()}

    def psubscribe(
        self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
    ) -> None:
        """
        Subscribe to channel patterns.
        Patterns supplied as keyword arguments expect a pattern name as the
        key and a callable as the value.
        ``Subscription`` objects can also be supplied positionally with an
        optional handler.
        A pattern's callable will be invoked automatically
        when a message is received on that pattern rather than producing a
        message via ``listen()``.
        """
        new_patterns = parse_pubsub_subscriptions(args, kwargs)
        ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
        # update the patterns dict AFTER we send the command. we don't want to
        # subscribe twice to these patterns, once for the command and again
        # for the reconnection.
        new_patterns = self._normalize_keys(new_patterns)
        self.patterns.update(new_patterns)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_patterns.difference_update(new_patterns)
        return ret_val

    def punsubscribe(self, *args):
        """
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            patterns = self._normalize_keys(dict.fromkeys(args))
        else:
            patterns = self.patterns
        self.pending_unsubscribe_patterns.update(patterns)
        return self.execute_command("PUNSUBSCRIBE", *args)

    def subscribe(
        self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
    ) -> None:
        """
        Subscribe to channels.
        Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value.
        ``Subscription`` objects can also be supplied positionally with an
        optional handler.
        A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a
        message via ``listen()`` or ``get_message()``.
        """
        new_channels = parse_pubsub_subscriptions(args, kwargs)
        ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
        # update the channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_channels = self._normalize_keys(new_channels)
        self.channels.update(new_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_channels.difference_update(new_channels)
        return ret_val

    def unsubscribe(self, *args):
        """
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        """
        if args:
            args = list_or_args(args[0], args[1:])
            channels = self._normalize_keys(dict.fromkeys(args))
        else:
            channels = self.channels
        self.pending_unsubscribe_channels.update(channels)
        return self.execute_command("UNSUBSCRIBE", *args)

    def ssubscribe(
        self,
        *args: ChannelT | Subscription,
        target_node: Any = None,
        **kwargs: PubSubHandler,
    ) -> None:
        """
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value.
        ``Subscription`` objects can also be supplied positionally
        with an optional handler.
        A channel's callable will be invoked automatically when a message
        is received on that channel rather than producing a message
        via ``listen()`` or ``get_sharded_message()``.

        ``target_node`` is accepted but not used by this implementation.
        """
        new_s_channels = parse_pubsub_subscriptions(args, kwargs)
        ret_val = self.execute_command("SSUBSCRIBE", *new_s_channels.keys())
        # update the s_channels dict AFTER we send the command. we don't want to
        # subscribe twice to these channels, once for the command and again
        # for the reconnection.
        new_s_channels = self._normalize_keys(new_s_channels)
        self.shard_channels.update(new_s_channels)
        if not self.subscribed:
            # Set the subscribed_event flag to True
            self.subscribed_event.set()
            # Clear the health check counter
            self.health_check_response_counter = 0
        self.pending_unsubscribe_shard_channels.difference_update(new_s_channels)
        return ret_val

    def sunsubscribe(self, *args, target_node=None):
        """
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels

        ``target_node`` is accepted but not used by this implementation.
        """
        if args:
            args = list_or_args(args[0], args[1:])
            s_channels = self._normalize_keys(dict.fromkeys(args))
        else:
            s_channels = self.shard_channels
        self.pending_unsubscribe_shard_channels.update(s_channels)
        return self.execute_command("SUNSUBSCRIBE", *args)

    def listen(self):
        "Listen for messages on channels this client has been subscribed to"
        while self.subscribed:
            response = self.handle_message(self.parse_response(block=True))
            if response is not None:
                yield response

    def get_message(
        self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
    ):
        """
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.

        When there is no subscription yet, the call first waits (up to
        ``timeout``) for one to be made and returns None if none appears.
        """
        if not self.subscribed:
            # Wait for subscription
            start_time = time.monotonic()
            if self.subscribed_event.wait(timeout) is True:
                # The connection was subscribed during the timeout time frame.
                # The timeout should be adjusted based on the time spent
                # waiting for the subscription. ``None`` means "no limit" and
                # must stay ``None`` (subtracting from it would raise).
                if timeout is not None:
                    time_spent = time.monotonic() - start_time
                    timeout = max(0.0, timeout - time_spent)
            else:
                # The connection isn't subscribed to any channels or patterns,
                # so no messages are available
                return None

        response = self.parse_response(block=(timeout is None), timeout=timeout)

        if response:
            return self.handle_message(response, ignore_subscribe_messages)
        return None

    get_sharded_message = get_message

    def ping(self, message: Union[str, None] = None) -> bool:
        """
        Send a PING (optionally with ``message``) on the pubsub connection.

        The reply is not read here; ``handle_message`` turns it into a
        message of type ``"pong"`` when it is read later.

        NOTE(review): the value returned is whatever ``execute_command``
        returns, which is ``None`` in this class, so the ``bool`` annotation
        does not describe an actual True/False result.
        """
        args = ["PING", message] if message is not None else ["PING"]
        return self.execute_command(*args)

    def handle_message(self, response, ignore_subscribe_messages=False):
        """
        Parses a pub/sub message. If the channel or pattern was subscribed to
        with a message handler, the handler is invoked instead of a parsed
        message being returned.

        Returns a dict with ``type``, ``pattern``, ``channel`` and ``data``,
        or None when the input is None, a handler consumed the message, or a
        subscribe/unsubscribe confirmation is being ignored.
        """
        if response is None:
            return None
        # A bare (non-list) reply is the answer to a PING sent without any
        # subscription; normalise it to the two-element "pong" shape. The
        # reply is bytes or str depending on whether responses are decoded.
        if isinstance(response, bytes):
            response = [b"pong", response] if response != b"PONG" else [b"pong", b""]
        elif isinstance(response, str):
            response = ["pong", response] if response != "PONG" else ["pong", ""]

        message_type = str_if_bytes(response[0])
        if message_type == "pmessage":
            message = {
                "type": message_type,
                "pattern": response[1],
                "channel": response[2],
                "data": response[3],
            }
        elif message_type == "pong":
            message = {
                "type": message_type,
                "pattern": None,
                "channel": None,
                "data": response[1],
            }
        else:
            message = {
                "type": message_type,
                "pattern": None,
                "channel": response[1],
                "data": response[2],
            }

        if message_type in ["message", "pmessage"]:
            channel = str_if_bytes(message["channel"])
            record_pubsub_message(
                direction=PubSubDirection.RECEIVE,
                channel=channel,
            )
        elif message_type == "smessage":
            channel = str_if_bytes(message["channel"])
            record_pubsub_message(
                direction=PubSubDirection.RECEIVE,
                channel=channel,
                sharded=True,
            )

        # if this is an unsubscribe message, remove it from memory
        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
            if message_type == "punsubscribe":
                pattern = response[1]
                if pattern in self.pending_unsubscribe_patterns:
                    self.pending_unsubscribe_patterns.remove(pattern)
                    self.patterns.pop(pattern, None)
            elif message_type == "sunsubscribe":
                s_channel = response[1]
                if s_channel in self.pending_unsubscribe_shard_channels:
                    self.pending_unsubscribe_shard_channels.remove(s_channel)
                    self.shard_channels.pop(s_channel, None)
            else:
                channel = response[1]
                if channel in self.pending_unsubscribe_channels:
                    self.pending_unsubscribe_channels.remove(channel)
                    self.channels.pop(channel, None)
            if not self.channels and not self.patterns and not self.shard_channels:
                # There are no subscriptions anymore, set subscribed_event flag
                # to false
                self.subscribed_event.clear()

        if message_type in self.PUBLISH_MESSAGE_TYPES:
            # if there's a message handler, invoke it
            if message_type == "pmessage":
                handler = self.patterns.get(message["pattern"], None)
            elif message_type == "smessage":
                handler = self.shard_channels.get(message["channel"], None)
            else:
                handler = self.channels.get(message["channel"], None)
            if handler:
                handler(message)
                return None
        elif message_type != "pong":
            # this is a subscribe/unsubscribe message. ignore if we don't
            # want them
            if ignore_subscribe_messages or self.ignore_subscribe_messages:
                return None

        return message

    def run_in_thread(
        self,
        sleep_time: float = 0.0,
        daemon: bool = False,
        exception_handler: Optional[Callable] = None,
        pubsub=None,
        sharded_pubsub: bool = False,
    ) -> "PubSubWorkerThread":
        """
        Start and return a ``PubSubWorkerThread`` polling for messages.

        Every current subscription must have a handler; otherwise
        ``PubSubError`` is raised. ``pubsub`` defaults to this object.
        """
        for channel, handler in self.channels.items():
            if handler is None:
                raise PubSubError(f"Channel: '{channel}' has no handler registered")
        for pattern, handler in self.patterns.items():
            if handler is None:
                raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
        for s_channel, handler in self.shard_channels.items():
            if handler is None:
                raise PubSubError(
                    f"Shard Channel: '{s_channel}' has no handler registered"
                )

        pubsub = self if pubsub is None else pubsub
        thread = PubSubWorkerThread(
            pubsub,
            sleep_time,
            daemon=daemon,
            exception_handler=exception_handler,
            sharded_pubsub=sharded_pubsub,
        )
        thread.start()
        return thread

1610 

1611 

class PubSubWorkerThread(threading.Thread):
    """
    Thread that keeps polling a pubsub object for messages until stopped.

    Each loop iteration calls ``get_message`` (or ``get_sharded_message``
    when ``sharded_pubsub`` is true) with subscribe confirmations ignored
    and ``sleep_time`` as the timeout. When the loop ends normally the
    pubsub object is closed.
    """

    def __init__(
        self,
        pubsub,
        sleep_time: float,
        daemon: bool = False,
        exception_handler: Union[
            Callable[[Exception, "PubSub", "PubSubWorkerThread"], None], None
        ] = None,
        sharded_pubsub: bool = False,
    ):
        super().__init__()
        self.daemon = daemon
        # what to poll, and how
        self.pubsub = pubsub
        self.sharded_pubsub = sharded_pubsub
        self.sleep_time = sleep_time
        # optional hook invoked as handler(exc, pubsub, thread)
        self.exception_handler = exception_handler
        # set while the loop should keep going
        self._running = threading.Event()

    def run(self) -> None:
        """Poll until ``stop()`` is called, then close the pubsub object."""
        running = self._running
        if running.is_set():
            # already running: do not start a second loop
            return
        running.set()

        client = self.pubsub
        poll_timeout = self.sleep_time
        while running.is_set():
            try:
                if self.sharded_pubsub:
                    client.get_sharded_message(
                        ignore_subscribe_messages=True, timeout=poll_timeout
                    )
                else:
                    client.get_message(
                        ignore_subscribe_messages=True, timeout=poll_timeout
                    )
            except BaseException as exc:
                handler = self.exception_handler
                if handler is None:
                    # nobody to report to: let it propagate out of the thread
                    raise
                handler(exc, client, self)
        client.close()

    def stop(self) -> None:
        # trip the flag so the run loop exits. the run loop will
        # close the pubsub connection, which disconnects the socket
        # and returns the connection to the pool.
        self._running.clear()

1658 

1659 

1660class Pipeline(Redis): 

1661 """ 

1662 Pipelines provide a way to transmit multiple commands to the Redis server 

1663 in one transmission. This is convenient for batch processing, such as 

1664 saving all the values in a list to Redis. 

1665 

1666 All commands executed within a pipeline(when running in transactional mode, 

1667 which is the default behavior) are wrapped with MULTI and EXEC 

1668 calls. This guarantees all commands executed in the pipeline will be 

1669 executed atomically. 

1670 

1671 Any command raising an exception does *not* halt the execution of 

1672 subsequent commands in the pipeline. Instead, the exception is caught 

1673 and its instance is placed into the response list returned by execute(). 

1674 Code iterating over the response list should be able to deal with an 

1675 instance of an exception as a potential value. In general, these will be 

1676 ResponseError exceptions, such as those raised when issuing a command 

1677 on a key of a different datatype. 

1678 """ 

1679 

1680 UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} 

1681 

def __init__(
    self,
    connection_pool: ConnectionPool,
    response_callbacks,
    transaction,
    shard_hint,
):
    """
    Store the constructor arguments and start with an empty pipeline.

    No connection is held initially, nothing is queued, no scripts are
    registered, and neither WATCH nor an explicit MULTI is in effect.
    """
    # Configuration, kept exactly as given.
    self.connection_pool = connection_pool
    self.response_callbacks = response_callbacks
    self.transaction = transaction
    self.shard_hint = shard_hint

    # Per-pipeline mutable state.
    self.connection: Optional[Connection] = None
    self.command_stack = []
    self.scripts: Set[Script] = set()
    self.watching = False
    self.explicit_transaction = False

1698 

def __enter__(self) -> "Pipeline":
    """Enter a ``with`` block; the pipeline itself is the context value."""
    return self

1701 

def __exit__(self, exc_type, exc_value, traceback):
    """Leave a ``with`` block: reset the pipeline, whatever the outcome."""
    # The exception details are not inspected; returning None lets any
    # exception raised inside the block propagate.
    self.reset()

1704 

def __del__(self):
    """Best-effort ``reset()`` when the pipeline is garbage-collected."""
    try:
        self.reset()
    except Exception:
        # Deliberately swallowed: a finalizer must not raise.
        return

1710 

def __len__(self) -> int:
    """Number of entries currently held in ``command_stack``."""
    queued = self.command_stack
    return len(queued)

1713 

def __bool__(self) -> bool:
    """Pipeline instances should always evaluate to True"""
    # Without this, truthiness would fall back to __len__ and an empty
    # pipeline would be falsy.
    return True

1717 

def reset(self) -> None:
    """
    Discard all queued state and hand the connection back to the pool.

    If the pipeline is watching keys and holds a connection, UNWATCH is
    sent first; on ``ConnectionError`` the connection is disconnected
    instead. Afterwards the watch/transaction flags are cleared and the
    connection (if any) is released and forgotten.
    """
    self.command_stack = []
    self.scripts = set()

    conn = self.connection
    # make sure to reset the connection state in the event that we were
    # watching something
    if self.watching and conn:
        try:
            # call this manually since our unwatch or
            # immediate_execute_command methods can call reset()
            conn.send_command("UNWATCH")
            conn.read_response()
        except ConnectionError:
            # disconnect will also remove any previous WATCHes
            conn.disconnect()

    # clean up the other instance attributes
    self.explicit_transaction = False
    self.watching = False

    # we can safely return the connection to the pool here since we're
    # sure we're no longer WATCHing anything
    if conn:
        self.connection_pool.release(conn)
        self.connection = None

1741 

def close(self) -> None:
    """Close the pipeline (equivalent to calling ``reset()``)."""
    self.reset()

1745 

def multi(self) -> None:
    """
    Start a transactional block of the pipeline after WATCH commands
    are issued. End the transactional block with `execute`.

    Raises ``RedisError`` if a transactional block was already started or
    if commands have already been queued.
    """
    problem = None
    if self.explicit_transaction:
        problem = "Cannot issue nested calls to MULTI"
    elif self.command_stack:
        problem = "Commands without an initial WATCH have already been issued"

    if problem is not None:
        raise RedisError(problem)
    self.explicit_transaction = True

1758 

def execute_command(self, *args, **kwargs):
    """
    Route a command to immediate execution or to the queued pipeline.

    A command runs immediately when the pipeline is watching (or the command
    itself is ``WATCH``) and no explicit transaction has been started;
    otherwise it is staged via ``pipeline_execute_command``.
    """
    watch_phase = self.watching or args[0] == "WATCH"
    if watch_phase and not self.explicit_transaction:
        handler = self.immediate_execute_command
    else:
        handler = self.pipeline_execute_command
    return handler(*args, **kwargs)

1763 

def _disconnect_reset_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Close the connection, reset the watching state and
    raise an exception if we were watching.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).

    :param conn: connection the failed command was sent on.
    :param error: the exception reported for this attempt.
    :param failure_count: number of failures so far; when omitted no
        duration metric is recorded.
    :param start_time: ``time.monotonic()`` value taken when the operation
        started; when omitted no duration metric is recorded.
    :param command_name: command name passed to the duration metric.
    :raises WatchError: if the pipeline was watching when the failure occurred.
    """
    # failure_count and start_time default to None. Comparing or subtracting
    # None raises TypeError, which would skip the disconnect below, so the
    # metric is only recorded when both values were actually supplied.
    can_record = failure_count is not None and start_time is not None
    if error and can_record and failure_count <= conn.retry.get_retries():
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )
    conn.disconnect()

    # if we were already watching a variable, the watch is no longer
    # valid since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        self.reset()
        raise WatchError(
            f"A {type(error).__name__} occurred while watching one or more keys"
        )

1802 

def immediate_execute_command(self, *args, **options):
    """
    Execute a command immediately, but don't auto-retry on the supported
    errors for retry if we're already WATCHing a variable.
    Used when issuing WATCH or subsequent commands retrieving their values but before
    MULTI is called.

    A connection is taken from the pool on first use and kept on the
    pipeline. The operation duration is recorded on success; on any
    exception an error count is recorded and the exception is re-raised.
    """
    command_name = args[0]

    # if this is the first call, we need a connection
    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        self.connection = conn

    # timing origin for observability
    start_time = time.monotonic()
    # last failure count seen by the retry machinery, for error reporting
    retry_attempts = 0

    def send_and_parse():
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def failure_callback(error, failure_count):
        nonlocal retry_attempts
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        retry_attempts = failure_count
        self._disconnect_reset_raise_on_watching(
            conn, error, failure_count, start_time, command_name
        )

    try:
        response = conn.retry.call_with_retry(
            send_and_parse,
            failure_callback,
            with_failure_count=True,
        )

        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )

        return response
    except Exception as exc:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=exc,
            retry_attempts=retry_attempts,
            is_internal=False,
        )
        raise

1859 

def pipeline_execute_command(self, *args, **options) -> "Pipeline":
    """
    Stage a command to be executed when execute() is next called

    Returns the current Pipeline object back so commands can be
    chained together, such as:

    pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

    At some other point, you can then run: pipe.execute(),
    which will execute all commands queued in the pipe.
    """
    # each queued entry is an (args, options) pair
    staged = (args, options)
    self.command_stack.append(staged)
    return self

1874 

def _execute_transaction(
    self, connection: Connection, commands, raise_on_error
) -> List:
    """
    Send ``commands`` wrapped in MULTI/EXEC as one packed request and
    return the post-processed replies.

    Commands whose options contain ``EMPTY_RESPONSE`` are not sent; the
    value stored under that option is inserted at their position instead.
    ``ResponseError`` replies to MULTI and to the individual commands are
    collected rather than raised so every reply is read off the socket.

    Raises ``WatchError`` when EXEC returns ``None``, ``ResponseError`` when
    the reply count does not match the command count, and the first
    collected error (or the ``ExecAbortError`` itself) when EXEC aborts.
    """
    framed = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
    packed = connection.pack_commands(
        [cmd_args for cmd_args, cmd_options in framed if EMPTY_RESPONSE not in cmd_options]
    )
    connection.send_packed_command(packed)
    failures = []

    # parse off the response for MULTI
    # NOTE: ResponseErrors are collected here and below, not raised, so
    # that all the additional command messages are read from the socket
    try:
        self.parse_response(connection, "_")
    except ResponseError as exc:
        failures.append((0, exc))

    # and all the other commands
    for index, command in enumerate(commands):
        if EMPTY_RESPONSE in command[1]:
            failures.append((index, command[1][EMPTY_RESPONSE]))
            continue
        try:
            self.parse_response(connection, "_")
        except ResponseError as exc:
            self.annotate_exception(exc, index + 1, command[0])
            failures.append((index, exc))

    # parse the EXEC.
    try:
        response = self.parse_response(connection, "_")
    except ExecAbortError:
        if failures:
            raise failures[0][1]
        raise

    # EXEC clears any watched keys
    self.watching = False

    if response is None:
        raise WatchError("Watched variable changed.")

    # put any parse errors into the response
    for index, failure in failures:
        response.insert(index, failure)

    if len(response) != len(commands):
        self.connection.disconnect()
        raise ResponseError(
            "Wrong number of response items from pipeline execution"
        )

    # find any errors in the response and raise if necessary
    if raise_on_error:
        self.raise_first_error(commands, response)

    # We have to run response callbacks manually
    results = []
    for reply, cmd in zip(response, commands):
        if isinstance(reply, Exception):
            results.append(reply)
            continue
        cmd_args, cmd_options = cmd
        # Remove keys entry, it is only needed for the cache.
        cmd_options.pop("keys", None)
        callback_key = cmd_args[0]
        if callback_key in self.response_callbacks:
            reply = self.response_callbacks[callback_key](reply, **cmd_options)
        results.append(reply)

    return results

1946 

1947 def _execute_pipeline(self, connection, commands, raise_on_error): 

1948 # build up all commands into a single request to increase network perf 

1949 all_cmds = connection.pack_commands([args for args, _ in commands]) 

1950 connection.send_packed_command(all_cmds) 

1951 

1952 responses = [] 

1953 for args, options in commands: 

1954 try: 

1955 responses.append(self.parse_response(connection, args[0], **options)) 

1956 except ResponseError as e: 

1957 responses.append(e) 

1958 

1959 if raise_on_error: 

1960 self.raise_first_error(commands, responses) 

1961 

1962 return responses 

1963 

def raise_first_error(self, commands, response):
    """
    Raise the first ``ResponseError`` found in ``response``.

    Before raising, the error is annotated with its 1-based position and the
    arguments of the matching entry in ``commands``. Returns ``None`` when
    no reply is a ``ResponseError``.
    """
    for position, reply in enumerate(response, start=1):
        if not isinstance(reply, ResponseError):
            continue
        self.annotate_exception(reply, position, commands[position - 1][0])
        raise reply

1969 

def annotate_exception(self, exception, number, command):
    """
    Rewrite the first argument of ``exception`` in place so it names the
    pipeline position ``number`` and the (truncated) command text; any
    further arguments of the exception are kept.
    """
    rendered = " ".join(safe_str(part) for part in command)
    detail = (
        f"Command # {number} ({truncate_text(rendered)}) of pipeline "
        f"caused error: {exception.args[0]}"
    )
    exception.args = (detail, *exception.args[1:])

1977 

def parse_response(self, connection, command_name, **options):
    """
    Parse a reply via ``Redis.parse_response`` and keep ``watching`` in sync.

    A command listed in ``UNWATCH_COMMANDS`` clears the flag, ``WATCH`` sets
    it, and any other command leaves it untouched.
    """
    reply = Redis.parse_response(self, connection, command_name, **options)
    clears_watch = command_name in self.UNWATCH_COMMANDS
    if clears_watch or command_name == "WATCH":
        self.watching = not clears_watch
    return reply

1985 

def load_scripts(self):
    """
    Make sure every script registered on this pipeline exists on the server.

    Issues ``SCRIPT EXISTS`` for all registered SHAs and ``SCRIPT LOAD`` for
    each script reported missing, storing the returned value as that
    script's ``sha``.
    """
    pending = list(self.scripts)
    # the regular script_* methods can't be used here: they would just get
    # buffered in the pipeline instead of running right away
    run_now = self.immediate_execute_command
    known = run_now("SCRIPT EXISTS", *[script.sha for script in pending])
    if all(known):
        return
    for script, present in zip(pending, known):
        if present:
            continue
        script.sha = run_now("SCRIPT LOAD", script.script)

1998 

def _disconnect_raise_on_watching(
    self,
    conn: AbstractConnection,
    error: Exception,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Close the connection, raise an exception if we were watching.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic(on connection level).

    :param conn: connection the failed operation was sent on.
    :param error: the exception reported for this attempt.
    :param failure_count: number of failures so far; when omitted no
        duration metric is recorded.
    :param start_time: ``time.monotonic()`` value taken when the operation
        started; when omitted no duration metric is recorded.
    :param command_name: operation name passed to the duration metric.
    :raises WatchError: if the pipeline was watching when the failure occurred.
    """
    # failure_count and start_time default to None. Comparing or subtracting
    # None raises TypeError, which would skip the disconnect below, so the
    # metric is only recorded when both values were actually supplied.
    can_record = failure_count is not None and start_time is not None
    if error and can_record and failure_count <= conn.retry.get_retries():
        record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )
    conn.disconnect()
    # if we were watching a variable, the watch is no longer valid
    # since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        raise WatchError(
            f"A {type(error).__name__} occurred while watching one or more keys"
        )

2034 

def execute(self, raise_on_error: bool = True) -> List[Any]:
    """
    Execute all the commands in the current pipeline

    Returns an empty list straight away when nothing is queued and the
    pipeline is not watching. Otherwise registered scripts are loaded, the
    queued commands are run either as a MULTI/EXEC transaction or as a plain
    pipeline through the connection's retry object, and ``reset()`` is
    always called afterwards. The operation duration is recorded on
    success; on any exception an error count is recorded and the exception
    is re-raised.
    """
    stack = self.command_stack
    if not (stack or self.watching):
        return []
    if self.scripts:
        self.load_scripts()

    if self.transaction or self.explicit_transaction:
        runner, operation_name = self._execute_transaction, "MULTI"
    else:
        runner, operation_name = self._execute_pipeline, "PIPELINE"

    conn = self.connection
    if not conn:
        conn = self.connection_pool.get_connection()
        # keep it on self.connection so reset() releases the connection
        # back to the pool after we're done
        self.connection = conn

    # timing origin for observability
    start_time = time.monotonic()
    # last failure count seen by the retry machinery, for error reporting
    retry_attempts = 0

    def run_once():
        return runner(conn, stack, raise_on_error)

    def failure_callback(error, failure_count):
        nonlocal retry_attempts
        if is_debug_log_enabled():
            add_debug_log_for_operation_failure(conn)
        retry_attempts = failure_count
        self._disconnect_raise_on_watching(
            conn, error, failure_count, start_time, operation_name
        )

    try:
        response = conn.retry.call_with_retry(
            run_once,
            failure_callback,
            with_failure_count=True,
        )

        record_operation_duration(
            command_name=operation_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )
        return response
    except Exception as exc:
        record_error_count(
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            network_peer_address=getattr(conn, "host", None),
            network_peer_port=getattr(conn, "port", None),
            error_type=exc,
            retry_attempts=retry_attempts,
            is_internal=False,
        )
        raise

    finally:
        # in reset() the connection is disconnected before returned to the pool if
        # it is marked for reconnect.
        self.reset()

2100 

def discard(self):
    """
    Flushes all previously queued commands
    See: https://redis.io/commands/DISCARD
    """
    # the result of execute_command is intentionally not returned
    command = "DISCARD"
    self.execute_command(command)

2107 

def watch(self, *names):
    """
    Watches the values at keys ``names``

    Raises ``RedisError`` when an explicit transaction has already been
    started on this pipeline.
    """
    if not self.explicit_transaction:
        return self.execute_command("WATCH", *names)
    raise RedisError("Cannot issue a WATCH after a MULTI")

2113 

def unwatch(self) -> bool:
    """
    Unwatches all previously specified keys

    ``UNWATCH`` is only issued while the pipeline is watching. The result
    is that command's reply when it is truthy, and ``True`` otherwise.
    """
    if not self.watching:
        return True
    return self.execute_command("UNWATCH") or True