Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

187 statements  

1import random 

2import weakref 

3from typing import Optional, Union 

4 

5from redis._parsers.socket import SENTINEL 

6from redis.client import Redis 

7from redis.commands import SentinelCommands 

8from redis.connection import Connection, ConnectionPool, SSLConnection 

9from redis.exceptions import ( 

10 ConnectionError, 

11 ReadOnlyError, 

12 ResponseError, 

13 TimeoutError, 

14) 

15 

16 

17class MasterNotFoundError(ConnectionError): 

18 pass 

19 

20 

21class SlaveNotFoundError(ConnectionError): 

22 pass 

23 

24 

25class SentinelManagedConnection(Connection): 

26 def __init__(self, **kwargs): 

27 self.connection_pool = kwargs.pop("connection_pool") 

28 super().__init__(**kwargs) 

29 

30 def __repr__(self): 

31 pool = self.connection_pool 

32 s = ( 

33 f"<{type(self).__module__}.{type(self).__name__}" 

34 f"(service={pool.service_name}%s)>" 

35 ) 

36 if self.host: 

37 host_info = f",host={self.host},port={self.port}" 

38 s = s % host_info 

39 return s 

40 

41 def connect_to(self, address): 

42 self.host, self.port = address 

43 

44 self.connect_check_health( 

45 check_health=self.connection_pool.check_connection, 

46 retry_socket_connect=False, 

47 ) 

48 

49 def _connect_retry(self): 

50 if self._sock: 

51 return # already connected 

52 if self.connection_pool.is_master: 

53 self.connect_to(self.connection_pool.get_master_address()) 

54 else: 

55 for slave in self.connection_pool.rotate_slaves(): 

56 try: 

57 return self.connect_to(slave) 

58 except ConnectionError: 

59 continue 

60 raise SlaveNotFoundError # Never be here 

61 

62 def connect(self): 

63 return self.retry.call_with_retry(self._connect_retry, lambda error: None) 

64 

65 def read_response( 

66 self, 

67 disable_decoding=False, 

68 *, 

69 timeout: Union[float, object] = SENTINEL, 

70 disconnect_on_error: Optional[bool] = False, 

71 push_request: Optional[bool] = False, 

72 ): 

73 try: 

74 return super().read_response( 

75 disable_decoding=disable_decoding, 

76 timeout=timeout, 

77 disconnect_on_error=disconnect_on_error, 

78 push_request=push_request, 

79 ) 

80 except ReadOnlyError: 

81 if self.connection_pool.is_master: 

82 # When talking to a master, a ReadOnlyError when likely 

83 # indicates that the previous master that we're still connected 

84 # to has been demoted to a slave and there's a new master. 

85 # calling disconnect will force the connection to re-query 

86 # sentinel during the next connect() attempt. 

87 self.disconnect() 

88 raise ConnectionError("The previous master is now a slave") 

89 raise 

90 

91 

92class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): 

93 pass 

94 

95 

96class SentinelConnectionPoolProxy: 

97 def __init__( 

98 self, 

99 connection_pool, 

100 is_master, 

101 check_connection, 

102 service_name, 

103 sentinel_manager, 

104 ): 

105 self.connection_pool_ref = weakref.ref(connection_pool) 

106 self.is_master = is_master 

107 self.check_connection = check_connection 

108 self.service_name = service_name 

109 self.sentinel_manager = sentinel_manager 

110 self.reset() 

111 

112 def reset(self): 

113 self.master_address = None 

114 self.slave_rr_counter = None 

115 

116 def get_master_address(self): 

117 master_address = self.sentinel_manager.discover_master(self.service_name) 

118 if self.is_master and self.master_address != master_address: 

119 self.master_address = master_address 

120 # disconnect any idle connections so that they reconnect 

121 # to the new master the next time that they are used. 

122 connection_pool = self.connection_pool_ref() 

123 if connection_pool is not None: 

124 connection_pool.disconnect(inuse_connections=False) 

125 return master_address 

126 

127 def rotate_slaves(self): 

128 slaves = self.sentinel_manager.discover_slaves(self.service_name) 

129 if slaves: 

130 if self.slave_rr_counter is None: 

131 self.slave_rr_counter = random.randint(0, len(slaves) - 1) 

132 for _ in range(len(slaves)): 

133 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) 

134 slave = slaves[self.slave_rr_counter] 

135 yield slave 

136 # Fallback to the master connection 

137 try: 

138 yield self.get_master_address() 

139 except MasterNotFoundError: 

140 pass 

141 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") 

142 

143 

144class SentinelConnectionPool(ConnectionPool): 

145 """ 

146 Sentinel backed connection pool. 

147 

148 If ``check_connection`` flag is set to True, SentinelManagedConnection 

149 sends a PING command right after establishing the connection. 

150 """ 

151 

152 def __init__(self, service_name, sentinel_manager, **kwargs): 

