Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/connection.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

697 statements  

1import asyncio 

2import copy 

3import enum 

4import inspect 

5import socket 

6import sys 

7import warnings 

8import weakref 

9from abc import abstractmethod 

10from itertools import chain 

11from types import MappingProxyType 

12from typing import ( 

13 Any, 

14 Callable, 

15 Iterable, 

16 List, 

17 Mapping, 

18 Optional, 

19 Protocol, 

20 Set, 

21 Tuple, 

22 Type, 

23 TypedDict, 

24 TypeVar, 

25 Union, 

26) 

27from urllib.parse import ParseResult, parse_qs, unquote, urlparse 

28 

29from ..utils import SSL_AVAILABLE 

30 

31if SSL_AVAILABLE: 

32 import ssl 

33 from ssl import SSLContext, TLSVersion, VerifyFlags 

34else: 

35 ssl = None 

36 TLSVersion = None 

37 SSLContext = None 

38 VerifyFlags = None 

39 

40from ..auth.token import TokenInterface 

41from ..event import AsyncAfterConnectionReleasedEvent, EventDispatcher 

42from ..utils import deprecated_args, format_error_message 

43 

44# the functionality is available in 3.11.x but has a major issue before 

45# 3.11.3. See https://github.com/redis/redis-py/issues/2633 

46if sys.version_info >= (3, 11, 3): 

47 from asyncio import timeout as async_timeout 

48else: 

49 from async_timeout import timeout as async_timeout 

50 

51from redis.asyncio.retry import Retry 

52from redis.backoff import NoBackoff 

53from redis.connection import DEFAULT_RESP_VERSION 

54from redis.credentials import CredentialProvider, UsernamePasswordCredentialProvider 

55from redis.exceptions import ( 

56 AuthenticationError, 

57 AuthenticationWrongNumberOfArgsError, 

58 ConnectionError, 

59 DataError, 

60 RedisError, 

61 ResponseError, 

62 TimeoutError, 

63) 

64from redis.typing import EncodableT 

65from redis.utils import HIREDIS_AVAILABLE, get_lib_version, str_if_bytes 

66 

67from .._parsers import ( 

68 BaseParser, 

69 Encoder, 

70 _AsyncHiredisParser, 

71 _AsyncRESP2Parser, 

72 _AsyncRESP3Parser, 

73) 

74 

75SYM_STAR = b"*" 

76SYM_DOLLAR = b"$" 

77SYM_CRLF = b"\r\n" 

78SYM_LF = b"\n" 

79SYM_EMPTY = b"" 

80 

81 

82class _Sentinel(enum.Enum): 

83 sentinel = object() 

84 

85 

86SENTINEL = _Sentinel.sentinel 

87 

88 

89DefaultParser: Type[Union[_AsyncRESP2Parser, _AsyncRESP3Parser, _AsyncHiredisParser]] 

90if HIREDIS_AVAILABLE: 

91 DefaultParser = _AsyncHiredisParser 

92else: 

93 DefaultParser = _AsyncRESP2Parser 

94 

95 

96class ConnectCallbackProtocol(Protocol): 

97 def __call__(self, connection: "AbstractConnection"): ... 

98 

99 

100class AsyncConnectCallbackProtocol(Protocol): 

101 async def __call__(self, connection: "AbstractConnection"): ... 

102 

103 

104ConnectCallbackT = Union[ConnectCallbackProtocol, AsyncConnectCallbackProtocol] 

105 

106 

107class AbstractConnection: 

108 """Manages communication to and from a Redis server""" 

109 

110 __slots__ = ( 

111 "db", 

112 "username", 

113 "client_name", 

114 "lib_name", 

115 "lib_version", 

116 "credential_provider", 

117 "password", 

118 "socket_timeout", 

119 "socket_connect_timeout", 

120 "redis_connect_func", 

121 "retry_on_timeout", 

122 "retry_on_error", 

123 "health_check_interval", 

124 "next_health_check", 

125 "last_active_at", 

126 "encoder", 

127 "ssl_context", 

128 "protocol", 

129 "_reader", 

130 "_writer", 

131 "_parser", 

132 "_connect_callbacks", 

133 "_buffer_cutoff", 

134 "_lock", 

135 "_socket_read_size", 

136 "__dict__", 

137 ) 

138 

def __init__(
    self,
    *,
    db: Union[str, int] = 0,
    password: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    socket_connect_timeout: Optional[float] = None,
    retry_on_timeout: bool = False,
    retry_on_error: Union[list, _Sentinel] = SENTINEL,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    decode_responses: bool = False,
    parser_class: Type[BaseParser] = DefaultParser,
    socket_read_size: int = 65536,
    health_check_interval: float = 0,
    client_name: Optional[str] = None,
    lib_name: Optional[str] = "redis-py",
    lib_version: Optional[str] = get_lib_version(),
    username: Optional[str] = None,
    retry: Optional[Retry] = None,
    redis_connect_func: Optional[ConnectCallbackT] = None,
    encoder_class: Type[Encoder] = Encoder,
    credential_provider: Optional[CredentialProvider] = None,
    protocol: Optional[int] = 2,
    event_dispatcher: Optional[EventDispatcher] = None,
):
    """Store the connection settings; no network I/O happens here.

    ``socket_connect_timeout`` falls back to ``socket_timeout`` when omitted.
    When ``retry_on_timeout`` is true, the timeout exception types are added
    to the retryable errors. ``protocol`` must convert to the integer 2 or 3.

    Raises:
        DataError: if ``credential_provider`` is combined with
            ``username``/``password``.
        ConnectionError: if ``protocol`` is not an integer or is outside 2..3.
    """
    if (username or password) and credential_provider is not None:
        raise DataError(
            "'username' and 'password' cannot be passed along with 'credential_"
            "provider'. Please provide only one of the following arguments: \n"
            "1. 'password' and (optional) 'username'\n"
            "2. 'credential_provider'"
        )
    if event_dispatcher is None:
        self._event_dispatcher = EventDispatcher()
    else:
        self._event_dispatcher = event_dispatcher
    self.db = db
    self.client_name = client_name
    self.lib_name = lib_name
    self.lib_version = lib_version
    self.credential_provider = credential_provider
    self.password = password
    self.username = username
    self.socket_timeout = socket_timeout
    if socket_connect_timeout is None:
        socket_connect_timeout = socket_timeout
    self.socket_connect_timeout = socket_connect_timeout
    self.retry_on_timeout = retry_on_timeout
    if retry_on_error is SENTINEL:
        retry_on_error = []
    elif retry_on_timeout:
        # Extend a private copy: the caller's list may be shared between many
        # connections and must not grow each time one is constructed.
        retry_on_error = list(retry_on_error or ())
    if retry_on_timeout:
        retry_on_error.extend((TimeoutError, socket.timeout, asyncio.TimeoutError))
    self.retry_on_error = retry_on_error
    if retry or retry_on_error:
        if not retry:
            self.retry = Retry(NoBackoff(), 1)
        else:
            # deep-copy the Retry object as it is mutable
            self.retry = copy.deepcopy(retry)
        # Update the retry's supported errors with the specified errors
        self.retry.update_supported_errors(retry_on_error)
    else:
        self.retry = Retry(NoBackoff(), 0)
    self.health_check_interval = health_check_interval
    self.next_health_check: float = -1
    self.encoder = encoder_class(encoding, encoding_errors, decode_responses)
    self.redis_connect_func = redis_connect_func
    self._reader: Optional[asyncio.StreamReader] = None
    self._writer: Optional[asyncio.StreamWriter] = None
    # set_parser() reads _socket_read_size, so it has to be assigned first.
    self._socket_read_size = socket_read_size
    self.set_parser(parser_class)
    self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
    self._buffer_cutoff = 6000
    self._re_auth_token: Optional[TokenInterface] = None
    self._should_reconnect = False

    try:
        p = int(protocol)
    except TypeError:
        # Not convertible at all (e.g. None): use the default version, and
        # store it so the handshake never sees the unusable value.
        p = DEFAULT_RESP_VERSION
        protocol = p
    except ValueError:
        raise ConnectionError("protocol must be an integer")
    # The range check must run after the try statement, not in a ``finally``:
    # there it also ran on the ValueError path with ``p`` unbound and replaced
    # the ConnectionError above with an UnboundLocalError.
    if p < 2 or p > 3:
        raise ConnectionError("protocol must be either 2 or 3")
    self.protocol = protocol

