Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 24%

185 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import random 

2import weakref 

3from typing import Optional 

4 

5from redis.client import Redis 

6from redis.commands import SentinelCommands 

7from redis.connection import Connection, ConnectionPool, SSLConnection 

8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

9from redis.utils import str_if_bytes 

10 

11 

12class MasterNotFoundError(ConnectionError): 

13 pass 

14 

15 

16class SlaveNotFoundError(ConnectionError): 

17 pass 

18 

19 

20class SentinelManagedConnection(Connection): 

21 def __init__(self, **kwargs): 

22 self.connection_pool = kwargs.pop("connection_pool") 

23 super().__init__(**kwargs) 

24 

25 def __repr__(self): 

26 pool = self.connection_pool 

27 s = ( 

28 f"<{type(self).__module__}.{type(self).__name__}" 

29 f"(service={pool.service_name}%s)>" 

30 ) 

31 if self.host: 

32 host_info = f",host={self.host},port={self.port}" 

33 s = s % host_info 

34 return s 

35 

36 def connect_to(self, address): 

37 self.host, self.port = address 

38 super().connect() 

39 if self.connection_pool.check_connection: 

40 self.send_command("PING") 

41 if str_if_bytes(self.read_response()) != "PONG": 

42 raise ConnectionError("PING failed") 

43 

44 def _connect_retry(self): 

45 if self._sock: 

46 return # already connected 

47 if self.connection_pool.is_master: 

48 self.connect_to(self.connection_pool.get_master_address()) 

49 else: 

50 for slave in self.connection_pool.rotate_slaves(): 

51 try: 

52 return self.connect_to(slave) 

53 except ConnectionError: 

54 continue 

55 raise SlaveNotFoundError # Never be here 

56 

57 def connect(self): 

58 return self.retry.call_with_retry(self._connect_retry, lambda error: None) 

59 

60 def read_response( 

61 self, 

62 disable_decoding=False, 

63 *, 

64 disconnect_on_error: Optional[bool] = False, 

65 push_request: Optional[bool] = False, 

66 ): 

67 try: 

68 return super().read_response( 

69 disable_decoding=disable_decoding, 

70 disconnect_on_error=disconnect_on_error, 

71 push_request=push_request, 

72 ) 

73 except ReadOnlyError: 

74 if self.connection_pool.is_master: 

75 # When talking to a master, a ReadOnlyError when likely 

76 # indicates that the previous master that we're still connected 

77 # to has been demoted to a slave and there's a new master. 

78 # calling disconnect will force the connection to re-query 

79 # sentinel during the next connect() attempt. 

80 self.disconnect() 

81 raise ConnectionError("The previous master is now a slave") 

82 raise 

83 

84 

85class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): 

86 pass 

87 

88 

89class SentinelConnectionPoolProxy: 

90 def __init__( 

91 self, 

92 connection_pool, 

93 is_master, 

94 check_connection, 

95 service_name, 

96 sentinel_manager, 

97 ): 

98 self.connection_pool_ref = weakref.ref(connection_pool) 

99 self.is_master = is_master 

100 self.check_connection = check_connection 

101 self.service_name = service_name 

102 self.sentinel_manager = sentinel_manager 

103 self.reset() 

104 

105 def reset(self): 

106 self.master_address = None 

107 self.slave_rr_counter = None 

108 

109 def get_master_address(self): 

110 master_address = self.sentinel_manager.discover_master(self.service_name) 

111 if self.is_master and self.master_address != master_address: 

112 self.master_address = master_address 

113 # disconnect any idle connections so that they reconnect 

114 # to the new master the next time that they are used. 

115 connection_pool = self.connection_pool_ref() 

116 if connection_pool is not None: 

117 connection_pool.disconnect(inuse_connections=False) 

118 return master_address 

119 

120 def rotate_slaves(self): 

121 slaves = self.sentinel_manager.discover_slaves(self.service_name) 

122 if slaves: 

123 if self.slave_rr_counter is None: 

124 self.slave_rr_counter = random.randint(0, len(slaves) - 1) 

125 for _ in range(len(slaves)): 

126 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) 

127 slave = slaves[self.slave_rr_counter] 

128 yield slave 

129 # Fallback to the master connection 

130 try: 

131 yield self.get_master_address() 

132 except MasterNotFoundError: 

133 pass 

134 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") 

135 

136 

137class SentinelConnectionPool(ConnectionPool): 

138 """ 

139 Sentinel backed connection pool. 

140 

141 If ``check_connection`` flag is set to True, SentinelManagedConnection 

142 sends a PING command right after establishing the connection. 

143 """ 

144 