153 kwargs["connection_class"] = kwargs.get( 

154 "connection_class", 

155 ( 

156 SentinelManagedSSLConnection 

157 if kwargs.pop("ssl", False) 

158 else SentinelManagedConnection 

159 ), 

160 ) 

161 self.is_master = kwargs.pop("is_master", True) 

162 self.check_connection = kwargs.pop("check_connection", False) 

163 self.proxy = SentinelConnectionPoolProxy( 

164 connection_pool=self, 

165 is_master=self.is_master, 

166 check_connection=self.check_connection, 

167 service_name=service_name, 

168 sentinel_manager=sentinel_manager, 

169 ) 

170 super().__init__(**kwargs) 

171 self.connection_kwargs["connection_pool"] = self.proxy 

172 self.service_name = service_name 

173 self.sentinel_manager = sentinel_manager 

174 

175 def __repr__(self): 

176 role = "master" if self.is_master else "slave" 

177 return ( 

178 f"<{type(self).__module__}.{type(self).__name__}" 

179 f"(service={self.service_name}({role}))>" 

180 ) 

181 

182 def reset(self): 

183 super().reset() 

184 self.proxy.reset() 

185 

186 @property 

187 def master_address(self): 

188 return self.proxy.master_address 

189 

190 def owns_connection(self, connection): 

191 check = not self.is_master or ( 

192 self.is_master and self.master_address == (connection.host, connection.port) 

193 ) 

194 parent = super() 

195 return check and parent.owns_connection(connection) 

196 

197 def get_master_address(self): 

198 return self.proxy.get_master_address() 

199 

200 def rotate_slaves(self): 

201 "Round-robin slave balancer" 

202 return self.proxy.rotate_slaves() 

203 

204 

205class Sentinel(SentinelCommands): 

206 """ 

207 Redis Sentinel cluster client 

208 

209 >>> from redis.sentinel import Sentinel 

210 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) 

211 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) 

212 >>> master.set('foo', 'bar') 

213 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) 

214 >>> slave.get('foo') 

215 b'bar' 

216 

217 ``sentinels`` is a list of sentinel nodes. Each node is represented by 

218 a pair (hostname, port). 

219 

220 ``min_other_sentinels`` defined a minimum number of peers for a sentinel. 

221 When querying a sentinel, if it doesn't meet this threshold, responses 

222 from that sentinel won't be considered valid. 

223 

224 ``sentinel_kwargs`` is a dictionary of connection arguments used when 

225 connecting to sentinel instances. Any argument that can be passed to 

226 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is 

227 not specified, any socket_timeout and socket_keepalive options specified 

228 in ``connection_kwargs`` will be used. 

229 

230 ``connection_kwargs`` are keyword arguments that will be used when 

231 establishing a connection to a Redis server. 

232 """ 

233 

234 def __init__( 

235 self, 

236 sentinels, 

237 min_other_sentinels=0, 

238 sentinel_kwargs=None, 

239 force_master_ip=None, 

240 **connection_kwargs, 

241 ): 

242 # if sentinel_kwargs isn't defined, use the socket_* options from 

243 # connection_kwargs 

244 if sentinel_kwargs is None: 

245 sentinel_kwargs = { 

246 k: v for k, v in connection_kwargs.items() if k.startswith("socket_") 

247 } 

248 self.sentinel_kwargs = sentinel_kwargs 

249 

250 self.sentinels = [ 

251 Redis(hostname, port, **self.sentinel_kwargs) 

252 for hostname, port in sentinels 

253 ] 

254 self.min_other_sentinels = min_other_sentinels 

255 self.connection_kwargs = connection_kwargs 

256 self._force_master_ip = force_master_ip 

257 

258 def execute_command(self, *args, **kwargs): 

259 """ 

260 Execute Sentinel command in sentinel nodes. 

261 once - If set to True, then execute the resulting command on a single 

262 node at random, rather than across the entire sentinel cluster. 

263 """ 

264 once = bool(kwargs.pop("once", False)) 

265 

266 # Check if command is supposed to return the original 

267 # responses instead of boolean value. 

268 return_responses = bool(kwargs.pop("return_responses", False)) 

269 

270 if once: 

271 response = random.choice(self.sentinels).execute_command(*args, **kwargs) 

272 if return_responses: 

273 return [response] 

274 else: 

275 return True if response else False 

276 

277 responses = [] 

278 for sentinel in self.sentinels: 

279 responses.append(sentinel.execute_command(*args, **kwargs)) 

280 

281 if return_responses: 

282 return responses 

283 

284 return all(responses) 

285 

286 def __repr__(self): 

287 sentinel_addresses = [] 

288 for sentinel in self.sentinels: 

289 sentinel_addresses.append( 

290 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs) 

291 ) 