228 

def __del__(self, _warnings: Any = warnings):
    """Emit a ResourceWarning for a connection collected while still open.

    The streams themselves produce no resource warning, so one is issued
    here in the stdlib style. The writer is then closed, but only when an
    event loop is running.
    """
    if not getattr(self, "_writer", None):
        return

    _warnings.warn(f"unclosed Connection {self!r}", ResourceWarning, source=self)

    try:
        # get_running_loop() raises RuntimeError outside a running loop.
        asyncio.get_running_loop()
        self._close()
    except RuntimeError:
        # No actions been taken if pool already closed.
        pass

244 

def _close(self):
    """Close the writer (if any) without awaiting it and drop both streams."""
    writer = self._writer
    if not writer:
        return
    writer.close()
    self._writer = None
    self._reader = None

252 

def __repr__(self):
    """Render as ``<module.ClassName(key=value,...)>`` using ``repr_pieces()``."""
    cls = self.__class__
    rendered = [f"{key}={value}" for key, value in self.repr_pieces()]
    joined = ",".join(rendered)
    return f"<{cls.__module__}.{cls.__name__}({joined})>"

256 

@abstractmethod
def repr_pieces(self):
    """Return the ``(name, value)`` pairs that ``__repr__`` renders.

    Subclasses provide the implementation; this base version returns None.
    """

260 

@property
def is_connected(self):
    """True only while both the stream reader and the stream writer are set."""
    return not (self._reader is None or self._writer is None)

264 

def register_connect_callback(self, callback):
    """
    Remember ``callback`` so it is invoked each time the connection is
    established, whether initially or on reconnect. Listeners use this to
    re-issue connection-scoped commands such as pub/sub subscriptions or
    key tracking. ``callback`` must be a bound _method_; only a weak
    reference to it is stored, and registering it twice has no effect.
    """
    method_ref = weakref.WeakMethod(callback)
    if method_ref in self._connect_callbacks:
        return
    self._connect_callbacks.append(method_ref)

276 

def deregister_connect_callback(self, callback):
    """
    Forget a callback registered earlier so it no longer receives connection
    events. Unknown callbacks are ignored. Calling this is optional when the
    listener simply goes away, because callbacks are held as weak methods.
    """
    method_ref = weakref.WeakMethod(callback)
    if method_ref in self._connect_callbacks:
        self._connect_callbacks.remove(method_ref)

287 

def set_parser(self, parser_class: Type[BaseParser]) -> None:
    """
    Instantiate ``parser_class`` with this connection's ``_socket_read_size``
    and install the instance as the connection's parser.
    :param parser_class: The required parser class
    """
    read_size = self._socket_read_size
    self._parser = parser_class(socket_read_size=read_size)

295 

async def connect(self):
    """Connect to the Redis server if needed, with health checks enabled."""
    health_checked = True
    await self.connect_check_health(check_health=health_checked)

299 

async def connect_check_health(
    self, check_health: bool = True, retry_socket_connect: bool = True
):
    """Open the socket, run the handshake and fire the connect callbacks.

    Does nothing when already connected. The socket is opened through
    ``self.retry`` unless ``retry_socket_connect`` is false. The handshake is
    ``redis_connect_func`` when set, otherwise ``on_connect_check_health``.

    Raises:
        TimeoutError: the socket connect timed out.
        ConnectionError: any other failure while opening the socket.
        RedisError: the handshake failed (the connection is closed first).
    """
    if self.is_connected:
        return

    try:
        if not retry_socket_connect:
            await self._connect()
        else:
            await self.retry.call_with_retry(
                lambda: self._connect(), lambda error: self.disconnect()
            )
    except asyncio.CancelledError:
        raise  # in 3.7 and earlier, this is an Exception, not BaseException
    except (socket.timeout, asyncio.TimeoutError):
        raise TimeoutError("Timeout connecting to server")
    except OSError as e:
        raise ConnectionError(self._error_message(e))
    except Exception as exc:
        raise ConnectionError(exc) from exc

    try:
        if not self.redis_connect_func:
            # No custom hook: run the default handshake.
            await self.on_connect_check_health(check_health=check_health)
        elif asyncio.iscoroutinefunction(self.redis_connect_func):
            await self.redis_connect_func(self)
        else:
            self.redis_connect_func(self)
    except RedisError:
        # clean up after any error in on_connect
        await self.disconnect()
        raise

    # Run user callbacks (currently only pub/sub channel/pattern
    # resubscription is registered internally), after discarding weak
    # references whose target has been collected.
    self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()]
    for ref in self._connect_callbacks:
        outcome = ref()(self)
        if outcome and inspect.isawaitable(outcome):
            await outcome

346 

def mark_for_reconnect(self):
    """Set the flag that ``should_reconnect()`` reports."""
    self._should_reconnect = True

349 

