Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 24%
185 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import random
2import weakref
3from typing import Optional
5from redis.client import Redis
6from redis.commands import SentinelCommands
7from redis.connection import Connection, ConnectionPool, SSLConnection
8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
9from redis.utils import str_if_bytes
12class MasterNotFoundError(ConnectionError):
13 pass
16class SlaveNotFoundError(ConnectionError):
17 pass
20class SentinelManagedConnection(Connection):
21 def __init__(self, **kwargs):
22 self.connection_pool = kwargs.pop("connection_pool")
23 super().__init__(**kwargs)
25 def __repr__(self):
26 pool = self.connection_pool
27 s = (
28 f"<{type(self).__module__}.{type(self).__name__}"
29 f"(service={pool.service_name}%s)>"
30 )
31 if self.host:
32 host_info = f",host={self.host},port={self.port}"
33 s = s % host_info
34 return s
36 def connect_to(self, address):
37 self.host, self.port = address
38 super().connect()
39 if self.connection_pool.check_connection:
40 self.send_command("PING")
41 if str_if_bytes(self.read_response()) != "PONG":
42 raise ConnectionError("PING failed")
44 def _connect_retry(self):
45 if self._sock:
46 return # already connected
47 if self.connection_pool.is_master:
48 self.connect_to(self.connection_pool.get_master_address())
49 else:
50 for slave in self.connection_pool.rotate_slaves():
51 try:
52 return self.connect_to(slave)
53 except ConnectionError:
54 continue
55 raise SlaveNotFoundError # Never be here
57 def connect(self):
58 return self.retry.call_with_retry(self._connect_retry, lambda error: None)
60 def read_response(
61 self,
62 disable_decoding=False,
63 *,
64 disconnect_on_error: Optional[bool] = False,
65 push_request: Optional[bool] = False,
66 ):
67 try:
68 return super().read_response(
69 disable_decoding=disable_decoding,
70 disconnect_on_error=disconnect_on_error,
71 push_request=push_request,
72 )
73 except ReadOnlyError:
74 if self.connection_pool.is_master:
75 # When talking to a master, a ReadOnlyError when likely
76 # indicates that the previous master that we're still connected
77 # to has been demoted to a slave and there's a new master.
78 # calling disconnect will force the connection to re-query
79 # sentinel during the next connect() attempt.
80 self.disconnect()
81 raise ConnectionError("The previous master is now a slave")
82 raise
85class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
86 pass
89class SentinelConnectionPoolProxy:
90 def __init__(
91 self,
92 connection_pool,
93 is_master,
94 check_connection,
95 service_name,
96 sentinel_manager,
97 ):
98 self.connection_pool_ref = weakref.ref(connection_pool)
99 self.is_master = is_master
100 self.check_connection = check_connection
101 self.service_name = service_name
102 self.sentinel_manager = sentinel_manager
103 self.reset()
105 def reset(self):
106 self.master_address = None
107 self.slave_rr_counter = None
109 def get_master_address(self):
110 master_address = self.sentinel_manager.discover_master(self.service_name)
111 if self.is_master and self.master_address != master_address:
112 self.master_address = master_address
113 # disconnect any idle connections so that they reconnect
114 # to the new master the next time that they are used.
115 connection_pool = self.connection_pool_ref()
116 if connection_pool is not None:
117 connection_pool.disconnect(inuse_connections=False)
118 return master_address
120 def rotate_slaves(self):
121 slaves = self.sentinel_manager.discover_slaves(self.service_name)
122 if slaves:
123 if self.slave_rr_counter is None:
124 self.slave_rr_counter = random.randint(0, len(slaves) - 1)
125 for _ in range(len(slaves)):
126 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
127 slave = slaves[self.slave_rr_counter]
128 yield slave
129 # Fallback to the master connection
130 try:
131 yield self.get_master_address()
132 except MasterNotFoundError:
133 pass
134 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
137class SentinelConnectionPool(ConnectionPool):
138 """
139 Sentinel backed connection pool.
141 If ``check_connection`` flag is set to True, SentinelManagedConnection
142 sends a PING command right after establishing the connection.
143 """
145 def __init__(self, service_name, sentinel_manager, **kwargs):
146 kwargs["connection_class"] = kwargs.get(
147 "connection_class",
148 (
149 SentinelManagedSSLConnection
150 if kwargs.pop("ssl", False)
151 else SentinelManagedConnection
152 ),
153 )
154 self.is_master = kwargs.pop("is_master", True)
155 self.check_connection = kwargs.pop("check_connection", False)
156 self.proxy = SentinelConnectionPoolProxy(
157 connection_pool=self,
158 is_master=self.is_master,
159 check_connection=self.check_connection,
160 service_name=service_name,
161 sentinel_manager=sentinel_manager,
162 )
163 super().__init__(**kwargs)
164 self.connection_kwargs["connection_pool"] = self.proxy
165 self.service_name = service_name
166 self.sentinel_manager = sentinel_manager
168 def __repr__(self):
169 role = "master" if self.is_master else "slave"
170 return (
171 f"<{type(self).__module__}.{type(self).__name__}"
172 f"(service={self.service_name}({role}))>"
173 )
175 def reset(self):
176 super().reset()
177 self.proxy.reset()
179 @property
180 def master_address(self):
181 return self.proxy.master_address
183 def owns_connection(self, connection):
184 check = not self.is_master or (
185 self.is_master and self.master_address == (connection.host, connection.port)
186 )
187 parent = super()
188 return check and parent.owns_connection(connection)
190 def get_master_address(self):
191 return self.proxy.get_master_address()
193 def rotate_slaves(self):
194 "Round-robin slave balancer"
195 return self.proxy.rotate_slaves()
198class Sentinel(SentinelCommands):
199 """
200 Redis Sentinel cluster client
202 >>> from redis.sentinel import Sentinel
203 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
204 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
205 >>> master.set('foo', 'bar')
206 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
207 >>> slave.get('foo')
208 b'bar'
210 ``sentinels`` is a list of sentinel nodes. Each node is represented by
211 a pair (hostname, port).
213 ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
214 When querying a sentinel, if it doesn't meet this threshold, responses
215 from that sentinel won't be considered valid.
217 ``sentinel_kwargs`` is a dictionary of connection arguments used when
218 connecting to sentinel instances. Any argument that can be passed to
219 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
220 not specified, any socket_timeout and socket_keepalive options specified
221 in ``connection_kwargs`` will be used.
223 ``connection_kwargs`` are keyword arguments that will be used when
224 establishing a connection to a Redis server.
225 """
227 def __init__(
228 self,
229 sentinels,
230 min_other_sentinels=0,
231 sentinel_kwargs=None,
232 **connection_kwargs,
233 ):
234 # if sentinel_kwargs isn't defined, use the socket_* options from
235 # connection_kwargs
236 if sentinel_kwargs is None:
237 sentinel_kwargs = {
238 k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
239 }
240 self.sentinel_kwargs = sentinel_kwargs
242 self.sentinels = [
243 Redis(hostname, port, **self.sentinel_kwargs)
244 for hostname, port in sentinels
245 ]
246 self.min_other_sentinels = min_other_sentinels
247 self.connection_kwargs = connection_kwargs
249 def execute_command(self, *args, **kwargs):
250 """
251 Execute Sentinel command in sentinel nodes.
252 once - If set to True, then execute the resulting command on a single
253 node at random, rather than across the entire sentinel cluster.
254 """
255 kwargs.pop("keys", None) # the keys are used only for client side caching
256 once = bool(kwargs.get("once", False))
257 if "once" in kwargs.keys():
258 kwargs.pop("once")
260 if once:
261 random.choice(self.sentinels).execute_command(*args, **kwargs)
262 else:
263 for sentinel in self.sentinels:
264 sentinel.execute_command(*args, **kwargs)
265 return True
267 def __repr__(self):
268 sentinel_addresses = []
269 for sentinel in self.sentinels:
270 sentinel_addresses.append(
271 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
272 )
273 return (
274 f"<{type(self).__module__}.{type(self).__name__}"
275 f'(sentinels=[{",".join(sentinel_addresses)}])>'
276 )
278 def check_master_state(self, state, service_name):
279 if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
280 return False
281 # Check if our sentinel doesn't see other nodes
282 if state["num-other-sentinels"] < self.min_other_sentinels:
283 return False
284 return True
286 def discover_master(self, service_name):
287 """
288 Asks sentinel servers for the Redis master's address corresponding
289 to the service labeled ``service_name``.
291 Returns a pair (address, port) or raises MasterNotFoundError if no
292 master is found.
293 """
294 collected_errors = list()
295 for sentinel_no, sentinel in enumerate(self.sentinels):
296 try:
297 masters = sentinel.sentinel_masters()
298 except (ConnectionError, TimeoutError) as e:
299 collected_errors.append(f"{sentinel} - {e!r}")
300 continue
301 state = masters.get(service_name)
302 if state and self.check_master_state(state, service_name):
303 # Put this sentinel at the top of the list
304 self.sentinels[0], self.sentinels[sentinel_no] = (
305 sentinel,
306 self.sentinels[0],
307 )
308 return state["ip"], state["port"]
310 error_info = ""
311 if len(collected_errors) > 0:
312 error_info = f" : {', '.join(collected_errors)}"
313 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
315 def filter_slaves(self, slaves):
316 "Remove slaves that are in an ODOWN or SDOWN state"
317 slaves_alive = []
318 for slave in slaves:
319 if slave["is_odown"] or slave["is_sdown"]:
320 continue
321 slaves_alive.append((slave["ip"], slave["port"]))
322 return slaves_alive
324 def discover_slaves(self, service_name):
325 "Returns a list of alive slaves for service ``service_name``"
326 for sentinel in self.sentinels:
327 try:
328 slaves = sentinel.sentinel_slaves(service_name)
329 except (ConnectionError, ResponseError, TimeoutError):
330 continue
331 slaves = self.filter_slaves(slaves)
332 if slaves:
333 return slaves
334 return []
336 def master_for(
337 self,
338 service_name,
339 redis_class=Redis,
340 connection_pool_class=SentinelConnectionPool,
341 **kwargs,
342 ):
343 """
344 Returns a redis client instance for the ``service_name`` master.
346 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
347 used to retrieve the master's address before establishing a new
348 connection.
350 NOTE: If the master's address has changed, any cached connections to
351 the old master are closed.
353 By default clients will be a :py:class:`~redis.Redis` instance.
354 Specify a different class to the ``redis_class`` argument if you
355 desire something different.
357 The ``connection_pool_class`` specifies the connection pool to
358 use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
359 will be used by default.
361 All other keyword arguments are merged with any connection_kwargs
362 passed to this class and passed to the connection pool as keyword
363 arguments to be used to initialize Redis connections.
364 """
365 kwargs["is_master"] = True
366 connection_kwargs = dict(self.connection_kwargs)
367 connection_kwargs.update(kwargs)
368 return redis_class.from_pool(
369 connection_pool_class(service_name, self, **connection_kwargs)
370 )
372 def slave_for(
373 self,
374 service_name,
375 redis_class=Redis,
376 connection_pool_class=SentinelConnectionPool,
377 **kwargs,
378 ):
379 """
380 Returns redis client instance for the ``service_name`` slave(s).
382 A SentinelConnectionPool class is used to retrieve the slave's
383 address before establishing a new connection.
385 By default clients will be a :py:class:`~redis.Redis` instance.
386 Specify a different class to the ``redis_class`` argument if you
387 desire something different.
389 The ``connection_pool_class`` specifies the connection pool to use.
390 The SentinelConnectionPool will be used by default.
392 All other keyword arguments are merged with any connection_kwargs
393 passed to this class and passed to the connection pool as keyword
394 arguments to be used to initialize Redis connections.
395 """
396 kwargs["is_master"] = False
397 connection_kwargs = dict(self.connection_kwargs)
398 connection_kwargs.update(kwargs)
399 return redis_class.from_pool(
400 connection_pool_class(service_name, self, **connection_kwargs)
401 )