Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/asyncio/sentinel.py: 23%

169 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import asyncio 

2import random 

3import weakref 

4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type 

5 

6from redis.asyncio.client import Redis 

7from redis.asyncio.connection import ( 

8 Connection, 

9 ConnectionPool, 

10 EncodableT, 

11 SSLConnection, 

12) 

13from redis.commands import AsyncSentinelCommands 

14from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

15from redis.utils import str_if_bytes 

16 

17 

class MasterNotFoundError(ConnectionError):
    """Raised when no sentinel reports a usable master for a service."""

20 

21 

class SlaveNotFoundError(ConnectionError):
    """Raised when no slave (nor fallback master) can be found for a service."""

24 

25 

class SentinelManagedConnection(Connection):
    """
    Connection whose target address is resolved through its owning pool.

    The pool is supplied via the ``connection_pool`` keyword argument and is
    consulted on every connect attempt to pick the node to dial (the master,
    or one of the slaves, depending on ``connection_pool.is_master``).
    """

    def __init__(self, **kwargs):
        # ``connection_pool`` is consumed here; the remaining kwargs go to
        # the base Connection unchanged.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        s = (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(service={pool.service_name}"
        )
        # Host/port are only shown once an address has been assigned.
        if self.host:
            host_info = f",host={self.host},port={self.port}"
            s += host_info
        return s + ")>"

    async def connect_to(self, address):
        """
        Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent
        right after connecting and ConnectionError is raised unless the
        reply is PONG.
        """
        self.host, self.port = address
        await super().connect()
        if self.connection_pool.check_connection:
            await self.send_command("PING")
            if str_if_bytes(await self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    async def _connect_retry(self):
        """
        Perform a single connect attempt against the node chosen by the pool.

        For a master pool the current master address is used. Otherwise the
        addresses yielded by the pool's ``rotate_slaves()`` are tried in
        order, skipping any that fail with ConnectionError.
        """
        if self._reader:
            return  # already connected
        if self.connection_pool.is_master:
            await self.connect_to(await self.connection_pool.get_master_address())
        else:
            async for slave in self.connection_pool.rotate_slaves():
                try:
                    return await self.connect_to(slave)
                except ConnectionError:
                    continue
            raise SlaveNotFoundError  # Never be here

    async def connect(self):
        """Connect via ``self.retry``, re-running ``_connect_retry`` on failure."""
        return await self.retry.call_with_retry(
            self._connect_retry,
            # Failure callback: nothing to clean up, just hand back a
            # zero-length sleep.
            lambda error: asyncio.sleep(0),
        )

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: Optional[bool] = True,
        push_request: Optional[bool] = False,
    ):
        """
        Read a response, delegating to the base Connection.

        All arguments are forwarded unchanged. A ReadOnlyError received while
        the pool is a master pool causes a disconnect and is converted into
        ConnectionError; for a slave pool it is re-raised as is.
        """
        try:
            return await super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                await self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise

94 

95 

class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """SentinelManagedConnection combined with SSLConnection; adds no members."""

98 

99 

class SentinelConnectionPool(ConnectionPool):
    """
    Connection pool whose node addresses are looked up through Sentinel.

    When the ``check_connection`` flag is True, SentinelManagedConnection
    issues a PING command immediately after the connection is established.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        # "ssl" is always removed from kwargs, even when the caller supplies
        # an explicit connection class.
        use_ssl = kwargs.pop("ssl", False)
        default_class = (
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection
        )
        kwargs.setdefault("connection_class", default_class)
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy to this pool.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.master_address = None
        self.slave_rr_counter = None

    def __repr__(self):
        cls = self.__class__
        role = "master" if self.is_master else "slave"
        return (
            f"<{cls.__module__}.{cls.__name__}"
            f"(service={self.service_name}({role}))>"
        )

    def reset(self):
        """Reset the base pool and forget the cached master and slave cursor."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection: Connection):
        """
        Tell whether ``connection`` belongs to this pool.

        A master pool additionally requires the connection to point at the
        currently known master address.
        """
        if not self.is_master:
            return super().owns_connection(connection)
        target = (connection.host, connection.port)
        return self.master_address == target and super().owns_connection(connection)

    async def get_master_address(self):
        """
        Ask the sentinel manager for the master address and return it.

        On a master pool, a changed address is cached and idle connections
        are disconnected.
        """
        address = await self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != address:
            self.master_address = address
            # Drop idle connections so they reconnect to the new master the
            # next time they are used.
            await self.disconnect(inuse_connections=False)
        return address

    async def rotate_slaves(self) -> AsyncIterator:
        """Round-robin slave balancer"""
        candidates = await self.sentinel_manager.discover_slaves(self.service_name)
        if candidates:
            total = len(candidates)
            if self.slave_rr_counter is None:
                # First use: start the rotation at a random position.
                self.slave_rr_counter = random.randint(0, total - 1)
            for _ in range(total):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % total
                yield candidates[self.slave_rr_counter]
        # Every slave was offered (or none exist): fall back to the master.
        try:
            yield await self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

169 

170 

class Sentinel(AsyncSentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.asyncio.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, every ``socket_*`` option given in ``connection_kwargs``
    will be used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        # One Redis client per sentinel node.
        self.sentinels = [
            Redis(host=hostname, port=port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

    async def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.

        Always returns True; the replies of the sentinels are discarded.
        """
        kwargs.pop("keys", None)  # the keys are used only for client side caching
        once = bool(kwargs.pop("once", False))

        if once:
            await random.choice(self.sentinels).execute_command(*args, **kwargs)
        else:
            # gather() schedules each coroutine itself, so the commands run
            # concurrently without building Task objects by hand.
            await asyncio.gather(
                *(
                    sentinel.execute_command(*args, **kwargs)
                    for sentinel in self.sentinels
                )
            )
        return True

    def __repr__(self):
        sentinel_addresses = [
            f"{sentinel.connection_pool.connection_kwargs['host']}:"
            f"{sentinel.connection_pool.connection_kwargs['port']}"
            for sentinel in self.sentinels
        ]
        # Use the module name (not the class object) so the output has the
        # same "<module.Class(...)>" shape as the other reprs in this file.
        return (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(sentinels=[{','.join(sentinel_addresses)}])>"
        )

    def check_master_state(self, state: dict, service_name: str) -> bool:
        """
        Tell whether ``state`` describes a master that can be used.

        ``service_name`` is accepted but not consulted here.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        if state["num-other-sentinels"] < self.min_other_sentinels:
            return False
        return True

    async def discover_master(self, service_name: str):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found. Connection and timeout errors from individual
        sentinels are collected and appended to the error message.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )
                return state["ip"], state["port"]

        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(
        self, slaves: Iterable[Mapping]
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Remove slaves that are in an ODOWN or SDOWN state"""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    async def discover_slaves(
        self, service_name: str
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Returns a list of alive slaves for service ``service_name``"""
        for sentinel in self.sentinels:
            try:
                slaves = await sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                # This sentinel could not answer; try the next one.
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        return []

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.

        A :py:class:`~redis.asyncio.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.asyncio.client.Redis`
        instance. Specify a different class to the ``redis_class`` argument
        if you desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use. The :py:class:`~redis.asyncio.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = True
        # Per-call kwargs override the defaults given to the constructor.
        connection_kwargs = dict(self.connection_kwargs)
        connection_kwargs.update(kwargs)

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.asyncio.client.Redis`
        instance. Specify a different class to the ``redis_class`` argument
        if you desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = False
        # Per-call kwargs override the defaults given to the constructor.
        connection_kwargs = dict(self.connection_kwargs)
        connection_kwargs.update(kwargs)

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)