def should_reconnect(self):
    """Return the flag set by ``mark_for_reconnect()``."""
    flagged = self._should_reconnect
    return flagged

352 

@abstractmethod
async def _connect(self):
    """Open the transport and set ``_reader``/``_writer``.

    Subclasses provide the implementation; this base version does nothing.
    """

356 

@abstractmethod
def _host_error(self) -> str:
    """Return the endpoint description used in error messages.

    Subclasses provide the implementation; this base version returns None.
    """

360 

def _error_message(self, exception: BaseException) -> str:
    """Format ``exception`` together with this connection's endpoint."""
    endpoint = self._host_error()
    return format_error_message(endpoint, exception)

363 

def get_protocol(self):
    """Return the protocol value stored on this connection."""
    configured = self.protocol
    return configured

366 

async def on_connect(self) -> None:
    """Initialize the connection, authenticate and select a database.

    Delegates to ``on_connect_check_health`` with health checks enabled.
    """
    health_checked = True
    await self.on_connect_check_health(check_health=health_checked)

370 

async def on_connect_check_health(self, check_health: bool = True) -> None:
    """Run the post-connect handshake.

    Steps, in order: authenticate and/or switch protocol (HELLO/AUTH),
    set the client name, then pipeline CLIENT SETINFO (library name and
    version) and SELECT before reading their replies.

    Raises:
        ConnectionError: wrong RESP version in the authenticated HELLO reply,
            or the client name / database could not be set.
        AuthenticationError: AUTH did not answer OK.
    """
    self._parser.on_connect(self)
    original_parser = self._parser

    # Resolve credentials when a provider or a username/password is set.
    auth_args = None
    if self.credential_provider or (self.username or self.password):
        provider = self.credential_provider or UsernamePasswordCredentialProvider(
            self.username, self.password
        )
        auth_args = await provider.get_credentials_async()

    wants_hello = self.protocol not in [2, "2"]

    if wants_hello:
        # A non-default protocol needs the RESP3 parser; carry over the
        # (cluster) exception classes configured on the previous parser.
        if isinstance(self._parser, _AsyncRESP2Parser):
            self.set_parser(_AsyncRESP3Parser)
            self._parser.EXCEPTION_CLASSES = original_parser.EXCEPTION_CLASSES
            self._parser.on_connect(self)

        if auth_args:
            # Credentials travel inside HELLO when a protocol is requested.
            if len(auth_args) == 1:
                auth_args = ["default", auth_args[0]]
            # No health check here: PING would fail before AUTH.
            await self.send_command(
                "HELLO", self.protocol, "AUTH", *auth_args, check_health=False
            )
            response = await self.read_response()
            expected = int(self.protocol)
            if (
                response.get(b"proto") != expected
                and response.get("proto") != expected
            ):
                raise ConnectionError("Invalid RESP version")
        else:
            await self.send_command("HELLO", self.protocol, check_health=check_health)
            # The reply is consumed but its protocol version is not checked.
            response = await self.read_response()
    elif auth_args:
        # No health check here: PING would fail before AUTH.
        await self.send_command("AUTH", *auth_args, check_health=False)

        try:
            auth_response = await self.read_response()
        except AuthenticationWrongNumberOfArgsError:
            # A username and password were given but the server appears to be
            # older than 6.0.0, which expects only a password: retry with it.
            # https://github.com/andymccurdy/redis-py/issues/1274
            await self.send_command("AUTH", auth_args[-1], check_health=False)
            auth_response = await self.read_response()

        if str_if_bytes(auth_response) != "OK":
            raise AuthenticationError("Invalid Username or Password")

    # if a client_name is given, set it
    if self.client_name:
        await self.send_command(
            "CLIENT",
            "SETNAME",
            self.client_name,
            check_health=check_health,
        )
        if str_if_bytes(await self.read_response()) != "OK":
            raise ConnectionError("Error setting client name")

    # Library name/version and SELECT are pipelined for lower startup
    # latency: everything is sent first, replies are read afterwards.
    setinfo_sent = 0
    for attribute, value in (
        ("LIB-NAME", self.lib_name),
        ("LIB-VER", self.lib_version),
    ):
        if value:
            await self.send_command(
                "CLIENT",
                "SETINFO",
                attribute,
                value,
                check_health=check_health,
            )
            setinfo_sent += 1
    if self.db:
        await self.send_command("SELECT", self.db, check_health=check_health)

    # SETINFO replies: an error reply is deliberately ignored.
    for _ in range(setinfo_sent):
        try:
            await self.read_response()
        except ResponseError:
            pass

    if self.db:
        if str_if_bytes(await self.read_response()) != "OK":
            raise ConnectionError("Invalid Database")

478 

479 async def disconnect(self, nowait: bool = False) -> None: 

480 """Disconnects from the Redis server""" 

481 try: 

482 async with async_timeout(self.socket_connect_timeout): 

483 self._parser.on_disconnect() 

484 if not self.is_connected: 

485 return 

486 try: 

487 self._writer.close() # type: ignore[union-attr] 

488 # wait for close to finish, except when handling errors and 

489 # forcefully disconnecting. 

490 if not nowait: 

491 await self._writer.wait_closed() # type: ignore[union-attr] 

492 except OSError: 

493 pass 

494 finally: 

495 self._reader = None 

496 self._writer = None 

497 except asyncio.TimeoutError: 

498 raise TimeoutError( 

499 f"Timed out closing connection after {self.socket_connect_timeout}" 

500 ) from None 

501 

async def _send_ping(self):
    """Send PING and require PONG; raise ConnectionError on any other reply."""
    await self.send_command("PING", check_health=False)
    reply = str_if_bytes(await self.read_response())
    if reply != "PONG":
        raise ConnectionError("Bad response from PING health check")

507 

async def _ping_failed(self, error):
    """Failure hook for the health-check PING: drop the connection.

    ``error`` is accepted to fit the retry callback signature and is unused.
    """
    await self.disconnect()

511 

async def check_health(self):
    """Check the health of the connection with a PING/PONG.

    Nothing happens when ``health_check_interval`` is falsy or the next
    scheduled check is not due yet. The PING goes through ``self.retry``.
    """
    if not self.health_check_interval:
        return
    now = asyncio.get_running_loop().time()
    if now > self.next_health_check:
        await self.retry.call_with_retry(self._send_ping, self._ping_failed)

519 

async def _send_packed_command(self, command: Iterable[bytes]) -> None:
    """Queue every chunk of ``command`` on the writer, then await the drain."""
    writer = self._writer
    writer.writelines(command)
    await writer.drain()

523 

