Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

186 statements  

1import random 

2import weakref 

3from typing import Optional 

4 

5from redis.client import Redis 

6from redis.commands import SentinelCommands 

7from redis.connection import Connection, ConnectionPool, SSLConnection 

8from redis.exceptions import ( 

9 ConnectionError, 

10 ReadOnlyError, 

11 ResponseError, 

12 TimeoutError, 

13) 

14 

15 

16class MasterNotFoundError(ConnectionError): 

17 pass 

18 

19 

20class SlaveNotFoundError(ConnectionError): 

21 pass 

22 

23 

24class SentinelManagedConnection(Connection): 

25 def __init__(self, **kwargs): 

26 self.connection_pool = kwargs.pop("connection_pool") 

27 super().__init__(**kwargs) 

28 

29 def __repr__(self): 

30 pool = self.connection_pool 

31 s = ( 

32 f"<{type(self).__module__}.{type(self).__name__}" 

33 f"(service={pool.service_name}%s)>" 

34 ) 

35 if self.host: 

36 host_info = f",host={self.host},port={self.port}" 

37 s = s % host_info 

38 return s 

39 

40 def connect_to(self, address): 

41 self.host, self.port = address 

42 

43 self.connect_check_health( 

44 check_health=self.connection_pool.check_connection, 

45 retry_socket_connect=False, 

46 ) 

47 

48 def _connect_retry(self): 

49 if self._sock: 

50 return # already connected 

51 if self.connection_pool.is_master: 

52 self.connect_to(self.connection_pool.get_master_address()) 

53 else: 

54 for slave in self.connection_pool.rotate_slaves(): 

55 try: 

56 return self.connect_to(slave) 

57 except ConnectionError: 

58 continue 

59 raise SlaveNotFoundError # Never be here 

60 

61 def connect(self): 

62 return self.retry.call_with_retry(self._connect_retry, lambda error: None) 

63 

64 def read_response( 

65 self, 

66 disable_decoding=False, 

67 *, 

68 disconnect_on_error: Optional[bool] = False, 

69 push_request: Optional[bool] = False, 

70 ): 

71 try: 

72 return super().read_response( 

73 disable_decoding=disable_decoding, 

74 disconnect_on_error=disconnect_on_error, 

75 push_request=push_request, 

76 ) 

77 except ReadOnlyError: 

78 if self.connection_pool.is_master: 

79 # When talking to a master, a ReadOnlyError when likely 

80 # indicates that the previous master that we're still connected 

81 # to has been demoted to a slave and there's a new master. 

82 # calling disconnect will force the connection to re-query 

83 # sentinel during the next connect() attempt. 

84 self.disconnect() 

85 raise ConnectionError("The previous master is now a slave") 

86 raise 

87 

88 

89class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): 

90 pass 

91 

92 

93class SentinelConnectionPoolProxy: 

94 def __init__( 

95 self, 

96 connection_pool, 

97 is_master, 

98 check_connection, 

99 service_name, 

100 sentinel_manager, 

101 ): 

102 self.connection_pool_ref = weakref.ref(connection_pool) 

103 self.is_master = is_master 

104 self.check_connection = check_connection 

105 self.service_name = service_name 

106 self.sentinel_manager = sentinel_manager 

107 self.reset() 

108 

109 def reset(self): 

110 self.master_address = None 

111 self.slave_rr_counter = None 

112 

113 def get_master_address(self): 

114 master_address = self.sentinel_manager.discover_master(self.service_name) 

115 if self.is_master and self.master_address != master_address: 

116 self.master_address = master_address 

117 # disconnect any idle connections so that they reconnect 

118 # to the new master the next time that they are used. 

119 connection_pool = self.connection_pool_ref() 

120 if connection_pool is not None: 

121 connection_pool.disconnect(inuse_connections=False) 

122 return master_address 

123 

124 def rotate_slaves(self): 

125 slaves = self.sentinel_manager.discover_slaves(self.service_name) 

126 if slaves: 

127 if self.slave_rr_counter is None: 

128 self.slave_rr_counter = random.randint(0, len(slaves) - 1) 

129 for _ in range(len(slaves)): 

130 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) 

131 slave = slaves[self.slave_rr_counter] 

132 yield slave 

133 # Fallback to the master connection 

134 try: 

135 yield self.get_master_address() 

136 except MasterNotFoundError: 

137 pass 

138 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") 

139 

140 

141class SentinelConnectionPool(ConnectionPool): 

142 """ 

143 Sentinel backed connection pool. 

144 

145 If ``check_connection`` flag is set to True, SentinelManagedConnection 

146 sends a PING command right after establishing the connection. 

147 """ 

148 

149 def __init__(self, service_name, sentinel_manager, **kwargs): 