292 return ( 

293 f"<{type(self).__module__}.{type(self).__name__}" 

294 f"(sentinels=[{','.join(sentinel_addresses)}])>" 

295 ) 

296 

297 def check_master_state(self, state, service_name): 

298 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 

299 return False 

300 # Check if our sentinel doesn't see other nodes 

301 if state["num-other-sentinels"] < self.min_other_sentinels: 

302 return False 

303 return True 

304 

305 def discover_master(self, service_name): 

306 """ 

307 Asks sentinel servers for the Redis master's address corresponding 

308 to the service labeled ``service_name``. 

309 

310 Returns a pair (address, port) or raises MasterNotFoundError if no 

311 master is found. 

312 """ 

313 collected_errors = list() 

314 for sentinel_no, sentinel in enumerate(self.sentinels): 

315 try: 

316 masters = sentinel.sentinel_masters() 

317 except (ConnectionError, TimeoutError) as e: 

318 collected_errors.append(f"{sentinel} - {e!r}") 

319 continue 

320 state = masters.get(service_name) 

321 if state and self.check_master_state(state, service_name): 

322 # Put this sentinel at the top of the list 

323 self.sentinels[0], self.sentinels[sentinel_no] = ( 

324 sentinel, 

325 self.sentinels[0], 

326 ) 

327 

328 ip = ( 

329 self._force_master_ip 

330 if self._force_master_ip is not None 

331 else state["ip"] 

332 ) 

333 return ip, state["port"] 

334 

335 error_info = "" 

336 if len(collected_errors) > 0: 

337 error_info = f" : {', '.join(collected_errors)}" 

338 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}") 

339 

340 def filter_slaves(self, slaves): 

341 "Remove slaves that are in an ODOWN or SDOWN state" 

342 slaves_alive = [] 

343 for slave in slaves: 

344 if slave["is_odown"] or slave["is_sdown"]: 

345 continue 

346 slaves_alive.append((slave["ip"], slave["port"])) 

347 return slaves_alive 

348 

349 def discover_slaves(self, service_name): 

350 "Returns a list of alive slaves for service ``service_name``" 

351 for sentinel in self.sentinels: 

352 try: 

353 slaves = sentinel.sentinel_slaves(service_name) 

354 except (ConnectionError, ResponseError, TimeoutError): 

355 continue 

356 slaves = self.filter_slaves(slaves) 

357 if slaves: 

358 return slaves 

359 return [] 

360 

361 def master_for( 

362 self, 

363 service_name, 

364 redis_class=Redis, 

365 connection_pool_class=SentinelConnectionPool, 

366 **kwargs, 

367 ): 

368 """ 

369 Returns a redis client instance for the ``service_name`` master. 

370 Sentinel client will detect failover and reconnect Redis clients 

371 automatically. 

372 

373 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is 

374 used to retrieve the master's address before establishing a new 

375 connection. 

376 

377 NOTE: If the master's address has changed, any cached connections to 

378 the old master are closed. 

379 

380 By default clients will be a :py:class:`~redis.Redis` instance. 

381 Specify a different class to the ``redis_class`` argument if you 

382 desire something different. 

383 

384 The ``connection_pool_class`` specifies the connection pool to 

385 use. The :py:class:`~redis.sentinel.SentinelConnectionPool` 

386 will be used by default. 

387 

388 All other keyword arguments are merged with any connection_kwargs 

389 passed to this class and passed to the connection pool as keyword 

390 arguments to be used to initialize Redis connections. 

391 """ 

392 kwargs["is_master"] = True 

393 connection_kwargs = dict(self.connection_kwargs) 

394 connection_kwargs.update(kwargs) 

395 return redis_class.from_pool( 

396 connection_pool_class(service_name, self, **connection_kwargs) 

397 ) 

398 

399 def slave_for( 

400 self, 

401 service_name, 

402 redis_class=Redis, 

403 connection_pool_class=SentinelConnectionPool, 

404 **kwargs, 

405 ): 

406 """ 

407 Returns redis client instance for the ``service_name`` slave(s). 

408 

409 A SentinelConnectionPool class is used to retrieve the slave's 

410 address before establishing a new connection. 

411 

412 By default clients will be a :py:class:`~redis.Redis` instance. 

413 Specify a different class to the ``redis_class`` argument if you 

414 desire something different. 

415 

416 The ``connection_pool_class`` specifies the connection pool to use. 

417 The SentinelConnectionPool will be used by default. 

418 

419 All other keyword arguments are merged with any connection_kwargs 

420 passed to this class and passed to the connection pool as keyword 

421 arguments to be used to initialize Redis connections. 

422 """ 

423 kwargs["is_master"] = False 

424 connection_kwargs = dict(self.connection_kwargs) 

425 connection_kwargs.update(kwargs) 

426 return redis_class.from_pool( 

427 connection_pool_class(service_name, self, **connection_kwargs) 

428 )