Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 23%

159 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:09 +0000

1import random 

2import weakref 

3 

4from redis.client import Redis 

5from redis.commands import SentinelCommands 

6from redis.connection import Connection, ConnectionPool, SSLConnection 

7from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

8from redis.utils import str_if_bytes 

9 

10 

class MasterNotFoundError(ConnectionError):
    """Raised when no sentinel reports a usable master for a service."""

13 

14 

class SlaveNotFoundError(ConnectionError):
    """Raised when neither a slave nor a fallback master can be found."""

17 

18 

class SentinelManagedConnection(Connection):
    """
    Connection whose target address is supplied by its owning pool.

    The pool is expected to expose ``service_name``, ``is_master``,
    ``check_connection``, ``get_master_address()`` and ``rotate_slaves()``
    (see ``SentinelConnectionPool``).
    """

    def __init__(self, **kwargs):
        """
        Create the connection.

        ``connection_pool`` is a required keyword argument; it is removed
        from ``kwargs`` before the remaining arguments are forwarded to the
        base ``Connection``.
        """
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        """
        Return ``ClassName<service=NAME[,host=HOST,port=PORT]>``.

        The host/port part is only included once a host is known.
        """
        pool = self.connection_pool
        # Build the string in one step with an empty default so that an
        # unconnected instance never shows a raw "%s" placeholder, and so a
        # "%" in the service name cannot break the formatting.
        host_info = f",host={self.host},port={self.port}" if self.host else ""
        return f"{type(self).__name__}<service={pool.service_name}{host_info}>"

    def connect_to(self, address):
        """
        Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent
        right after connecting and ``ConnectionError`` is raised unless the
        reply is ``PONG``.
        """
        self.host, self.port = address
        super().connect()
        if self.connection_pool.check_connection:
            self.send_command("PING")
            if str_if_bytes(self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    def _connect_retry(self):
        """
        Perform a single connection attempt against the current topology.

        Master pools connect to the address returned by the pool's
        ``get_master_address()``. Slave pools try each address yielded by
        ``rotate_slaves()`` in turn, moving on whenever one fails with
        ``ConnectionError``.
        """
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    continue
            # rotate_slaves() itself raises SlaveNotFoundError when it is
            # exhausted, so this line is only a safety net.
            raise SlaveNotFoundError  # Never be here

    def connect(self):
        """Connect by running ``_connect_retry`` through ``self.retry``."""
        # NOTE(review): the no-op lambda is presumably the on-failure hook of
        # ``call_with_retry`` -- confirm against redis.retry.Retry.
        return self.retry.call_with_retry(self._connect_retry, lambda error: None)

    def read_response(self, disable_decoding=False):
        """
        Read a response, translating a stale-master ``ReadOnlyError``.

        On a master pool a ``ReadOnlyError`` disconnects this connection and
        is re-raised as ``ConnectionError``; on a slave pool it propagates
        unchanged.
        """
        try:
            return super().read_response(disable_decoding=disable_decoding)
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a
                # new master. Calling disconnect will force the connection
                # to re-query sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise

69 

70 

class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """Combination of ``SentinelManagedConnection`` and ``SSLConnection``."""

73 

74 

class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """
        Create a pool for ``service_name`` resolved via ``sentinel_manager``.

        Keyword arguments consumed here (not forwarded to ``ConnectionPool``):

        ``ssl`` -- when true and no ``connection_class`` is given, use
        ``SentinelManagedSSLConnection`` instead of
        ``SentinelManagedConnection``.

        ``is_master`` -- whether the pool targets the master (default True).

        ``check_connection`` -- see the class docstring (default False).

        ``sentinel_manager`` must provide ``discover_master(service_name)``
        and ``discover_slaves(service_name)``.
        """
        # "ssl" is always removed from kwargs, even when the caller supplies
        # an explicit connection_class.
        use_ssl = kwargs.pop("ssl", False)
        default_class = (
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection
        )
        kwargs.setdefault("connection_class", default_class)
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Each connection receives a weak proxy to this pool
        # (presumably to avoid a pool <-> connection reference cycle).
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager

    def __repr__(self):
        """Return ``ClassName<service=NAME(master|slave)>``."""
        role = "master" if self.is_master else "slave"
        # The closing ">" balances the opening "<", matching the other
        # __repr__ implementations in this module.
        return f"{type(self).__name__}<service={self.service_name}({role})>"

    def reset(self):
        """Reset the base pool and forget the cached master / slave cursor."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection):
        """
        Tell whether ``connection`` still belongs to this pool.

        For a master pool the connection must also point at the currently
        cached master address; slave pools only apply the base-class check.
        """
        check = not self.is_master or (
            self.master_address == (connection.host, connection.port)
        )
        parent = super()
        return check and parent.owns_connection(connection)

    def get_master_address(self):
        """
        Ask the sentinel manager for the master address and return it.

        On a master pool, a changed address is cached and idle connections
        are disconnected. Exceptions raised by ``discover_master`` propagate.
        """
        master_address = self.sentinel_manager.discover_master(self.service_name)
        if self.is_master:
            if self.master_address != master_address:
                self.master_address = master_address
                # disconnect any idle connections so that they reconnect
                # to the new master the next time that they are used.
                self.disconnect(inuse_connections=False)
        return master_address

    def rotate_slaves(self):
        """
        Round-robin slave balancer.

        Generator yielding every discovered slave address once, starting
        after a cursor that is randomly initialised on first use, and then
        the master address as a fallback. Raises ``SlaveNotFoundError`` once
        all candidates have been consumed.
        """
        slaves = self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                slave = slaves[self.slave_rr_counter]
                yield slave
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

139 

140 

class Sentinel(SentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes, each given as a
    (hostname, port) pair.

    ``min_other_sentinels`` defines the minimum number of peers a sentinel
    must report. Answers from a sentinel below this threshold are not
    considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to the sentinel instances themselves. When it is omitted,
    the ``socket_*`` options found in ``connection_kwargs`` are used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        **connection_kwargs,
    ):
        if sentinel_kwargs is None:
            # No explicit sentinel options: reuse only the socket_* settings
            # given for the data connections.
            sentinel_kwargs = {}
            for option, value in connection_kwargs.items():
                if option.startswith("socket_"):
                    sentinel_kwargs[option] = value
        self.sentinel_kwargs = sentinel_kwargs

        sentinel_clients = []
        for hostname, port in sentinels:
            sentinel_clients.append(Redis(hostname, port, **self.sentinel_kwargs))
        self.sentinels = sentinel_clients
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

    def execute_command(self, *args, **kwargs):
        """
        Run a Sentinel command on the sentinel nodes and return True.

        once - when truthy, the command is sent to one randomly chosen
               sentinel instead of to every sentinel.
        """
        once = bool(kwargs.pop("once", False))
        targets = [random.choice(self.sentinels)] if once else self.sentinels
        for sentinel in targets:
            sentinel.execute_command(*args, **kwargs)
        return True

    def __repr__(self):
        addresses = ",".join(
            "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
            for sentinel in self.sentinels
        )
        return f"{type(self).__name__}<sentinels=[{addresses}]>"

    def check_master_state(self, state, service_name):
        """
        Return True when ``state`` describes a master that can be used.

        The entry must be flagged as master, be neither SDOWN nor ODOWN, and
        come from a sentinel that sees at least ``min_other_sentinels``
        peers. ``service_name`` is accepted but not consulted here.
        """
        unusable = not state["is_master"] or state["is_sdown"] or state["is_odown"]
        if unusable:
            return False
        # Reject answers from a sentinel that doesn't see enough other nodes
        return not (state["num-other-sentinels"] < self.min_other_sentinels)

    def discover_master(self, service_name):
        """
        Query the sentinels, in order, for the master of ``service_name``.

        Returns an (address, port) pair, or raises MasterNotFoundError when
        no sentinel provides an acceptable answer. Sentinels that fail with
        a connection or timeout error are skipped.
        """
        for position, candidate in enumerate(self.sentinels):
            try:
                masters = candidate.sentinel_masters()
            except (ConnectionError, TimeoutError):
                continue
            state = masters.get(service_name)
            if not state or not self.check_master_state(state, service_name):
                continue
            # Move the sentinel that answered to the front of the list by
            # swapping it with the current first entry.
            self.sentinels[position] = self.sentinels[0]
            self.sentinels[0] = candidate
            return state["ip"], state["port"]
        raise MasterNotFoundError(f"No master found for {service_name!r}")

    def filter_slaves(self, slaves):
        "Return (ip, port) pairs of the slaves that are neither ODOWN nor SDOWN"
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    def discover_slaves(self, service_name):
        "Return the alive slaves of ``service_name``, or [] if none are found"
        for sentinel in self.sentinels:
            try:
                reported = sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            alive = self.filter_slaves(reported)
            if alive:
                return alive
        return []

    def master_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Build a client bound to the master of ``service_name``.

        The client is a ``redis_class`` instance (:py:class:`~redis.Redis`
        by default) backed by a ``connection_pool_class`` instance
        (:py:class:`~redis.sentinel.SentinelConnectionPool` by default),
        which looks up the master's address before a new connection is
        established.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        Remaining keyword arguments override the ``connection_kwargs`` given
        to this object and are handed to the connection pool; ``is_master``
        is always forced to True.
        """
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": True}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)

    def slave_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Build a client bound to the slave(s) of ``service_name``.

        The client is a ``redis_class`` instance (:py:class:`~redis.Redis`
        by default) backed by a ``connection_pool_class`` instance
        (SentinelConnectionPool by default), which looks up a slave address
        before a new connection is established.

        Remaining keyword arguments override the ``connection_kwargs`` given
        to this object and are handed to the connection pool; ``is_master``
        is always forced to False.
        """
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": False}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)