150 kwargs["connection_class"] = kwargs.get( 

151 "connection_class", 

152 ( 

153 SentinelManagedSSLConnection 

154 if kwargs.pop("ssl", False) 

155 else SentinelManagedConnection 

156 ), 

157 ) 

158 self.is_master = kwargs.pop("is_master", True) 

159 self.check_connection = kwargs.pop("check_connection", False) 

160 self.proxy = SentinelConnectionPoolProxy( 

161 connection_pool=self, 

162 is_master=self.is_master, 

163 check_connection=self.check_connection, 

164 service_name=service_name, 

165 sentinel_manager=sentinel_manager, 

166 ) 

167 super().__init__(**kwargs) 

168 self.connection_kwargs["connection_pool"] = self.proxy 

169 self.service_name = service_name 

170 self.sentinel_manager = sentinel_manager 

171 

172 def __repr__(self): 

173 role = "master" if self.is_master else "slave" 

174 return ( 

175 f"<{type(self).__module__}.{type(self).__name__}" 

176 f"(service={self.service_name}({role}))>" 

177 ) 

178 

179 def reset(self): 

180 super().reset() 

181 self.proxy.reset() 

182 

183 @property 

184 def master_address(self): 

185 return self.proxy.master_address 

186 

187 def owns_connection(self, connection): 

188 check = not self.is_master or ( 

189 self.is_master and self.master_address == (connection.host, connection.port) 

190 ) 

191 parent = super() 

192 return check and parent.owns_connection(connection) 

193 

194 def get_master_address(self): 

195 return self.proxy.get_master_address() 

196 

197 def rotate_slaves(self): 

198 "Round-robin slave balancer" 

199 return self.proxy.rotate_slaves() 

200 

201 

202class Sentinel(SentinelCommands): 

203 """ 

204 Redis Sentinel cluster client 

205 

206 >>> from redis.sentinel import Sentinel 

207 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) 

208 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) 

209 >>> master.set('foo', 'bar') 

210 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) 

211 >>> slave.get('foo') 

212 b'bar' 

213 

214 ``sentinels`` is a list of sentinel nodes. Each node is represented by 

215 a pair (hostname, port). 

216 

217 ``min_other_sentinels`` defined a minimum number of peers for a sentinel. 

218 When querying a sentinel, if it doesn't meet this threshold, responses 

219 from that sentinel won't be considered valid. 

220 

221 ``sentinel_kwargs`` is a dictionary of connection arguments used when 

222 connecting to sentinel instances. Any argument that can be passed to 

223 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is 

224 not specified, any socket_timeout and socket_keepalive options specified 

225 in ``connection_kwargs`` will be used. 

226 

227 ``connection_kwargs`` are keyword arguments that will be used when 

228 establishing a connection to a Redis server. 

229 """ 

230 

231 def __init__( 

232 self, 

233 sentinels, 

234 min_other_sentinels=0, 

235 sentinel_kwargs=None, 

236 force_master_ip=None, 

237 **connection_kwargs, 

238 ): 

239 # if sentinel_kwargs isn't defined, use the socket_* options from 

240 # connection_kwargs 

241 if sentinel_kwargs is None: 

242 sentinel_kwargs = { 

243 k: v for k, v in connection_kwargs.items() if k.startswith("socket_") 

244 } 

245 self.sentinel_kwargs = sentinel_kwargs 

246 

247 self.sentinels = [ 

248 Redis(hostname, port, **self.sentinel_kwargs) 

249 for hostname, port in sentinels 

250 ] 

251 self.min_other_sentinels = min_other_sentinels 

252 self.connection_kwargs = connection_kwargs 

253 self._force_master_ip = force_master_ip 

254 

255 def execute_command(self, *args, **kwargs): 

256 """ 

257 Execute Sentinel command in sentinel nodes. 

258 once - If set to True, then execute the resulting command on a single 

259 node at random, rather than across the entire sentinel cluster. 

260 """ 

261 once = bool(kwargs.pop("once", False)) 

262 

263 # Check if command is supposed to return the original 

264 # responses instead of boolean value. 

265 return_responses = bool(kwargs.pop("return_responses", False)) 

266 

267 if once: 

268 response = random.choice(self.sentinels).execute_command(*args, **kwargs) 

269 if return_responses: 

270 return [response] 

271 else: 

272 return True if response else False 

273 

274 responses = [] 

275 for sentinel in self.sentinels: 

276 responses.append(sentinel.execute_command(*args, **kwargs)) 

277 

278 if return_responses: 

279 return responses 

280 

281 return all(responses) 

282 

283 def __repr__(self): 

284 sentinel_addresses = [] 

285 for sentinel in self.sentinels: 

286 sentinel_addresses.append( 

287 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs) 

288 ) 

289 return ( 

290 f"<{type(self).__module__}.{type(self).__name__}" 

291 f"(sentinels=[{','.join(sentinel_addresses)}])>" 

292 ) 