async def send_packed_command(
    self, command: Union[bytes, str, Iterable[bytes]], check_health: bool = True
) -> None:
    """Write an already-packed command to the socket.

    Connects first when needed (without health check), optionally runs the
    health check, then writes. A ``str`` is encoded and a single ``bytes``
    is wrapped in a list. When ``socket_timeout`` is truthy the write is
    bounded by it. The connection is dropped on any failure.

    Raises:
        TimeoutError: the write exceeded ``socket_timeout``.
        ConnectionError: the socket raised ``OSError`` while writing.
    """
    if not self.is_connected:
        await self.connect_check_health(check_health=False)
    if check_health:
        await self.check_health()

    try:
        if isinstance(command, str):
            command = command.encode()
        if isinstance(command, bytes):
            command = [command]
        if self.socket_timeout:
            await asyncio.wait_for(
                self._send_packed_command(command), self.socket_timeout
            )
        else:
            self._writer.writelines(command)
            await self._writer.drain()
    except asyncio.TimeoutError:
        await self.disconnect(nowait=True)
        raise TimeoutError("Timeout writing to socket") from None
    except OSError as e:
        await self.disconnect(nowait=True)
        if len(e.args) >= 2:
            err_no, errmsg = e.args[0], e.args[1]
        elif e.args:
            err_no, errmsg = "UNKNOWN", e.args[0]
        else:
            # An OSError built without arguments has nothing to index;
            # describe it by its text or type instead of raising IndexError.
            err_no, errmsg = "UNKNOWN", str(e) or type(e).__name__
        raise ConnectionError(
            f"Error {err_no} while writing to socket. {errmsg}."
        ) from e
    except BaseException:
        # A BaseException (e.g. cancellation) can interrupt an unfinished
        # send. This API offers no way to resend the remaining data, so
        # there is no point in keeping the connection open.
        await self.disconnect(nowait=True)
        raise

564 

async def send_command(self, *args: Any, **kwargs: Any) -> None:
    """Pack and send a command to the Redis server.

    Only the ``check_health`` keyword is read (default True).
    """
    packed = self.pack_command(*args)
    health_checked = kwargs.get("check_health", True)
    await self.send_packed_command(packed, check_health=health_checked)

570 

async def can_read_destructive(self):
    """Poll the socket to see if there's data that can be read.

    On ``OSError`` the connection is dropped and ConnectionError is raised.
    """
    try:
        readable = await self._parser.can_read_destructive()
    except OSError as e:
        await self.disconnect(nowait=True)
        endpoint = self._host_error()
        raise ConnectionError(f"Error while reading from {endpoint}: {e.args}")
    return readable

579 

async def read_response(
    self,
    disable_decoding: bool = False,
    timeout: Optional[float] = None,
    *,
    disconnect_on_error: bool = True,
    push_request: Optional[bool] = False,
):
    """Read the response from a previously sent command.

    ``timeout`` overrides ``socket_timeout`` for this read. When an explicit
    ``timeout`` expires, None is returned so the read can be retried; when
    ``socket_timeout`` expires, TimeoutError is raised. ``push_request`` is
    passed to the parser only for protocol 3. With ``disconnect_on_error``
    the connection is dropped on any failure. A ``ResponseError`` reply is
    raised rather than returned.
    """
    read_timeout = timeout if timeout is not None else self.socket_timeout
    host_error = self._host_error()
    try:
        parser_kwargs = {"disable_decoding": disable_decoding}
        if self.protocol in ["3", 3]:
            parser_kwargs["push_request"] = push_request

        if read_timeout is None:
            response = await self._parser.read_response(**parser_kwargs)
        else:
            async with async_timeout(read_timeout):
                response = await self._parser.read_response(**parser_kwargs)
    except asyncio.TimeoutError:
        if timeout is not None:
            # user requested timeout, return None. Operation can be retried
            return None
        # it was a self.socket_timeout error.
        if disconnect_on_error:
            await self.disconnect(nowait=True)
        raise TimeoutError(f"Timeout reading from {host_error}")
    except OSError as e:
        if disconnect_on_error:
            await self.disconnect(nowait=True)
        raise ConnectionError(f"Error while reading from {host_error} : {e.args}")
    except BaseException:
        # Close by default on BaseException too: a lot of code relies on
        # this behaviour when doing Command/Response pairs. See #1128.
        if disconnect_on_error:
            await self.disconnect(nowait=True)
        raise

    if self.health_check_interval:
        # A successful read postpones the next health check.
        now = asyncio.get_running_loop().time()
        self.next_health_check = now + self.health_check_interval

    if isinstance(response, ResponseError):
        raise response from None
    return response

637 

def pack_command(self, *args: EncodableT) -> List[bytes]:
    """Pack a series of arguments into the Redis protocol.

    Returns a list of chunks. Small arguments are merged into one shared
    buffer; an argument longer than ``_buffer_cutoff``, or a memoryview, is
    emitted as its own chunk so it is never copied into a larger string.
    """
    # The command name may contain literal sub-arguments, e.g. 'CONFIG GET'.
    # The server expects them as separate arguments, so the first argument
    # is split by hand; the pieces are bytestrings so they are not encoded.
    assert not isinstance(args[0], float)
    head = args[0]
    if isinstance(head, str):
        args = tuple(head.encode().split()) + args[1:]
    elif b" " in head:
        args = tuple(head.split()) + args[1:]

    chunks = []
    cutoff = self._buffer_cutoff
    pending = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))

    for encoded in map(self.encoder.encode, args):
        size = len(encoded)
        # Length header for this argument, appended to what is buffered.
        header = SYM_EMPTY.join((pending, SYM_DOLLAR, str(size).encode(), SYM_CRLF))
        if len(pending) > cutoff or size > cutoff or isinstance(encoded, memoryview):
            # Flush the buffer and emit the large value as a separate chunk.
            chunks.append(header)
            chunks.append(encoded)
            pending = SYM_CRLF
        else:
            pending = SYM_EMPTY.join((header, encoded, SYM_CRLF))
    chunks.append(pending)
    return chunks

683 

def pack_commands(self, commands: Iterable[Iterable[EncodableT]]) -> List[bytes]:
    """Pack multiple commands into the Redis protocol.

    Chunks from ``pack_command`` are merged into buffers. A buffer is
    flushed once it has grown past ``_buffer_cutoff`` or when an oversized
    or memoryview chunk arrives; such chunks are emitted on their own.
    """
    packed: List[bytes] = []
    small: List[bytes] = []
    small_size = 0
    cutoff = self._buffer_cutoff

    every_chunk = chain.from_iterable(self.pack_command(*cmd) for cmd in commands)
    for chunk in every_chunk:
        size = len(chunk)
        standalone = size > cutoff or isinstance(chunk, memoryview)

        if small_size > cutoff or standalone:
            if small:
                packed.append(SYM_EMPTY.join(small))
            small_size = 0
            small = []

        if standalone:
            packed.append(chunk)
        else:
            small.append(chunk)
            small_size += size

    if small:
        packed.append(SYM_EMPTY.join(small))
    return packed