145 def __init__(self, service_name, sentinel_manager, **kwargs): 

146 kwargs["connection_class"] = kwargs.get( 

147 "connection_class", 

148 ( 

149 SentinelManagedSSLConnection 

150 if kwargs.pop("ssl", False) 

151 else SentinelManagedConnection 

152 ), 

153 ) 

154 self.is_master = kwargs.pop("is_master", True) 

155 self.check_connection = kwargs.pop("check_connection", False) 

156 self.proxy = SentinelConnectionPoolProxy( 

157 connection_pool=self, 

158 is_master=self.is_master, 

159 check_connection=self.check_connection, 

160 service_name=service_name, 

161 sentinel_manager=sentinel_manager, 

162 ) 

163 super().__init__(**kwargs) 

164 self.connection_kwargs["connection_pool"] = self.proxy 

165 self.service_name = service_name 

166 self.sentinel_manager = sentinel_manager 

167 

168 def __repr__(self): 

169 role = "master" if self.is_master else "slave" 

170 return ( 

171 f"<{type(self).__module__}.{type(self).__name__}" 

172 f"(service={self.service_name}({role}))>" 

173 ) 

174 

175 def reset(self): 

176 super().reset() 

177 self.proxy.reset() 

178 

179 @property 

180 def master_address(self): 

181 return self.proxy.master_address 

182 

183 def owns_connection(self, connection): 

184 check = not self.is_master or ( 

185 self.is_master and self.master_address == (connection.host, connection.port) 

186 ) 

187 parent = super() 

188 return check and parent.owns_connection(connection) 

189 

190 def get_master_address(self): 

191 return self.proxy.get_master_address() 

192 

193 def rotate_slaves(self): 

194 "Round-robin slave balancer" 

195 return self.proxy.rotate_slaves() 

196 

197 

198class Sentinel(SentinelCommands): 

199 """ 

200 Redis Sentinel cluster client 

201 

202 >>> from redis.sentinel import Sentinel 

203 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) 

204 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) 

205 >>> master.set('foo', 'bar') 

206 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) 

207 >>> slave.get('foo') 

208 b'bar' 

209 

210 ``sentinels`` is a list of sentinel nodes. Each node is represented by 

211 a pair (hostname, port). 

212 

213 ``min_other_sentinels`` defined a minimum number of peers for a sentinel. 

214 When querying a sentinel, if it doesn't meet this threshold, responses 

215 from that sentinel won't be considered valid. 

216 

217 ``sentinel_kwargs`` is a dictionary of connection arguments used when 

218 connecting to sentinel instances. Any argument that can be passed to 

219 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is 

220 not specified, any socket_timeout and socket_keepalive options specified 

221 in ``connection_kwargs`` will be used. 

222 

223 ``connection_kwargs`` are keyword arguments that will be used when 

224 establishing a connection to a Redis server. 

225 """ 

226 

227 def __init__( 

228 self, 

229 sentinels, 

230 min_other_sentinels=0, 

231 sentinel_kwargs=None, 

232 **connection_kwargs, 

233 ): 

234 # if sentinel_kwargs isn't defined, use the socket_* options from 

235 # connection_kwargs 

236 if sentinel_kwargs is None: 

237 sentinel_kwargs = { 

238 k: v for k, v in connection_kwargs.items() if k.startswith("socket_") 

239 } 

240 self.sentinel_kwargs = sentinel_kwargs 

241 

242 self.sentinels = [ 

243 Redis(hostname, port, **self.sentinel_kwargs) 

244 for hostname, port in sentinels 

245 ] 

246 self.min_other_sentinels = min_other_sentinels 

247 self.connection_kwargs = connection_kwargs 

248 

249 def execute_command(self, *args, **kwargs): 

250 """ 

251 Execute Sentinel command in sentinel nodes. 

252 once - If set to True, then execute the resulting command on a single 

253 node at random, rather than across the entire sentinel cluster. 

254 """ 

255 kwargs.pop("keys", None) # the keys are used only for client side caching 

256 once = bool(kwargs.get("once", False)) 

257 if "once" in kwargs.keys(): 

258 kwargs.pop("once") 

259 

260 if once: 

261 random.choice(self.sentinels).execute_command(*args, **kwargs) 

262 else: 

263 for sentinel in self.sentinels: 

264 sentinel.execute_command(*args, **kwargs) 

265 return True 

266 

267 def __repr__(self): 

268 sentinel_addresses = [] 

269 for sentinel in self.sentinels: 

270 sentinel_addresses.append( 

271 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs) 

272 ) 