293 

294 def check_master_state(self, state, service_name): 

295 if not state["is_master"] or state["is_sdown"] or state["is_odown"]: 

296 return False 

297 # Check if our sentinel doesn't see other nodes 

298 if state["num-other-sentinels"] < self.min_other_sentinels: 

299 return False 

300 return True 

301 

302 def discover_master(self, service_name): 

303 """ 

304 Asks sentinel servers for the Redis master's address corresponding 

305 to the service labeled ``service_name``. 

306 

307 Returns a pair (address, port) or raises MasterNotFoundError if no 

308 master is found. 

309 """ 

310 collected_errors = list() 

311 for sentinel_no, sentinel in enumerate(self.sentinels): 

312 try: 

313 masters = sentinel.sentinel_masters() 

314 except (ConnectionError, TimeoutError) as e: 

315 collected_errors.append(f"{sentinel} - {e!r}") 

316 continue 

317 state = masters.get(service_name) 

318 if state and self.check_master_state(state, service_name): 

319 # Put this sentinel at the top of the list 

320 self.sentinels[0], self.sentinels[sentinel_no] = ( 

321 sentinel, 

322 self.sentinels[0], 

323 ) 

324 

325 ip = ( 

326 self._force_master_ip 

327 if self._force_master_ip is not None 

328 else state["ip"] 

329 ) 

330 return ip, state["port"] 

331 

332 error_info = "" 

333 if len(collected_errors) > 0: 

334 error_info = f" : {', '.join(collected_errors)}" 

335 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}") 

336 

337 def filter_slaves(self, slaves): 

338 "Remove slaves that are in an ODOWN or SDOWN state" 

339 slaves_alive = [] 

340 for slave in slaves: 

341 if slave["is_odown"] or slave["is_sdown"]: 

342 continue 

343 slaves_alive.append((slave["ip"], slave["port"])) 

344 return slaves_alive 

345 

346 def discover_slaves(self, service_name): 

347 "Returns a list of alive slaves for service ``service_name``" 

348 for sentinel in self.sentinels: 

349 try: 

350 slaves = sentinel.sentinel_slaves(service_name) 

351 except (ConnectionError, ResponseError, TimeoutError): 

352 continue 

353 slaves = self.filter_slaves(slaves) 

354 if slaves: 

355 return slaves 

356 return [] 

357 

358 def master_for( 

359 self, 

360 service_name, 

361 redis_class=Redis, 

362 connection_pool_class=SentinelConnectionPool, 

363 **kwargs, 

364 ): 

365 """ 

366 Returns a redis client instance for the ``service_name`` master. 

367 Sentinel client will detect failover and reconnect Redis clients 

368 automatically. 

369 

370 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is 

371 used to retrieve the master's address before establishing a new 

372 connection. 

373 

374 NOTE: If the master's address has changed, any cached connections to 

375 the old master are closed. 

376 

377 By default clients will be a :py:class:`~redis.Redis` instance. 

378 Specify a different class to the ``redis_class`` argument if you 

379 desire something different. 

380 

381 The ``connection_pool_class`` specifies the connection pool to 

382 use. The :py:class:`~redis.sentinel.SentinelConnectionPool` 

383 will be used by default. 

384 

385 All other keyword arguments are merged with any connection_kwargs 

386 passed to this class and passed to the connection pool as keyword 

387 arguments to be used to initialize Redis connections. 

388 """ 

389 kwargs["is_master"] = True 

390 connection_kwargs = dict(self.connection_kwargs) 

391 connection_kwargs.update(kwargs) 

392 return redis_class.from_pool( 

393 connection_pool_class(service_name, self, **connection_kwargs) 

394 ) 

395 

396 def slave_for( 

397 self, 

398 service_name, 

399 redis_class=Redis, 

400 connection_pool_class=SentinelConnectionPool, 

401 **kwargs, 

402 ): 

403 """ 

404 Returns redis client instance for the ``service_name`` slave(s). 

405 

406 A SentinelConnectionPool class is used to retrieve the slave's 

407 address before establishing a new connection. 

408 

409 By default clients will be a :py:class:`~redis.Redis` instance. 

410 Specify a different class to the ``redis_class`` argument if you 

411 desire something different. 

412 

413 The ``connection_pool_class`` specifies the connection pool to use. 

414 The SentinelConnectionPool will be used by default. 

415 

416 All other keyword arguments are merged with any connection_kwargs 

417 passed to this class and passed to the connection pool as keyword 

418 arguments to be used to initialize Redis connections. 

419 """ 

420 kwargs["is_master"] = False 

421 connection_kwargs = dict(self.connection_kwargs) 

422 connection_kwargs.update(kwargs) 

423 return redis_class.from_pool( 

424 connection_pool_class(service_name, self, **connection_kwargs) 

425 )