Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/asyncio/sentinel.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1import asyncio 

2import random 

3import weakref 

4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type 

5 

6from redis.asyncio.client import Redis 

7from redis.asyncio.connection import ( 

8 Connection, 

9 ConnectionPool, 

10 EncodableT, 

11 SSLConnection, 

12) 

13from redis.commands import AsyncSentinelCommands 

14from redis.exceptions import ( 

15 ConnectionError, 

16 ReadOnlyError, 

17 ResponseError, 

18 TimeoutError, 

19) 

20 

21 

22class MasterNotFoundError(ConnectionError): 

23 pass 

24 

25 

26class SlaveNotFoundError(ConnectionError): 

27 pass 

28 

29 

30class SentinelManagedConnection(Connection): 

31 def __init__(self, **kwargs): 

32 self.connection_pool = kwargs.pop("connection_pool") 

33 super().__init__(**kwargs) 

34 

35 def __repr__(self): 

36 s = f"<{self.__class__.__module__}.{self.__class__.__name__}" 

37 if self.host: 

38 host_info = f",host={self.host},port={self.port}" 

39 s += host_info 

40 return s + ")>" 

41 

42 async def connect_to(self, address): 

43 self.host, self.port = address 

44 await self.connect_check_health( 

45 check_health=self.connection_pool.check_connection, 

46 retry_socket_connect=False, 

47 ) 

48 

49 async def _connect_retry(self): 

50 if self._reader: 

51 return # already connected 

52 if self.connection_pool.is_master: 

53 await self.connect_to(await self.connection_pool.get_master_address()) 

54 else: 

55 async for slave in self.connection_pool.rotate_slaves(): 

56 try: 

57 return await self.connect_to(slave) 

58 except ConnectionError: 

59 continue 

60 raise SlaveNotFoundError # Never be here 

61 

62 async def connect(self): 

63 return await self.retry.call_with_retry( 

64 self._connect_retry, 

65 lambda error: asyncio.sleep(0), 

66 ) 

67 

68 async def read_response( 

69 self, 

70 disable_decoding: bool = False, 

71 timeout: Optional[float] = None, 

72 *, 

73 disconnect_on_error: Optional[float] = True, 

74 push_request: Optional[bool] = False, 

75 ): 

76 try: 

77 return await super().read_response( 

78 disable_decoding=disable_decoding, 

79 timeout=timeout, 

80 disconnect_on_error=disconnect_on_error, 

81 push_request=push_request, 

82 ) 

83 except ReadOnlyError: 

84 if self.connection_pool.is_master: 

85 # When talking to a master, a ReadOnlyError when likely 

86 # indicates that the previous master that we're still connected 

87 # to has been demoted to a slave and there's a new master. 

88 # calling disconnect will force the connection to re-query 

89 # sentinel during the next connect() attempt. 

90 await self.disconnect() 

91 raise ConnectionError("The previous master is now a slave") 

92 raise 

93 

94 

95class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): 

96 pass 

97 

98 

99class SentinelConnectionPool(ConnectionPool): 

100 """ 

101 Sentinel backed connection pool. 

102 

103 If ``check_connection`` flag is set to True, SentinelManagedConnection 

104 sends a PING command right after establishing the connection. 

105 """ 

106 

107 def __init__(self, service_name, sentinel_manager, **kwargs): 

108 kwargs["connection_class"] = kwargs.get( 

109 "connection_class", 

110 ( 

111 SentinelManagedSSLConnection 

112 if kwargs.pop("ssl", False) 

113 else SentinelManagedConnection 

114 ), 

115 ) 

116 self.is_master = kwargs.pop("is_master", True) 

117 self.check_connection = kwargs.pop("check_connection", False) 

118 super().__init__(**kwargs) 

119 self.connection_kwargs["connection_pool"] = weakref.proxy(self) 

120 self.service_name = service_name 

121 self.sentinel_manager = sentinel_manager 

122 self.master_address = None 

123 self.slave_rr_counter = None 

124 

125 def __repr__(self): 

126 return ( 

127 f"<{self.__class__.__module__}.{self.__class__.__name__}" 

128 f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>" 

129 ) 

130 

131 def reset(self): 

132 super().reset() 

133 self.master_address = None 

134 self.slave_rr_counter = None 

135 

136 def owns_connection(self, connection: Connection): 

137 check = not self.is_master or ( 

138 self.is_master and self.master_address == (connection.host, connection.port) 

139 ) 

140 return check and super().owns_connection(connection) 

141 

142 async def get_master_address(self): 

143 master_address = await self.sentinel_manager.discover_master(self.service_name) 

144 if self.is_master: 

145 if self.master_address != master_address: 

146 self.master_address = master_address 