713 

def _socket_is_empty(self):
    """Check if the socket is empty, i.e. the reader's buffer holds no bytes."""
    buffered = self._reader._buffer
    return len(buffered) == 0

717 

async def process_invalidation_messages(self):
    """Read push responses until the socket buffer is empty."""
    while True:
        if self._socket_is_empty():
            break
        await self.read_response(push_request=True)

721 

def set_re_auth_token(self, token: TokenInterface):
    """Store ``token`` so the next ``re_auth()`` call authenticates with it."""
    self._re_auth_token = token

724 

async def re_auth(self):
    """Send AUTH with the stored token, read the reply, then clear the token.

    Does nothing when no token is stored. The AUTH arguments are the token's
    ``try_get("oid")`` and ``get_value()`` results.
    """
    token = self._re_auth_token
    if token is None:
        return
    await self.send_command("AUTH", token.try_get("oid"), token.get_value())
    await self.read_response()
    self._re_auth_token = None

734 

735 

class Connection(AbstractConnection):
    "Manages TCP communication to and from a Redis server"

    def __init__(
        self,
        *,
        host: str = "localhost",
        port: Union[str, int] = 6379,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        socket_type: int = 0,
        **kwargs,
    ):
        """Store the TCP endpoint and keepalive settings.

        ``port`` is converted with ``int()``; remaining keyword arguments go
        to ``AbstractConnection``.
        """
        self.host = host
        self.port = int(port)
        self.socket_keepalive = socket_keepalive
        self.socket_keepalive_options = socket_keepalive_options or {}
        self.socket_type = socket_type
        super().__init__(**kwargs)

    def repr_pieces(self):
        """Return host, port, db and (when set) client_name for ``__repr__``."""
        pieces = [("host", self.host), ("port", self.port), ("db", self.db)]
        if self.client_name:
            pieces.append(("client_name", self.client_name))
        return pieces

    def _connection_arguments(self) -> Mapping:
        """Return the keyword arguments passed to ``asyncio.open_connection``."""
        return {"host": self.host, "port": self.port}

    async def _connect(self):
        """Create a TCP socket connection.

        Opening is bounded by ``socket_connect_timeout``. TCP_NODELAY is
        always set; keepalive options are applied when ``socket_keepalive``
        is true. If applying an option fails, the writer is closed, the
        streams are cleared and the error is re-raised.
        """
        async with async_timeout(self.socket_connect_timeout):
            reader, writer = await asyncio.open_connection(
                **self._connection_arguments()
            )
        self._reader = reader
        self._writer = writer
        sock = writer.transport.get_extra_info("socket")
        if sock:
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            try:
                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in self.socket_keepalive_options.items():
                        sock.setsockopt(socket.SOL_TCP, k, v)

            except (OSError, TypeError):
                # `socket_keepalive_options` might contain invalid options
                # causing an error. Do not leave the connection open, and do
                # not keep the closed streams either: otherwise is_connected
                # would still report True for a dead transport.
                writer.close()
                self._reader = None
                self._writer = None
                raise

    def _host_error(self) -> str:
        """Return ``host:port`` for error messages."""
        return f"{self.host}:{self.port}"

791 

792 

class SSLConnection(Connection):
    """Manages SSL connections to and from the Redis server(s).
    This class extends the Connection class, adding SSL functionality, and making
    use of ssl.SSLContext (https://docs.python.org/3/library/ssl.html#ssl.SSLContext)
    """

    def __init__(
        self,
        # "ssl.VerifyMode" is quoted, like the "ssl.VerifyFlags" annotations:
        # evaluated eagerly it raises AttributeError when ``ssl`` is None
        # (no SSL support), which would break importing this module.
        ssl_keyfile: Optional[str] = None,
        ssl_certfile: Optional[str] = None,
        ssl_cert_reqs: Union[str, "ssl.VerifyMode"] = "required",
        ssl_include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_min_version: Optional[TLSVersion] = None,
        ssl_ciphers: Optional[str] = None,
        **kwargs,
    ):
        """Collect the ``ssl_*`` options into a ``RedisSSLContext``.

        Remaining keyword arguments are forwarded to ``Connection``.

        Raises:
            RedisError: if Python was built without SSL support.
        """
        if not SSL_AVAILABLE:
            raise RedisError("Python wasn't built with SSL support")

        self.ssl_context: RedisSSLContext = RedisSSLContext(
            keyfile=ssl_keyfile,
            certfile=ssl_certfile,
            cert_reqs=ssl_cert_reqs,
            include_verify_flags=ssl_include_verify_flags,
            exclude_verify_flags=ssl_exclude_verify_flags,
            ca_certs=ssl_ca_certs,
            ca_data=ssl_ca_data,
            check_hostname=ssl_check_hostname,
            min_version=ssl_min_version,
            ciphers=ssl_ciphers,
        )
        super().__init__(**kwargs)

    def _connection_arguments(self) -> Mapping:
        """Return the TCP connection arguments plus the ``ssl`` context."""
        kwargs = super()._connection_arguments()
        kwargs["ssl"] = self.ssl_context.get()
        return kwargs

    # The read-only properties below expose the matching RedisSSLContext field.

    @property
    def keyfile(self):
        return self.ssl_context.keyfile

    @property
    def certfile(self):
        return self.ssl_context.certfile

    @property
    def cert_reqs(self):
        return self.ssl_context.cert_reqs

    @property
    def include_verify_flags(self):
        return self.ssl_context.include_verify_flags

    @property
    def exclude_verify_flags(self):
        return self.ssl_context.exclude_verify_flags

    @property
    def ca_certs(self):
        return self.ssl_context.ca_certs

    @property
    def ca_data(self):
        return self.ssl_context.ca_data

    @property
    def check_hostname(self):
        return self.ssl_context.check_hostname

    @property
    def min_version(self):
        return self.ssl_context.min_version

870 

871 

