Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/sentinel.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

173 statements  

1import asyncio 

2import random 

3import weakref 

4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type 

5 

6from redis.asyncio.client import Redis 

7from redis.asyncio.connection import ( 

8 Connection, 

9 ConnectionPool, 

10 EncodableT, 

11 SSLConnection, 

12) 

13from redis.commands import AsyncSentinelCommands 

14from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

15from redis.utils import str_if_bytes 

16 

17 

class MasterNotFoundError(ConnectionError):
    """Raised when no usable master address can be found for a service."""

20 

21 

class SlaveNotFoundError(ConnectionError):
    """Raised when no slave (nor fallback) address can be connected to."""

24 

25 

class SentinelManagedConnection(Connection):
    """Connection whose target address is chosen by its owning sentinel pool.

    The pool passed as the ``connection_pool`` keyword decides where to
    connect: the current master address when ``connection_pool.is_master`` is
    true, otherwise the candidates yielded by
    ``connection_pool.rotate_slaves()``.
    """

    def __init__(self, **kwargs):
        """Remember the owning pool and forward the other kwargs to the base.

        Raises:
            KeyError: if ``connection_pool`` is not supplied.
        """
        # Popped so that it is not forwarded to the base constructor.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        """Return ``<module.Class(host=H,port=P)>``.

        The host/port part is left out while no host is set.
        """
        cls = self.__class__
        host_info = f"host={self.host},port={self.port}" if self.host else ""
        return f"<{cls.__module__}.{cls.__name__}({host_info})>"

    async def connect_to(self, address):
        """Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent right
        after connecting.

        Raises:
            ConnectionError: if the PING reply is not ``PONG``.
        """
        self.host, self.port = address
        await super().connect()
        if self.connection_pool.check_connection:
            await self.send_command("PING")
            if str_if_bytes(await self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    async def _connect_retry(self):
        """Perform one connection attempt against the pool-selected address.

        Master pools connect to the address returned by
        ``get_master_address()``; slave pools try each address yielded by
        ``rotate_slaves()`` in turn and stop at the first that connects.
        """
        if self._reader:
            return  # already connected
        if self.connection_pool.is_master:
            await self.connect_to(await self.connection_pool.get_master_address())
        else:
            async for slave in self.connection_pool.rotate_slaves():
                try:
                    return await self.connect_to(slave)
                except ConnectionError:
                    # This candidate is unreachable; move on to the next one.
                    continue
            # Defensive fallback: reached only if the iterator ends without
            # raising and no candidate could be connected to.
            raise SlaveNotFoundError  # Never be here

    async def connect(self):
        """Connect via ``self.retry``, re-running ``_connect_retry`` on failure."""
        return await self.retry.call_with_retry(
            self._connect_retry,
            # Failure callback: just yield control to the event loop.
            lambda error: asyncio.sleep(0),
        )

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: Optional[bool] = True,
        push_request: Optional[bool] = False,
    ):
        """Read a response, handling a master that was demoted to a slave.

        All arguments are forwarded unchanged to the base implementation.

        Raises:
            ConnectionError: if a ``ReadOnlyError`` is received while this
                connection belongs to a master pool.
            ReadOnlyError: re-raised as-is for non-master pools.
        """
        try:
            return await super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                await self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise

90 

91 

class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """``SentinelManagedConnection`` combined with ``SSLConnection``."""

94 

95 

class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """Create a pool for ``service_name`` resolved via ``sentinel_manager``.

        Pool-specific keywords consumed here (everything else is forwarded to
        the base pool):

        * ``ssl`` - selects ``SentinelManagedSSLConnection`` as the default
          connection class when truthy (default ``False``).
        * ``connection_class`` - overrides the default connection class.
        * ``is_master`` - whether the pool targets the master (default
          ``True``).
        * ``check_connection`` - see the class docstring (default ``False``).
        """
        # "ssl" is always removed from kwargs, even when an explicit
        # connection_class makes its value irrelevant.
        use_ssl = kwargs.pop("ssl", False)
        kwargs.setdefault(
            "connection_class",
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection,
        )
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy to this pool rather than a strong
        # reference.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.master_address = None
        self.slave_rr_counter = None

    def __repr__(self):
        """Return ``<module.Class(service=NAME(master|slave))>``."""
        cls = self.__class__
        role = "master" if self.is_master else "slave"
        return f"<{cls.__module__}.{cls.__name__}(service={self.service_name}({role}))>"

    def reset(self):
        """Reset the base pool and forget the cached master / slave cursor."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection: Connection):
        """Tell whether ``connection`` belongs to this pool.

        For a master pool the connection must also point at the currently
        recorded master address; slave pools defer to the base pool alone.
        """
        check = not self.is_master or self.master_address == (
            connection.host,
            connection.port,
        )
        return check and super().owns_connection(connection)

    async def get_master_address(self):
        """Return the master address reported by the sentinel manager.

        On a master pool, a changed address is recorded and the pool is
        disconnected with ``inuse_connections=False``.
        """
        master_address = await self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != master_address:
            self.master_address = master_address
            # disconnect any idle connections so that they reconnect
            # to the new master the next time that they are used.
            await self.disconnect(inuse_connections=False)
        return master_address

    async def rotate_slaves(self) -> AsyncIterator:
        """Round-robin slave balancer.

        Yields each discovered slave address once, starting after a cursor
        that is randomly initialised on first use, then the master address as
        a fallback.

        Raises:
            SlaveNotFoundError: once every candidate has been yielded and the
                consumer asks for another one.
        """
        slaves = await self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                yield slaves[self.slave_rr_counter]
        # Fallback to the master connection
        try:
            yield await self.get_master_address()
        except MasterNotFoundError:
            # No master either; fall through to SlaveNotFoundError below.
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