147 # disconnect any idle connections so that they reconnect 

148 # to the new master the next time that they are used. 

149 await self.disconnect(inuse_connections=False) 

150 return master_address 

151 

152 async def rotate_slaves(self) -> AsyncIterator: 

153 """Round-robin slave balancer""" 

154 slaves = await self.sentinel_manager.discover_slaves(self.service_name) 

155 if slaves: 

156 if self.slave_rr_counter is None: 

157 self.slave_rr_counter = random.randint(0, len(slaves) - 1) 

158 for _ in range(len(slaves)): 

159 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) 

160 slave = slaves[self.slave_rr_counter] 

161 yield slave 

162 # Fallback to the master connection 

163 try: 

164 yield await self.get_master_address() 

165 except MasterNotFoundError: 

166 pass 

167 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") 

168 

169 

170class Sentinel(AsyncSentinelCommands): 

171 """ 

172 Redis Sentinel cluster client 

173 

174 >>> from redis.sentinel import Sentinel 

175 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) 

176 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) 

177 >>> await master.set('foo', 'bar') 

178 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) 

179 >>> await slave.get('foo') 

180 b'bar' 

181 

182 ``sentinels`` is a list of sentinel nodes. Each node is represented by 

183 a pair (hostname, port). 

184 

185 ``min_other_sentinels`` defined a minimum number of peers for a sentinel. 

186 When querying a sentinel, if it doesn't meet this threshold, responses 

187 from that sentinel won't be considered valid. 

188 

189 ``sentinel_kwargs`` is a dictionary of connection arguments used when 

190 connecting to sentinel instances. Any argument that can be passed to 

191 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is 

192 not specified, any socket_timeout and socket_keepalive options specified 

193 in ``connection_kwargs`` will be used. 

194 

195 ``connection_kwargs`` are keyword arguments that will be used when 

196 establishing a connection to a Redis server. 

197 """ 

198 

199 def __init__( 

200 self, 

201 sentinels, 

202 min_other_sentinels=0, 

203 sentinel_kwargs=None, 

204 force_master_ip=None, 

205 **connection_kwargs, 

206 ): 

207 # if sentinel_kwargs isn't defined, use the socket_* options from 

208 # connection_kwargs 

209 if sentinel_kwargs is None: 

210 sentinel_kwargs = { 

211 k: v for k, v in connection_kwargs.items() if k.startswith("socket_") 

212 } 

213 self.sentinel_kwargs = sentinel_kwargs 

214 

215 self.sentinels = [ 

216 Redis(host=hostname, port=port, **self.sentinel_kwargs) 

217 for hostname, port in sentinels 

218 ] 

219 self.min_other_sentinels = min_other_sentinels 

220 self.connection_kwargs = connection_kwargs 

221 self._force_master_ip = force_master_ip 

222 

223 async def execute_command(self, *args, **kwargs): 

224 """ 

225 Execute Sentinel command in sentinel nodes. 

226 once - If set to True, then execute the resulting command on a single 

227 node at random, rather than across the entire sentinel cluster. 

228 """ 

229 once = bool(kwargs.pop("once", False)) 

230 

231 # Check if command is supposed to return the original 

232 # responses instead of boolean value. 

233 return_responses = bool(kwargs.pop("return_responses", False)) 

234 

235 if once: 

236 response = await random.choice(self.sentinels).execute_command( 

237 *args, **kwargs 

238 ) 

239 if return_responses: 

240 return [response] 

241 else: 

242 return True if response else False 

243 

244 tasks = [ 

245 asyncio.Task(sentinel.execute_command(*args, **kwargs)) 

246 for sentinel in self.sentinels 

247 ] 

248 responses = await asyncio.gather(*tasks) 

249 

250 if return_responses: 

251 return responses 

252 

253 return all(responses) 

254 

255 def __repr__(self): 

256 sentinel_addresses = [] 

257 for sentinel in self.sentinels: 

258 sentinel_addresses.append( 

259 f"{sentinel.connection_pool.connection_kwargs['host']}:" 

260 f"{sentinel.connection_pool.connection_kwargs['port']}" 

261 ) 

262 return ( 

263 f"<{self.__class__}.{self.__class__.__name__}" 

264 f"(sentinels=[{','.join(sentinel_addresses)}])>" 

265 ) 

266 

267 def check_master_state(self, state: dict, service_name: str) -> bool: 

268 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 

269 return False 

270 # Check if our sentinel doesn't see other nodes 

271 if state["num-other-sentinels"] < self.min_other_sentinels: 

272 return False 

273 return True 

274 

275 async def discover_master(self, service_name: str): 