class RedisSSLContext:
    """Holds SSL settings and lazily builds one ``ssl.SSLContext`` from them."""

    __slots__ = (
        "keyfile",
        "certfile",
        "cert_reqs",
        "include_verify_flags",
        "exclude_verify_flags",
        "ca_certs",
        "ca_data",
        "context",
        "check_hostname",
        "min_version",
        "ciphers",
    )

    def __init__(
        self,
        # "ssl.VerifyMode" is quoted, like the "ssl.VerifyFlags" annotations:
        # evaluated eagerly it raises AttributeError when ``ssl`` is None
        # (no SSL support), which would break importing this module.
        keyfile: Optional[str] = None,
        certfile: Optional[str] = None,
        cert_reqs: Optional[Union[str, "ssl.VerifyMode"]] = None,
        include_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        exclude_verify_flags: Optional[List["ssl.VerifyFlags"]] = None,
        ca_certs: Optional[str] = None,
        ca_data: Optional[str] = None,
        check_hostname: bool = False,
        min_version: Optional[TLSVersion] = None,
        ciphers: Optional[str] = None,
    ):
        """Validate and store the settings; the context is built in ``get()``.

        ``cert_reqs`` may be None (treated as ``ssl.CERT_NONE``), one of the
        strings "none", "optional" or "required", or a verify-mode value.
        ``check_hostname`` is forced to False when verification is
        ``ssl.CERT_NONE``.

        Raises:
            RedisError: if SSL is unavailable or ``cert_reqs`` is an unknown
                string.
        """
        if not SSL_AVAILABLE:
            raise RedisError("Python wasn't built with SSL support")

        self.keyfile = keyfile
        self.certfile = certfile
        if cert_reqs is None:
            cert_reqs = ssl.CERT_NONE
        elif isinstance(cert_reqs, str):
            CERT_REQS = {  # noqa: N806
                "none": ssl.CERT_NONE,
                "optional": ssl.CERT_OPTIONAL,
                "required": ssl.CERT_REQUIRED,
            }
            if cert_reqs not in CERT_REQS:
                raise RedisError(
                    f"Invalid SSL Certificate Requirements Flag: {cert_reqs}"
                )
            cert_reqs = CERT_REQS[cert_reqs]
        self.cert_reqs = cert_reqs
        self.include_verify_flags = include_verify_flags
        self.exclude_verify_flags = exclude_verify_flags
        self.ca_certs = ca_certs
        self.ca_data = ca_data
        self.check_hostname = (
            check_hostname if self.cert_reqs != ssl.CERT_NONE else False
        )
        self.min_version = min_version
        self.ciphers = ciphers
        self.context: Optional[SSLContext] = None

    def get(self) -> SSLContext:
        """Return the ``ssl.SSLContext``, building and caching it on first use."""
        if not self.context:
            context = ssl.create_default_context()
            context.check_hostname = self.check_hostname
            context.verify_mode = self.cert_reqs
            if self.include_verify_flags:
                for flag in self.include_verify_flags:
                    context.verify_flags |= flag
            if self.exclude_verify_flags:
                for flag in self.exclude_verify_flags:
                    context.verify_flags &= ~flag
            if self.certfile:
                # The key file is optional for load_cert_chain: when absent
                # the private key is read from the certificate file. Requiring
                # both silently dropped a combined-PEM client certificate.
                context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
            if self.ca_certs or self.ca_data:
                context.load_verify_locations(cafile=self.ca_certs, cadata=self.ca_data)
            if self.min_version is not None:
                context.minimum_version = self.min_version
            if self.ciphers is not None:
                context.set_ciphers(self.ciphers)
            self.context = context
        return self.context

951 

952 

class UnixDomainSocketConnection(AbstractConnection):
    """Connection to a Redis server over a Unix domain socket."""

    def __init__(self, *, path: str = "", **kwargs):
        # The socket path is stored before the base initializer runs; every
        # other keyword argument is forwarded to it untouched.
        self.path = path
        super().__init__(**kwargs)

    def repr_pieces(self) -> Iterable[Tuple[str, Union[str, int]]]:
        """Return ``(name, value)`` pairs describing this connection."""
        described = [("path", self.path), ("db", self.db)]
        if not self.client_name:
            return described
        described.append(("client_name", self.client_name))
        return described

    async def _connect(self):
        """Open the Unix socket streams, store them and run ``on_connect``."""
        # NOTE(review): only the socket open is assumed to sit under the
        # connect timeout; confirm the nesting against the upstream file.
        async with async_timeout(self.socket_connect_timeout):
            reader, writer = await asyncio.open_unix_connection(path=self.path)
        self._reader, self._writer = reader, writer
        await self.on_connect()

    def _host_error(self) -> str:
        """Return the socket path."""
        return self.path

975 

976 

# Upper-cased string spellings that ``to_bool`` maps to False.
FALSE_STRINGS = ("0", "F", "FALSE", "N", "NO")


def to_bool(value) -> Optional[bool]:
    """Interpret *value* as an optional boolean.

    ``None`` and the empty string mean "not specified" and yield ``None``.
    Strings that match ``FALSE_STRINGS`` case-insensitively yield ``False``.
    Every other value is reduced with ``bool()``.
    """
    unspecified = value is None or value == ""
    if unspecified:
        return None
    spelled_false = isinstance(value, str) and value.upper() in FALSE_STRINGS
    return False if spelled_false else bool(value)

986 

987 

def parse_ssl_verify_flags(value):
    """Parse a comma-separated list of ``ssl.VerifyFlags`` member names.

    :param value: string such as ``"VERIFY_X509_STRICT"`` or
        ``"[VERIFY_X509_STRICT, VERIFY_X509_PARTIAL_CHAIN]"``; square
        brackets and whitespace around each name are ignored.
    :return: list with the ``VerifyFlags`` member for each name, in order.
    :raises ValueError: if any name is not a ``VerifyFlags`` member
        (this includes the empty string).
    """
    # flags are passed in as a string representation of a list,
    # e.g. VERIFY_X509_STRICT, VERIFY_X509_PARTIAL_CHAIN
    names = value.replace("[", "").replace("]", "").split(",")

    # Resolve names through the enum's member table instead of hasattr():
    # hasattr() is also true for non-member attributes of the enum class
    # (for example "__doc__"), which would let non-flag objects through.
    # VerifyFlags may be None when SSL is unavailable; the empty fallback
    # keeps the ValueError behaviour in that case.
    members = getattr(VerifyFlags, "__members__", {})

    verify_flags = []
    for name in names:
        name = name.strip()
        if name not in members:
            raise ValueError(f"Invalid ssl verify flag: {name}")
        verify_flags.append(members[name])
    return verify_flags

1000 

1001 

# Converters that ``parse_url`` applies to recognised querystring options,
# keyed by option name. Options without an entry here are passed through as
# strings. The table is wrapped in MappingProxyType so it is read-only.
URL_QUERY_ARGUMENT_PARSERS: Mapping[str, Callable[..., object]] = MappingProxyType(
    {
        "db": int,
        "socket_timeout": float,
        "socket_connect_timeout": float,
        "socket_keepalive": to_bool,
        "retry_on_timeout": to_bool,
        "max_connections": int,
        "health_check_interval": int,
        "ssl_check_hostname": to_bool,
        "ssl_include_verify_flags": parse_ssl_verify_flags,
        "ssl_exclude_verify_flags": parse_ssl_verify_flags,
        "timeout": float,
    }
)

