Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

191 statements  

1import random 

2import weakref 

3from typing import Optional 

4 

5from redis.client import Redis 

6from redis.commands import SentinelCommands 

7from redis.connection import Connection, ConnectionPool, SSLConnection 

8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

9from redis.utils import str_if_bytes 

10 

11 

12class MasterNotFoundError(ConnectionError): 

13 pass 

14 

15 

16class SlaveNotFoundError(ConnectionError): 

17 pass 

18 

19 

20class SentinelManagedConnection(Connection): 

21 def __init__(self, **kwargs): 

22 self.connection_pool = kwargs.pop("connection_pool") 

23 super().__init__(**kwargs) 

24 

25 def __repr__(self): 

26 pool = self.connection_pool 

27 s = ( 

28 f"<{type(self).__module__}.{type(self).__name__}" 

29 f"(service={pool.service_name}%s)>" 

30 ) 

31 if self.host: 

32 host_info = f",host={self.host},port={self.port}" 

33 s = s % host_info 

34 return s 

35 

36 def connect_to(self, address): 

37 self.host, self.port = address 

38 super().connect() 

39 if self.connection_pool.check_connection: 

40 self.send_command("PING") 

41 if str_if_bytes(self.read_response()) != "PONG": 

42 raise ConnectionError("PING failed") 

43 

44 def _connect_retry(self): 

45 if self._sock: 

46 return # already connected 

47 if self.connection_pool.is_master: 

48 self.connect_to(self.connection_pool.get_master_address()) 

49 else: 

50 for slave in self.connection_pool.rotate_slaves(): 

51 try: 

52 return self.connect_to(slave) 

53 except ConnectionError: 

54 continue 

55 raise SlaveNotFoundError # Never be here 

56 

57 def connect(self): 

58 return self.retry.call_with_retry(self._connect_retry, lambda error: None) 

59 

60 def read_response( 

61 self, 

62 disable_decoding=False, 

63 *, 

64 disconnect_on_error: Optional[bool] = False, 

65 push_request: Optional[bool] = False, 

66 ): 

67 try: 

68 return super().read_response( 

69 disable_decoding=disable_decoding, 

70 disconnect_on_error=disconnect_on_error, 

71 push_request=push_request, 

72 ) 

73 except ReadOnlyError: 

74 if self.connection_pool.is_master: 

75 # When talking to a master, a ReadOnlyError when likely 

76 # indicates that the previous master that we're still connected 

77 # to has been demoted to a slave and there's a new master. 

78 # calling disconnect will force the connection to re-query 

79 # sentinel during the next connect() attempt. 

80 self.disconnect() 

81 raise ConnectionError("The previous master is now a slave") 

82 raise 

83 

84 

85class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): 

86 pass 

87 

88 

89class SentinelConnectionPoolProxy: 

90 def __init__( 

91 self, 

92 connection_pool, 

93 is_master, 

94 check_connection, 

95 service_name, 

96 sentinel_manager, 

97 ): 

98 self.connection_pool_ref = weakref.ref(connection_pool) 

99 self.is_master = is_master 

100 self.check_connection = check_connection 

101 self.service_name = service_name 

102 self.sentinel_manager = sentinel_manager 

103 self.reset() 

104 

105 def reset(self): 

106 self.master_address = None 

107 self.slave_rr_counter = None 

108 

109 def get_master_address(self): 

110 master_address = self.sentinel_manager.discover_master(self.service_name) 

111 if self.is_master and self.master_address != master_address: 

112 self.master_address = master_address 

113 # disconnect any idle connections so that they reconnect 

114 # to the new master the next time that they are used. 

115 connection_pool = self.connection_pool_ref() 

116 if connection_pool is not None: 

117 connection_pool.disconnect(inuse_connections=False) 

118 return master_address 

119 

120 def rotate_slaves(self): 

121 slaves = self.sentinel_manager.discover_slaves(self.service_name) 

122 if slaves: 

123 if self.slave_rr_counter is None: 

124 self.slave_rr_counter = random.randint(0, len(slaves) - 1) 

125 for _ in range(len(slaves)): 

126 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) 

127 slave = slaves[self.slave_rr_counter] 

128 yield slave 

129 # Fallback to the master connection 

130 try: 

131 yield self.get_master_address() 

132 except MasterNotFoundError: 

133 pass 

134 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") 

135 

136 

137class SentinelConnectionPool(ConnectionPool): 

138 """ 

139 Sentinel backed connection pool. 

140 

141 If ``check_connection`` flag is set to True, SentinelManagedConnection 

142 sends a PING command right after establishing the connection. 

143 """ 

144 

145 def __init__(self, service_name, sentinel_manager, **kwargs): 