276 """ 

277 Asks sentinel servers for the Redis master's address corresponding 

278 to the service labeled ``service_name``. 

279 

280 Returns a pair (address, port) or raises MasterNotFoundError if no 

281 master is found. 

282 """ 

283 collected_errors = list() 

284 for sentinel_no, sentinel in enumerate(self.sentinels): 

285 try: 

286 masters = await sentinel.sentinel_masters() 

287 except (ConnectionError, TimeoutError) as e: 

288 collected_errors.append(f"{sentinel} - {e!r}") 

289 continue 

290 state = masters.get(service_name) 

291 if state and self.check_master_state(state, service_name): 

292 # Put this sentinel at the top of the list 

293 self.sentinels[0], self.sentinels[sentinel_no] = ( 

294 sentinel, 

295 self.sentinels[0], 

296 ) 

297 

298 ip = ( 

299 self._force_master_ip 

300 if self._force_master_ip is not None 

301 else state["ip"] 

302 ) 

303 return ip, state["port"] 

304 

305 error_info = "" 

306 if len(collected_errors) > 0: 

307 error_info = f" : {', '.join(collected_errors)}" 

308 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}") 

309 

310 def filter_slaves( 

311 self, slaves: Iterable[Mapping] 

312 ) -> Sequence[Tuple[EncodableT, EncodableT]]: 

313 """Remove slaves that are in an ODOWN or SDOWN state""" 

314 slaves_alive = [] 

315 for slave in slaves: 

316 if slave["is_odown"] or slave["is_sdown"]: 

317 continue 

318 slaves_alive.append((slave["ip"], slave["port"])) 

319 return slaves_alive 

320 

321 async def discover_slaves( 

322 self, service_name: str 

323 ) -> Sequence[Tuple[EncodableT, EncodableT]]: 

324 """Returns a list of alive slaves for service ``service_name``""" 

325 for sentinel in self.sentinels: 

326 try: 

327 slaves = await sentinel.sentinel_slaves(service_name) 

328 except (ConnectionError, ResponseError, TimeoutError): 

329 continue 

330 slaves = self.filter_slaves(slaves) 

331 if slaves: 

332 return slaves 

333 return [] 

334 

335 def master_for( 

336 self, 

337 service_name: str, 

338 redis_class: Type[Redis] = Redis, 

339 connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, 

340 **kwargs, 

341 ): 

342 """ 

343 Returns a redis client instance for the ``service_name`` master. 

344 Sentinel client will detect failover and reconnect Redis clients 

345 automatically. 

346 

347 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is 

348 used to retrieve the master's address before establishing a new 

349 connection. 

350 

351 NOTE: If the master's address has changed, any cached connections to 

352 the old master are closed. 

353 

354 By default clients will be a :py:class:`~redis.Redis` instance. 

355 Specify a different class to the ``redis_class`` argument if you 

356 desire something different. 

357 

358 The ``connection_pool_class`` specifies the connection pool to 

359 use. The :py:class:`~redis.sentinel.SentinelConnectionPool` 

360 will be used by default. 

361 

362 All other keyword arguments are merged with any connection_kwargs 

363 passed to this class and passed to the connection pool as keyword 

364 arguments to be used to initialize Redis connections. 

365 """ 

366 kwargs["is_master"] = True 

367 connection_kwargs = dict(self.connection_kwargs) 

368 connection_kwargs.update(kwargs) 

369 

370 connection_pool = connection_pool_class(service_name, self, **connection_kwargs) 

371 # The Redis object "owns" the pool 

372 return redis_class.from_pool(connection_pool) 

373 

374 def slave_for( 

375 self, 

376 service_name: str, 

377 redis_class: Type[Redis] = Redis, 

378 connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool, 

379 **kwargs, 

380 ): 

381 """ 

382 Returns redis client instance for the ``service_name`` slave(s). 

383 

384 A SentinelConnectionPool class is used to retrieve the slave's 

385 address before establishing a new connection. 

386 

387 By default clients will be a :py:class:`~redis.Redis` instance. 

388 Specify a different class to the ``redis_class`` argument if you 

389 desire something different. 

390 

391 The ``connection_pool_class`` specifies the connection pool to use. 

392 The SentinelConnectionPool will be used by default. 

393 

394 All other keyword arguments are merged with any connection_kwargs 

395 passed to this class and passed to the connection pool as keyword 

396 arguments to be used to initialize Redis connections. 

397 """ 

398 kwargs["is_master"] = False 

399 connection_kwargs = dict(self.connection_kwargs) 

400 connection_kwargs.update(kwargs) 

401 

402 connection_pool = connection_pool_class(service_name, self, **connection_kwargs) 

403 # The Redis object "owns" the pool 

404 return redis_class.from_pool(connection_pool)