1017 

1018 

class ConnectKwargs(TypedDict, total=False):
    """Keyword arguments that ``parse_url`` can derive from a connection URL.

    ``total=False`` makes every key optional.
    """

    # Credentials taken from the URL's userinfo component.
    username: str
    password: str
    # Connection class chosen by ``parse_url`` from the URL scheme.
    connection_class: Type[AbstractConnection]
    # TCP endpoint, set for redis:// and rediss:// URLs.
    host: str
    port: int
    # Database number, from the ``db`` query option or the URL path.
    db: int
    # Socket path, set for unix:// URLs.
    path: str

1027 

1028 

def parse_url(url: str) -> ConnectKwargs:
    """Translate a Redis connection URL into connection keyword arguments.

    Querystring options are converted with ``URL_QUERY_ARGUMENT_PARSERS``
    when a converter exists and kept as strings otherwise; only the first
    value of a repeated option is used. Username and password come from
    the URL's userinfo. ``unix://`` URLs yield ``path`` and select
    ``UnixDomainSocketConnection``; ``redis://`` and ``rediss://`` URLs
    yield ``host``/``port``, take ``db`` from the path when no ``db`` query
    option was given, and ``rediss://`` selects ``SSLConnection``.

    :raises ValueError: if a querystring value cannot be converted or the
        scheme is not one of ``redis``, ``rediss`` or ``unix``.
    """
    parts: ParseResult = urlparse(url)
    options: ConnectKwargs = {}

    for key, values in parse_qs(parts.query).items():
        if not values:
            continue
        raw = unquote(values[0])
        convert = URL_QUERY_ARGUMENT_PARSERS.get(key)
        if not convert:
            options[key] = raw
            continue
        try:
            options[key] = convert(raw)
        except (TypeError, ValueError):
            raise ValueError(f"Invalid value for '{key}' in connection URL.")

    if parts.username:
        options["username"] = unquote(parts.username)
    if parts.password:
        options["password"] = unquote(parts.password)

    # We only support redis://, rediss:// and unix:// schemes.
    scheme = parts.scheme
    if scheme not in ("unix", "redis", "rediss"):
        supported = "redis://, rediss://, unix://"
        raise ValueError(
            f"Redis URL must specify one of the following schemes ({supported})"
        )

    if scheme == "unix":
        if parts.path:
            options["path"] = unquote(parts.path)
        options["connection_class"] = UnixDomainSocketConnection
        return options

    # redis:// or rediss://
    if parts.hostname:
        options["host"] = unquote(parts.hostname)
    if parts.port:
        options["port"] = int(parts.port)

    # A path is used as the db number only when the querystring did not
    # already provide one; a path that is not an integer is ignored.
    if parts.path and "db" not in options:
        try:
            options["db"] = int(unquote(parts.path).replace("/", ""))
        except (AttributeError, ValueError):
            pass

    if scheme == "rediss":
        options["connection_class"] = SSLConnection

    return options

1080 

1081 

# Type variable bound to ConnectionPool, so that ``ConnectionPool.from_url``
# is typed as returning an instance of the class it is called on.
_CP = TypeVar("_CP", bound="ConnectionPool")

1083 

1084 