146 kwargs["connection_class"] = kwargs.get( 

147 "connection_class", 

148 ( 

149 SentinelManagedSSLConnection 

150 if kwargs.pop("ssl", False) 

151 else SentinelManagedConnection 

152 ), 

153 ) 

154 self.is_master = kwargs.pop("is_master", True) 

155 self.check_connection = kwargs.pop("check_connection", False) 

156 self.proxy = SentinelConnectionPoolProxy( 

157 connection_pool=self, 

158 is_master=self.is_master, 

159 check_connection=self.check_connection, 

160 service_name=service_name, 

161 sentinel_manager=sentinel_manager, 

162 ) 

163 super().__init__(**kwargs) 

164 self.connection_kwargs["connection_pool"] = self.proxy 

165 self.service_name = service_name 

166 self.sentinel_manager = sentinel_manager 

167 

168 def __repr__(self): 

169 role = "master" if self.is_master else "slave" 

170 return ( 

171 f"<{type(self).__module__}.{type(self).__name__}" 

172 f"(service={self.service_name}({role}))>" 

173 ) 

174 

175 def reset(self): 

176 super().reset() 

177 self.proxy.reset() 

178 

179 @property 

180 def master_address(self): 

181 return self.proxy.master_address 

182 

183 def owns_connection(self, connection): 

184 check = not self.is_master or ( 

185 self.is_master and self.master_address == (connection.host, connection.port) 

186 ) 

187 parent = super() 

188 return check and parent.owns_connection(connection) 

189 

190 def get_master_address(self): 

191 return self.proxy.get_master_address() 

192 

193 def rotate_slaves(self): 

194 "Round-robin slave balancer" 

195 return self.proxy.rotate_slaves() 

196 

197 

198class Sentinel(SentinelCommands): 

199 """ 

200 Redis Sentinel cluster client 

201 

202 >>> from redis.sentinel import Sentinel 

203 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) 

204 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) 

205 >>> master.set('foo', 'bar') 

206 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) 

207 >>> slave.get('foo') 

208 b'bar' 

209 

210 ``sentinels`` is a list of sentinel nodes. Each node is represented by 

211 a pair (hostname, port). 

212 

213 ``min_other_sentinels`` defined a minimum number of peers for a sentinel. 

214 When querying a sentinel, if it doesn't meet this threshold, responses 

215 from that sentinel won't be considered valid. 

216 

217 ``sentinel_kwargs`` is a dictionary of connection arguments used when 

218 connecting to sentinel instances. Any argument that can be passed to 

219 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is 

220 not specified, any socket_timeout and socket_keepalive options specified 

221 in ``connection_kwargs`` will be used. 

222 

223 ``connection_kwargs`` are keyword arguments that will be used when 

224 establishing a connection to a Redis server. 

225 """ 

226 

227 def __init__( 

228 self, 

229 sentinels, 

230 min_other_sentinels=0, 

231 sentinel_kwargs=None, 

232 force_master_ip=None, 

233 **connection_kwargs, 

234 ): 

235 # if sentinel_kwargs isn't defined, use the socket_* options from 

236 # connection_kwargs 

237 if sentinel_kwargs is None: 

238 sentinel_kwargs = { 

239 k: v for k, v in connection_kwargs.items() if k.startswith("socket_") 

240 } 

241 self.sentinel_kwargs = sentinel_kwargs 

242 

243 self.sentinels = [ 

244 Redis(hostname, port, **self.sentinel_kwargs) 

245 for hostname, port in sentinels 

246 ] 

247 self.min_other_sentinels = min_other_sentinels 

248 self.connection_kwargs = connection_kwargs 

249 self._force_master_ip = force_master_ip 

250 

251 def execute_command(self, *args, **kwargs): 

252 """ 

253 Execute Sentinel command in sentinel nodes. 

254 once - If set to True, then execute the resulting command on a single 

255 node at random, rather than across the entire sentinel cluster. 

256 """ 

257 once = bool(kwargs.pop("once", False)) 

258 

259 # Check if command is supposed to return the original 

260 # responses instead of boolean value. 

261 return_responses = bool(kwargs.pop("return_responses", False)) 

262 

263 if once: 

264 response = random.choice(self.sentinels).execute_command(*args, **kwargs) 

265 if return_responses: 

266 return [response] 

267 else: 

268 return True if response else False 

269 

270 responses = [] 

271 for sentinel in self.sentinels: 

272 responses.append(sentinel.execute_command(*args, **kwargs)) 

273 

274 if return_responses: 

275 return responses 

276 

277 return all(responses) 

278 

279 def __repr__(self): 

280 sentinel_addresses = [] 

281 for sentinel in self.sentinels: 

282 sentinel_addresses.append( 

283 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs) 

284 ) 

285 return ( 

286 f"<{type(self).__module__}.{type(self).__name__}" 

287 f"(sentinels=[{','.join(sentinel_addresses)}])>" 

288 ) 