273 return ( 

274 f"<{type(self).__module__}.{type(self).__name__}" 

275 f'(sentinels=[{",".join(sentinel_addresses)}])>' 

276 ) 

277 

278 def check_master_state(self, state, service_name): 

279 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 

280 return False 

281 # Check if our sentinel doesn't see other nodes 

282 if state["num-other-sentinels"] < self.min_other_sentinels: 

283 return False 

284 return True 

285 

286 def discover_master(self, service_name): 

287 """ 

288 Asks sentinel servers for the Redis master's address corresponding 

289 to the service labeled ``service_name``. 

290 

291 Returns a pair (address, port) or raises MasterNotFoundError if no 

292 master is found. 

293 """ 

294 collected_errors = list() 

295 for sentinel_no, sentinel in enumerate(self.sentinels): 

296 try: 

297 masters = sentinel.sentinel_masters() 

298 except (ConnectionError, TimeoutError) as e: 

299 collected_errors.append(f"{sentinel} - {e!r}") 

300 continue 

301 state = masters.get(service_name) 

302 if state and self.check_master_state(state, service_name): 

303 # Put this sentinel at the top of the list 

304 self.sentinels[0], self.sentinels[sentinel_no] = ( 

305 sentinel, 

306 self.sentinels[0], 

307 ) 

308 return state["ip"], state["port"] 

309 

310 error_info = "" 

311 if len(collected_errors) > 0: 

312 error_info = f" : {', '.join(collected_errors)}" 

313 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}") 

314 

315 def filter_slaves(self, slaves): 

316 "Remove slaves that are in an ODOWN or SDOWN state" 

317 slaves_alive = [] 

318 for slave in slaves: 

319 if slave["is_odown"] or slave["is_sdown"]: 

320 continue 

321 slaves_alive.append((slave["ip"], slave["port"])) 

322 return slaves_alive 

323 

324 def discover_slaves(self, service_name): 

325 "Returns a list of alive slaves for service ``service_name``" 

326 for sentinel in self.sentinels: 

327 try: 

328 slaves = sentinel.sentinel_slaves(service_name) 

329 except (ConnectionError, ResponseError, TimeoutError): 

330 continue 

331 slaves = self.filter_slaves(slaves) 

332 if slaves: 

333 return slaves 

334 return [] 

335 

336 def master_for( 

337 self, 

338 service_name, 

339 redis_class=Redis, 

340 connection_pool_class=SentinelConnectionPool, 

341 **kwargs, 

342 ): 

343 """ 

344 Returns a redis client instance for the ``service_name`` master. 

345 

346 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is 

347 used to retrieve the master's address before establishing a new 

348 connection. 

349 

350 NOTE: If the master's address has changed, any cached connections to 

351 the old master are closed. 

352 

353 By default clients will be a :py:class:`~redis.Redis` instance. 

354 Specify a different class to the ``redis_class`` argument if you 

355 desire something different. 

356 

357 The ``connection_pool_class`` specifies the connection pool to 

358 use. The :py:class:`~redis.sentinel.SentinelConnectionPool` 

359 will be used by default. 

360 

361 All other keyword arguments are merged with any connection_kwargs 

362 passed to this class and passed to the connection pool as keyword 

363 arguments to be used to initialize Redis connections. 

364 """ 

365 kwargs["is_master"] = True 

366 connection_kwargs = dict(self.connection_kwargs) 

367 connection_kwargs.update(kwargs) 

368 return redis_class.from_pool( 

369 connection_pool_class(service_name, self, **connection_kwargs) 

370 ) 

371 

372 def slave_for( 

373 self, 

374 service_name, 

375 redis_class=Redis, 

376 connection_pool_class=SentinelConnectionPool, 

377 **kwargs, 

378 ): 

379 """ 

380 Returns redis client instance for the ``service_name`` slave(s). 

381 

382 A SentinelConnectionPool class is used to retrieve the slave's 

383 address before establishing a new connection. 

384 

385 By default clients will be a :py:class:`~redis.Redis` instance. 

386 Specify a different class to the ``redis_class`` argument if you 

387 desire something different. 

388 

389 The ``connection_pool_class`` specifies the connection pool to use. 

390 The SentinelConnectionPool will be used by default. 

391 

392 All other keyword arguments are merged with any connection_kwargs 

393 passed to this class and passed to the connection pool as keyword 

394 arguments to be used to initialize Redis connections. 

395 """ 

396 kwargs["is_master"] = False 

397 connection_kwargs = dict(self.connection_kwargs) 

398 connection_kwargs.update(kwargs) 

399 return redis_class.from_pool( 

400 connection_pool_class(service_name, self, **connection_kwargs) 

401 )