class ConnectionPool:
    """
    Create a connection pool. If ``max_connections`` is set, then this
    object raises :py:class:`~redis.ConnectionError` when the pool's
    limit is reached.

    By default, TCP connections are created unless ``connection_class``
    is specified. Use :py:class:`~redis.UnixDomainSocketConnection` for
    unix sockets.
    :py:class:`~redis.SSLConnection` can be used for SSL enabled connections.

    Any additional keyword arguments are passed to the constructor of
    ``connection_class``.
    """

    @classmethod
    def from_url(cls: Type[_CP], url: str, **kwargs) -> _CP:
        """
        Return a connection pool configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - ``redis://`` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - ``rediss://`` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://`` creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0

        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0

        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        url_options = parse_url(url)
        # URL-derived options overwrite same-named keyword arguments.
        kwargs.update(url_options)
        return cls(**kwargs)

    def __init__(
        self,
        connection_class: Type[AbstractConnection] = Connection,
        max_connections: Optional[int] = None,
        **connection_kwargs,
    ):
        """
        Set up an empty pool.

        :param connection_class: class instantiated by ``make_connection``.
        :param max_connections: upper bound on connections handed out at
            once; a falsy value (``None`` or ``0``) selects ``2**31``.
        :param connection_kwargs: stored and passed to ``connection_class``
            for every new connection. The ``encoder_class`` and
            ``event_dispatcher`` entries are also read by the pool itself.
        :raises ValueError: if ``max_connections`` is not an ``int`` or is
            negative.
        """
        max_connections = max_connections or 2**31
        if not isinstance(max_connections, int) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        self._available_connections: List[AbstractConnection] = []
        self._in_use_connections: Set[AbstractConnection] = set()
        self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)
        self._lock = asyncio.Lock()
        self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
        if self._event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()

    def __repr__(self):
        conn_kwargs = ",".join([f"{k}={v}" for k, v in self.connection_kwargs.items()])
        return (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(<{self.connection_class.__module__}.{self.connection_class.__name__}"
            f"({conn_kwargs})>)>"
        )

    def reset(self):
        """Replace the idle and in-use collections with empty ones.

        Connections held in the old collections are not disconnected here.
        """
        self._available_connections = []
        # NOTE(review): __init__ uses a plain set for this attribute while
        # reset() installs a WeakSet - confirm the difference is intended.
        self._in_use_connections = weakref.WeakSet()

    def can_get_connection(self) -> bool:
        """Return True if a connection can be retrieved from the pool."""
        # bool() keeps the declared return type; without it a non-empty idle
        # list would be returned as-is.
        return (
            bool(self._available_connections)
            or len(self._in_use_connections) < self.max_connections
        )

    @deprecated_args(
        args_to_warn=["*"],
        reason="Use get_connection() without args instead",
        version="5.3.0",
    )
    async def get_connection(self, command_name=None, *keys, **options):
        """Get a connected connection from the pool.

        The positional and keyword arguments are deprecated and not used.
        If the connection cannot be made ready it is released back to the
        pool and the error is re-raised.
        """
        async with self._lock:
            connection = self.get_available_connection()
            try:
                await self.ensure_connection(connection)
            except BaseException:
                await self.release(connection)
                raise

        return connection

    def get_available_connection(self):
        """Get a connection from the pool, without making sure it is connected"""
        try:
            connection = self._available_connections.pop()
        except IndexError:
            # No idle connection: create one unless the limit is reached.
            if len(self._in_use_connections) >= self.max_connections:
                raise ConnectionError("Too many connections") from None
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

    def get_encoder(self):
        """Return an encoder based on encoding settings"""
        kwargs = self.connection_kwargs
        return self.encoder_class(
            encoding=kwargs.get("encoding", "utf-8"),
            encoding_errors=kwargs.get("encoding_errors", "strict"),
            decode_responses=kwargs.get("decode_responses", False),
        )

    def make_connection(self):
        """Create a new connection. Can be overridden by child classes."""
        return self.connection_class(**self.connection_kwargs)

    async def ensure_connection(self, connection: AbstractConnection):
        """Ensure that the connection object is connected and valid"""
        await connection.connect()
        # connections that the pool provides should be ready to send
        # a command. if not, the connection was either returned to the
        # pool before all data has been read or the socket has been
        # closed. either way, reconnect and verify everything is good.
        try:
            if await connection.can_read_destructive():
                raise ConnectionError("Connection has data") from None
        except (ConnectionError, TimeoutError, OSError):
            await connection.disconnect()
            await connection.connect()
            if await connection.can_read_destructive():
                raise ConnectionError("Connection not ready") from None

    async def release(self, connection: AbstractConnection):
        """Releases the connection back to the pool"""
        # Connections should always be returned to the correct pool,
        # not doing so is an error that will cause an exception here.
        self._in_use_connections.remove(connection)
        if connection.should_reconnect():
            await connection.disconnect()

        self._available_connections.append(connection)
        await self._event_dispatcher.dispatch_async(
            AsyncAfterConnectionReleasedEvent(connection)
        )

    async def disconnect(self, inuse_connections: bool = True):
        """
        Disconnects connections in the pool

        If ``inuse_connections`` is True, disconnect connections that are
        currently in use, potentially by other tasks. Otherwise only
        disconnect connections that are idle in the pool.

        All disconnects are attempted; if any of them failed, the first
        exception collected is raised afterwards.
        """
        if inuse_connections:
            connections: Iterable[AbstractConnection] = chain(
                self._available_connections, self._in_use_connections
            )
        else:
            connections = self._available_connections
        resp = await asyncio.gather(
            *(connection.disconnect() for connection in connections),
            return_exceptions=True,
        )
        exc = next((r for r in resp if isinstance(r, BaseException)), None)
        if exc:
            raise exc

    async def update_active_connections_for_reconnect(self):
        """
        Mark all active connections for reconnect.
        """
        async with self._lock:
            for conn in self._in_use_connections:
                conn.mark_for_reconnect()

    async def aclose(self) -> None:
        """Close the pool, disconnecting all connections"""
        await self.disconnect()

    def set_retry(self, retry: "Retry") -> None:
        """Assign ``retry`` to every idle and in-use connection."""
        for conn in self._available_connections:
            conn.retry = retry
        for conn in self._in_use_connections:
            conn.retry = retry

    async def re_auth_callback(self, token: TokenInterface):
        """Re-authenticate the pool's connections with ``token``.

        Idle connections send ``AUTH`` and read the reply right away, both
        through their retry object. In-use connections only have the token
        handed to them via ``set_re_auth_token``.
        """
        async with self._lock:
            for conn in self._available_connections:
                await conn.retry.call_with_retry(
                    lambda: conn.send_command(
                        "AUTH", token.try_get("oid"), token.get_value()
                    ),
                    lambda error: self._mock(error),
                )
                await conn.retry.call_with_retry(
                    lambda: conn.read_response(), lambda error: self._mock(error)
                )
            for conn in self._in_use_connections:
                conn.set_re_auth_token(token)

    async def _mock(self, error: RedisError):
        """
        Dummy function that does nothing; passed as the error callback to
        the retry object.
        :param error:
        :return:
        """
        pass

1320 

1321 

class BlockingConnectionPool(ConnectionPool):
    """
    A connection pool that waits for a free connection instead of failing::

        >>> from redis.asyncio import Redis, BlockingConnectionPool
        >>> client = Redis.from_pool(BlockingConnectionPool())

    Like :py:class:`~redis.asyncio.ConnectionPool`, it keeps a pool of
    reusable connections that can be shared by multiple async redis
    clients.

    The difference shows when every connection is in use: where
    :py:class:`~redis.asyncio.ConnectionPool` raises a
    :py:class:`~redis.ConnectionError`, this pool blocks the current
    `Task` for a specified number of seconds until a connection becomes
    available.

    Use ``max_connections`` to increase / decrease the pool size::

        >>> pool = BlockingConnectionPool(max_connections=10)

    Use ``timeout`` to set how many seconds to wait for a connection to
    become available, or to block forever::

        >>> # Block forever.
        >>> pool = BlockingConnectionPool(timeout=None)

        >>> # Raise a ``ConnectionError`` after five seconds if a connection is
        >>> # not available.
        >>> pool = BlockingConnectionPool(timeout=5)
    """

    def __init__(
        self,
        max_connections: int = 50,
        timeout: Optional[int] = 20,
        connection_class: Type[AbstractConnection] = Connection,
        queue_class: Type[asyncio.Queue] = asyncio.LifoQueue,  # deprecated
        **connection_kwargs,
    ):
        # ``queue_class`` is accepted but not used by this initializer.
        super().__init__(
            connection_class=connection_class,
            max_connections=max_connections,
            **connection_kwargs,
        )
        self.timeout = timeout
        self._condition = asyncio.Condition()

    @deprecated_args(
        args_to_warn=["*"],
        reason="Use get_connection() without args instead",
        version="5.3.0",
    )
    async def get_connection(self, command_name=None, *keys, **options):
        """Gets a connection from the pool, blocking until one is available"""
        try:
            # Wait, under the condition and bounded by the pool timeout,
            # until the pool can hand a connection out, then take it.
            async with self._condition, async_timeout(self.timeout):
                await self._condition.wait_for(self.can_get_connection)
                conn = super().get_available_connection()
        except asyncio.TimeoutError as err:
            raise ConnectionError("No connection available.") from err

        # The connection check is performed outside of the lock.
        try:
            await self.ensure_connection(conn)
        except BaseException:
            await self.release(conn)
            raise
        else:
            return conn

    async def release(self, connection: AbstractConnection):
        """Releases the connection back to the pool."""
        async with self._condition:
            await super().release(connection)
            # Wake one task waiting in get_connection().
            self._condition.notify()