165 

166 

class Sentinel(AsyncSentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.asyncio.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, every ``socket_*`` option given in ``connection_kwargs``
    will be used.

    ``force_master_ip``, when not None, replaces the IP address reported by
    the sentinels in the result of ``discover_master`` (the reported port is
    kept).

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        force_master_ip=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        # One Redis client per sentinel node.
        self.sentinels = [
            Redis(host=hostname, port=port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self._force_master_ip = force_master_ip

    async def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.

        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.

        return_responses - If set to True, return the list of raw responses
        (one per queried node) instead of a single boolean.

        Without ``return_responses`` the result is the truthiness of the
        single response (``once``) or ``all()`` of the responses.
        """
        once = bool(kwargs.pop("once", False))

        # Check if command is supposed to return the original
        # responses instead of boolean value.
        return_responses = bool(kwargs.pop("return_responses", False))

        if once:
            response = await random.choice(self.sentinels).execute_command(
                *args, **kwargs
            )
            return [response] if return_responses else bool(response)

        # gather() schedules the coroutines concurrently itself, so no Task
        # objects need to be created by hand.
        responses = await asyncio.gather(
            *(sentinel.execute_command(*args, **kwargs) for sentinel in self.sentinels)
        )
        return responses if return_responses else all(responses)

    def __repr__(self):
        """Return ``<module.Class(sentinels=[host:port,...])>``."""
        sentinel_addresses = [
            f"{sentinel.connection_pool.connection_kwargs['host']}:"
            f"{sentinel.connection_pool.connection_kwargs['port']}"
            for sentinel in self.sentinels
        ]
        cls = self.__class__
        return (
            f"<{cls.__module__}.{cls.__name__}"
            f"(sentinels=[{','.join(sentinel_addresses)}])>"
        )

    def check_master_state(self, state: dict, service_name: str) -> bool:
        """Tell whether a sentinel's ``state`` describes a usable master.

        The master must be flagged ``is_master``, be neither ``is_sdown`` nor
        ``is_odown``, and the reporting sentinel must see at least
        ``min_other_sentinels`` peers. ``service_name`` is not used here.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        return state["num-other-sentinels"] >= self.min_other_sentinels

    async def discover_master(self, service_name: str):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                # Remember why this sentinel was skipped for the final error.
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )

                ip = (
                    self._force_master_ip
                    if self._force_master_ip is not None
                    else state["ip"]
                )
                return ip, state["port"]

        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(
        self, slaves: Iterable[Mapping]
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Remove slaves that are in an ODOWN or SDOWN state"""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    async def discover_slaves(
        self, service_name: str
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Returns a list of alive slaves for service ``service_name``

        Sentinels are queried in order; the first non-empty filtered list
        wins. An empty list is returned when no sentinel yields one.
        """
        for sentinel in self.sentinels:
            try:
                slaves = await sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                # This sentinel could not answer; try the next one.
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        return []

    def _client_for(
        self, service_name, redis_class, connection_pool_class, is_master, kwargs
    ):
        """Build a client over a new sentinel pool (master_for / slave_for)."""
        kwargs["is_master"] = is_master
        connection_kwargs = dict(self.connection_kwargs)
        connection_kwargs.update(kwargs)

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.
        Sentinel client will detect failover and reconnect Redis clients
        automatically.

        A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use.  The :py:class:`~redis.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        return self._client_for(
            service_name, redis_class, connection_pool_class, True, kwargs
        )

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        return self._client_for(
            service_name, redis_class, connection_pool_class, False, kwargs
        )