Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 23%

165 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:16 +0000

1import random 

2import weakref 

3from typing import Optional 

4 

5from redis.client import Redis 

6from redis.commands import SentinelCommands 

7from redis.connection import Connection, ConnectionPool, SSLConnection 

8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError 

9from redis.utils import str_if_bytes 

10 

11 

class MasterNotFoundError(ConnectionError):
    """Raised when no usable master can be located for a service."""

14 

15 

class SlaveNotFoundError(ConnectionError):
    """Raised when no slave (nor the master fallback) can be used for a service."""

18 

19 

class SentinelManagedConnection(Connection):
    """
    Connection whose target address is supplied by its owning pool.

    The pool must be passed as the ``connection_pool`` keyword argument;
    every other keyword argument is forwarded to the parent constructor.
    """

    def __init__(self, **kwargs):
        # ``connection_pool`` is consumed here and removed before the
        # remaining keyword arguments are handed to the parent constructor.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        # Build the text in a single f-string. The former "%s" template was
        # only substituted when a host was set, so a host-less connection
        # printed a literal "%s", and a "%" inside the service name was
        # misread as a format directive.
        host_info = f",host={self.host},port={self.port}" if self.host else ""
        return f"{type(self).__name__}<service={pool.service_name}{host_info}>"

    def connect_to(self, address):
        """
        Connect to ``address``, a ``(host, port)`` pair.

        If the pool's ``check_connection`` flag is set, a PING is sent after
        connecting and ``ConnectionError`` is raised unless the reply is PONG.
        """
        self.host, self.port = address
        super().connect()
        if self.connection_pool.check_connection:
            self.send_command("PING")
            if str_if_bytes(self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    def _connect_retry(self):
        """
        Perform a single connection attempt using addresses from the pool.

        A master pool connects to the address returned by
        ``get_master_address()``; otherwise each address yielded by
        ``rotate_slaves()`` is tried until one connects.
        """
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    continue
            # rotate_slaves() itself raises SlaveNotFoundError once it runs
            # out of candidates, so this line is only a safeguard.
            raise SlaveNotFoundError  # Never be here

    def connect(self):
        """Run ``_connect_retry`` through this connection's retry object."""
        # The second argument is the per-failure callback; nothing needs to
        # happen between attempts here, so it is a no-op.
        return self.retry.call_with_retry(self._connect_retry, lambda error: None)

    def read_response(
        self, disable_decoding=False, *, disconnect_on_error: Optional[bool] = False
    ):
        """
        Read a response, turning a master-side ``ReadOnlyError`` into a
        ``ConnectionError`` after disconnecting.

        For non-master pools the ``ReadOnlyError`` is re-raised unchanged.
        """
        try:
            return super().read_response(
                disable_decoding=disable_decoding,
                disconnect_on_error=disconnect_on_error,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError likely
                # indicates that the previous master that we're still connected
                # to has been demoted to a slave and there's a new master.
                # Calling disconnect will force the connection to re-query
                # sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise

75 

76 

class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """``SentinelManagedConnection`` variant that also inherits ``SSLConnection``."""

79 

80 

class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """
        ``service_name`` is the name passed to the sentinel manager when
        looking up addresses; ``sentinel_manager`` is the object providing
        ``discover_master`` and ``discover_slaves``.

        Recognised keyword arguments (all removed before the rest is passed
        to the parent pool): ``ssl``, ``is_master`` (default True) and
        ``check_connection`` (default False).
        """
        # The default passed to ``get`` is evaluated eagerly, so ``ssl`` is
        # always popped from kwargs even when the caller supplied an explicit
        # ``connection_class``.
        kwargs["connection_class"] = kwargs.get(
            "connection_class",
            SentinelManagedSSLConnection
            if kwargs.pop("ssl", False)
            else SentinelManagedConnection,
        )
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy to this pool rather than a strong
        # reference.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager

    def __repr__(self):
        role = "master" if self.is_master else "slave"
        # The closing ">" was missing, leaving the bracket unbalanced and
        # inconsistent with the other __repr__ implementations in this module.
        return f"{type(self).__name__}<service={self.service_name}({role})>"

    def reset(self):
        """Reset the parent pool and forget the cached master address and
        the round-robin slave counter."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection):
        """
        Return a truthy value if ``connection`` belongs to this pool.

        For a master pool the connection must also point at the currently
        cached master address; the parent pool's check is applied on top.
        """
        address_ok = not self.is_master or self.master_address == (
            connection.host,
            connection.port,
        )
        return address_ok and super().owns_connection(connection)

    def get_master_address(self):
        """
        Ask the sentinel manager for the current master address and return it.

        On a master pool, a changed address is cached and idle connections
        are disconnected.
        """
        master_address = self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != master_address:
            self.master_address = master_address
            # disconnect any idle connections so that they reconnect
            # to the new master the next time that they are used.
            self.disconnect(inuse_connections=False)
        return master_address

    def rotate_slaves(self):
        """
        Round-robin slave balancer.

        Generator yielding each discovered slave address once, starting after
        a randomly initialised counter, then the master address as a
        fallback. Raises ``SlaveNotFoundError`` once every candidate has been
        yielded.
        """
        slaves = self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                yield slaves[self.slave_rr_counter]
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")

145 

146 

class Sentinel(SentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, any ``socket_*`` options specified in
    ``connection_kwargs`` will be used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        self.sentinels = [
            Redis(hostname, port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

    def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.

        Always returns True; the individual command replies are discarded.
        """
        # ``once`` is consumed here and must not reach the node clients.
        once = bool(kwargs.pop("once", False))

        if once:
            random.choice(self.sentinels).execute_command(*args, **kwargs)
        else:
            for sentinel in self.sentinels:
                sentinel.execute_command(*args, **kwargs)
        return True

    def __repr__(self):
        sentinel_addresses = [
            "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
            for sentinel in self.sentinels
        ]
        return f'{type(self).__name__}<sentinels=[{",".join(sentinel_addresses)}]>'

    def check_master_state(self, state, service_name):
        """
        Return True if ``state`` describes a usable master.

        ``state`` is the per-service mapping taken from a sentinel's masters
        reply. ``service_name`` is accepted but not used by this check.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        if state["num-other-sentinels"] < self.min_other_sentinels:
            return False
        return True

    def discover_master(self, service_name):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                # Remember why this sentinel was skipped so the final error
                # message can report it.
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )
                return state["ip"], state["port"]

        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(self, slaves):
        "Remove slaves that are in an ODOWN or SDOWN state"
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    def discover_slaves(self, service_name):
        "Returns a list of alive slaves for service ``service_name``"
        for sentinel in self.sentinels:
            try:
                slaves = sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        # No sentinel reported a live slave.
        return []

    def _client_for(
        self, service_name, redis_class, connection_pool_class, is_master, kwargs
    ):
        "Build a client whose pool merges connection_kwargs with ``kwargs``"
        # Caller-supplied kwargs override the instance-wide defaults, and the
        # role flag always wins over both.
        connection_kwargs = {
            **self.connection_kwargs,
            **kwargs,
            "is_master": is_master,
        }
        return redis_class(
            connection_pool=connection_pool_class(
                service_name, self, **connection_kwargs
            )
        )

    def master_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.

        A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use.  The :py:class:`~redis.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        return self._client_for(
            service_name, redis_class, connection_pool_class, True, kwargs
        )

    def slave_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        return self._client_for(
            service_name, redis_class, connection_pool_class, False, kwargs
        )