289 

290 def check_master_state(self, state, service_name): 

291 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 

292 return False 

293 # Check if our sentinel doesn't see other nodes 

294 if state["num-other-sentinels"] < self.min_other_sentinels: 

295 return False 

296 return True 

297 

298 def discover_master(self, service_name): 

299 """ 

300 Asks sentinel servers for the Redis master's address corresponding 

301 to the service labeled ``service_name``. 

302 

303 Returns a pair (address, port) or raises MasterNotFoundError if no 

304 master is found. 

305 """ 

306 collected_errors = list() 

307 for sentinel_no, sentinel in enumerate(self.sentinels): 

308 try: 

309 masters = sentinel.sentinel_masters() 

310 except (ConnectionError, TimeoutError) as e: 

311 collected_errors.append(f"{sentinel} - {e!r}") 

312 continue 

313 state = masters.get(service_name) 

314 if state and self.check_master_state(state, service_name): 

315 # Put this sentinel at the top of the list 

316 self.sentinels[0], self.sentinels[sentinel_no] = ( 

317 sentinel, 

318 self.sentinels[0], 

319 ) 

320 

321 ip = ( 

322 self._force_master_ip 

323 if self._force_master_ip is not None 

324 else state["ip"] 

325 ) 

326 return ip, state["port"] 

327 

328 error_info = "" 

329 if len(collected_errors) > 0: 

330 error_info = f" : {', '.join(collected_errors)}" 

331 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}") 

332 

333 def filter_slaves(self, slaves): 

334 "Remove slaves that are in an ODOWN or SDOWN state" 

335 slaves_alive = [] 

336 for slave in slaves: 

337 if slave["is_odown"] or slave["is_sdown"]: 

338 continue 

339 slaves_alive.append((slave["ip"], slave["port"])) 

340 return slaves_alive 

341 

342 def discover_slaves(self, service_name): 

343 "Returns a list of alive slaves for service ``service_name``" 

344 for sentinel in self.sentinels: 

345 try: 

346 slaves = sentinel.sentinel_slaves(service_name) 

347 except (ConnectionError, ResponseError, TimeoutError): 

348 continue 

349 slaves = self.filter_slaves(slaves) 

350 if slaves: 

351 return slaves 

352 return [] 

353 

354 def master_for( 

355 self, 

356 service_name, 

357 redis_class=Redis, 

358 connection_pool_class=SentinelConnectionPool, 

359 **kwargs, 

360 ): 

361 """ 

362 Returns a redis client instance for the ``service_name`` master. 

363 Sentinel client will detect failover and reconnect Redis clients 

364 automatically. 

365 

366 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is 

367 used to retrieve the master's address before establishing a new 

368 connection. 

369 

370 NOTE: If the master's address has changed, any cached connections to 

371 the old master are closed. 

372 

373 By default clients will be a :py:class:`~redis.Redis` instance. 

374 Specify a different class to the ``redis_class`` argument if you 

375 desire something different. 

376 

377 The ``connection_pool_class`` specifies the connection pool to 

378 use. The :py:class:`~redis.sentinel.SentinelConnectionPool` 

379 will be used by default. 

380 

381 All other keyword arguments are merged with any connection_kwargs 

382 passed to this class and passed to the connection pool as keyword 

383 arguments to be used to initialize Redis connections. 

384 """ 

385 kwargs["is_master"] = True 

386 connection_kwargs = dict(self.connection_kwargs) 

387 connection_kwargs.update(kwargs) 

388 return redis_class.from_pool( 

389 connection_pool_class(service_name, self, **connection_kwargs) 

390 ) 

391 

392 def slave_for( 

393 self, 

394 service_name, 

395 redis_class=Redis, 

396 connection_pool_class=SentinelConnectionPool, 

397 **kwargs, 

398 ): 

399 """ 

400 Returns redis client instance for the ``service_name`` slave(s). 

401 

402 A SentinelConnectionPool class is used to retrieve the slave's 

403 address before establishing a new connection. 

404 

405 By default clients will be a :py:class:`~redis.Redis` instance. 

406 Specify a different class to the ``redis_class`` argument if you 

407 desire something different. 

408 

409 The ``connection_pool_class`` specifies the connection pool to use. 

410 The SentinelConnectionPool will be used by default. 

411 

412 All other keyword arguments are merged with any connection_kwargs 

413 passed to this class and passed to the connection pool as keyword 

414 arguments to be used to initialize Redis connections. 

415 """ 

416 kwargs["is_master"] = False 

417 connection_kwargs = dict(self.connection_kwargs) 

418 connection_kwargs.update(kwargs) 

419 return redis_class.from_pool( 

420 connection_pool_class(service_name, self, **connection_kwargs) 

421 )