Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import random
2import weakref
3from typing import Optional
5from redis.client import Redis
6from redis.commands import SentinelCommands
7from redis.connection import Connection, ConnectionPool, SSLConnection
8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
9from redis.utils import str_if_bytes
12class MasterNotFoundError(ConnectionError):
13 pass
16class SlaveNotFoundError(ConnectionError):
17 pass
20class SentinelManagedConnection(Connection):
21 def __init__(self, **kwargs):
22 self.connection_pool = kwargs.pop("connection_pool")
23 super().__init__(**kwargs)
25 def __repr__(self):
26 pool = self.connection_pool
27 s = (
28 f"<{type(self).__module__}.{type(self).__name__}"
29 f"(service={pool.service_name}%s)>"
30 )
31 if self.host:
32 host_info = f",host={self.host},port={self.port}"
33 s = s % host_info
34 return s
36 def connect_to(self, address):
37 self.host, self.port = address
38 super().connect()
39 if self.connection_pool.check_connection:
40 self.send_command("PING")
41 if str_if_bytes(self.read_response()) != "PONG":
42 raise ConnectionError("PING failed")
44 def _connect_retry(self):
45 if self._sock:
46 return # already connected
47 if self.connection_pool.is_master:
48 self.connect_to(self.connection_pool.get_master_address())
49 else:
50 for slave in self.connection_pool.rotate_slaves():
51 try:
52 return self.connect_to(slave)
53 except ConnectionError:
54 continue
55 raise SlaveNotFoundError # Never be here
57 def connect(self):
58 return self.retry.call_with_retry(self._connect_retry, lambda error: None)
60 def read_response(
61 self,
62 disable_decoding=False,
63 *,
64 disconnect_on_error: Optional[bool] = False,
65 push_request: Optional[bool] = False,
66 ):
67 try:
68 return super().read_response(
69 disable_decoding=disable_decoding,
70 disconnect_on_error=disconnect_on_error,
71 push_request=push_request,
72 )
73 except ReadOnlyError:
74 if self.connection_pool.is_master:
75 # When talking to a master, a ReadOnlyError when likely
76 # indicates that the previous master that we're still connected
77 # to has been demoted to a slave and there's a new master.
78 # calling disconnect will force the connection to re-query
79 # sentinel during the next connect() attempt.
80 self.disconnect()
81 raise ConnectionError("The previous master is now a slave")
82 raise
85class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
86 pass
89class SentinelConnectionPoolProxy:
90 def __init__(
91 self,
92 connection_pool,
93 is_master,
94 check_connection,
95 service_name,
96 sentinel_manager,
97 ):
98 self.connection_pool_ref = weakref.ref(connection_pool)
99 self.is_master = is_master
100 self.check_connection = check_connection
101 self.service_name = service_name
102 self.sentinel_manager = sentinel_manager
103 self.reset()
105 def reset(self):
106 self.master_address = None
107 self.slave_rr_counter = None
109 def get_master_address(self):
110 master_address = self.sentinel_manager.discover_master(self.service_name)
111 if self.is_master and self.master_address != master_address:
112 self.master_address = master_address
113 # disconnect any idle connections so that they reconnect
114 # to the new master the next time that they are used.
115 connection_pool = self.connection_pool_ref()
116 if connection_pool is not None:
117 connection_pool.disconnect(inuse_connections=False)
118 return master_address
120 def rotate_slaves(self):
121 slaves = self.sentinel_manager.discover_slaves(self.service_name)
122 if slaves:
123 if self.slave_rr_counter is None:
124 self.slave_rr_counter = random.randint(0, len(slaves) - 1)
125 for _ in range(len(slaves)):
126 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
127 slave = slaves[self.slave_rr_counter]
128 yield slave
129 # Fallback to the master connection
130 try:
131 yield self.get_master_address()
132 except MasterNotFoundError:
133 pass
134 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
137class SentinelConnectionPool(ConnectionPool):
138 """
139 Sentinel backed connection pool.
141 If ``check_connection`` flag is set to True, SentinelManagedConnection
142 sends a PING command right after establishing the connection.
143 """
145 def __init__(self, service_name, sentinel_manager, **kwargs):
146 kwargs["connection_class"] = kwargs.get(
147 "connection_class",
148 (
149 SentinelManagedSSLConnection
150 if kwargs.pop("ssl", False)
151 else SentinelManagedConnection
152 ),
153 )
154 self.is_master = kwargs.pop("is_master", True)
155 self.check_connection = kwargs.pop("check_connection", False)
156 self.proxy = SentinelConnectionPoolProxy(
157 connection_pool=self,
158 is_master=self.is_master,
159 check_connection=self.check_connection,
160 service_name=service_name,
161 sentinel_manager=sentinel_manager,
162 )
163 super().__init__(**kwargs)
164 self.connection_kwargs["connection_pool"] = self.proxy
165 self.service_name = service_name
166 self.sentinel_manager = sentinel_manager
168 def __repr__(self):
169 role = "master" if self.is_master else "slave"
170 return (
171 f"<{type(self).__module__}.{type(self).__name__}"
172 f"(service={self.service_name}({role}))>"
173 )
175 def reset(self):
176 super().reset()
177 self.proxy.reset()
179 @property
180 def master_address(self):
181 return self.proxy.master_address
183 def owns_connection(self, connection):
184 check = not self.is_master or (
185 self.is_master and self.master_address == (connection.host, connection.port)
186 )
187 parent = super()
188 return check and parent.owns_connection(connection)
190 def get_master_address(self):
191 return self.proxy.get_master_address()
193 def rotate_slaves(self):
194 "Round-robin slave balancer"
195 return self.proxy.rotate_slaves()
198class Sentinel(SentinelCommands):
199 """
200 Redis Sentinel cluster client
202 >>> from redis.sentinel import Sentinel
203 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
204 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
205 >>> master.set('foo', 'bar')
206 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
207 >>> slave.get('foo')
208 b'bar'
210 ``sentinels`` is a list of sentinel nodes. Each node is represented by
211 a pair (hostname, port).
213 ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
214 When querying a sentinel, if it doesn't meet this threshold, responses
215 from that sentinel won't be considered valid.
217 ``sentinel_kwargs`` is a dictionary of connection arguments used when
218 connecting to sentinel instances. Any argument that can be passed to
219 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
220 not specified, any socket_timeout and socket_keepalive options specified
221 in ``connection_kwargs`` will be used.
223 ``connection_kwargs`` are keyword arguments that will be used when
224 establishing a connection to a Redis server.
225 """
227 def __init__(
228 self,
229 sentinels,
230 min_other_sentinels=0,
231 sentinel_kwargs=None,
232 force_master_ip=None,
233 **connection_kwargs,
234 ):
235 # if sentinel_kwargs isn't defined, use the socket_* options from
236 # connection_kwargs
237 if sentinel_kwargs is None:
238 sentinel_kwargs = {
239 k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
240 }
241 self.sentinel_kwargs = sentinel_kwargs
243 self.sentinels = [
244 Redis(hostname, port, **self.sentinel_kwargs)
245 for hostname, port in sentinels
246 ]
247 self.min_other_sentinels = min_other_sentinels
248 self.connection_kwargs = connection_kwargs
249 self._force_master_ip = force_master_ip
251 def execute_command(self, *args, **kwargs):
252 """
253 Execute Sentinel command in sentinel nodes.
254 once - If set to True, then execute the resulting command on a single
255 node at random, rather than across the entire sentinel cluster.
256 """
257 once = bool(kwargs.pop("once", False))
259 # Check if command is supposed to return the original
260 # responses instead of boolean value.
261 return_responses = bool(kwargs.pop("return_responses", False))
263 if once:
264 response = random.choice(self.sentinels).execute_command(*args, **kwargs)
265 if return_responses:
266 return [response]
267 else:
268 return True if response else False
270 responses = []
271 for sentinel in self.sentinels:
272 responses.append(sentinel.execute_command(*args, **kwargs))
274 if return_responses:
275 return responses
277 return all(responses)
279 def __repr__(self):
280 sentinel_addresses = []
281 for sentinel in self.sentinels:
282 sentinel_addresses.append(
283 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
284 )
285 return (
286 f"<{type(self).__module__}.{type(self).__name__}"
287 f"(sentinels=[{','.join(sentinel_addresses)}])>"
288 )
290 def check_master_state(self, state, service_name):
291 if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
292 return False
293 # Check if our sentinel doesn't see other nodes
294 if state["num-other-sentinels"] < self.min_other_sentinels:
295 return False
296 return True
298 def discover_master(self, service_name):
299 """
300 Asks sentinel servers for the Redis master's address corresponding
301 to the service labeled ``service_name``.
303 Returns a pair (address, port) or raises MasterNotFoundError if no
304 master is found.
305 """
306 collected_errors = list()
307 for sentinel_no, sentinel in enumerate(self.sentinels):
308 try:
309 masters = sentinel.sentinel_masters()
310 except (ConnectionError, TimeoutError) as e:
311 collected_errors.append(f"{sentinel} - {e!r}")
312 continue
313 state = masters.get(service_name)
314 if state and self.check_master_state(state, service_name):
315 # Put this sentinel at the top of the list
316 self.sentinels[0], self.sentinels[sentinel_no] = (
317 sentinel,
318 self.sentinels[0],
319 )
321 ip = (
322 self._force_master_ip
323 if self._force_master_ip is not None
324 else state["ip"]
325 )
326 return ip, state["port"]
328 error_info = ""
329 if len(collected_errors) > 0:
330 error_info = f" : {', '.join(collected_errors)}"
331 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
333 def filter_slaves(self, slaves):
334 "Remove slaves that are in an ODOWN or SDOWN state"
335 slaves_alive = []
336 for slave in slaves:
337 if slave["is_odown"] or slave["is_sdown"]:
338 continue
339 slaves_alive.append((slave["ip"], slave["port"]))
340 return slaves_alive
342 def discover_slaves(self, service_name):
343 "Returns a list of alive slaves for service ``service_name``"
344 for sentinel in self.sentinels:
345 try:
346 slaves = sentinel.sentinel_slaves(service_name)
347 except (ConnectionError, ResponseError, TimeoutError):
348 continue
349 slaves = self.filter_slaves(slaves)
350 if slaves:
351 return slaves
352 return []
354 def master_for(
355 self,
356 service_name,
357 redis_class=Redis,
358 connection_pool_class=SentinelConnectionPool,
359 **kwargs,
360 ):
361 """
362 Returns a redis client instance for the ``service_name`` master.
363 Sentinel client will detect failover and reconnect Redis clients
364 automatically.
366 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
367 used to retrieve the master's address before establishing a new
368 connection.
370 NOTE: If the master's address has changed, any cached connections to
371 the old master are closed.
373 By default clients will be a :py:class:`~redis.Redis` instance.
374 Specify a different class to the ``redis_class`` argument if you
375 desire something different.
377 The ``connection_pool_class`` specifies the connection pool to
378 use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
379 will be used by default.
381 All other keyword arguments are merged with any connection_kwargs
382 passed to this class and passed to the connection pool as keyword
383 arguments to be used to initialize Redis connections.
384 """
385 kwargs["is_master"] = True
386 connection_kwargs = dict(self.connection_kwargs)
387 connection_kwargs.update(kwargs)
388 return redis_class.from_pool(
389 connection_pool_class(service_name, self, **connection_kwargs)
390 )
392 def slave_for(
393 self,
394 service_name,
395 redis_class=Redis,
396 connection_pool_class=SentinelConnectionPool,
397 **kwargs,
398 ):
399 """
400 Returns redis client instance for the ``service_name`` slave(s).
402 A SentinelConnectionPool class is used to retrieve the slave's
403 address before establishing a new connection.
405 By default clients will be a :py:class:`~redis.Redis` instance.
406 Specify a different class to the ``redis_class`` argument if you
407 desire something different.
409 The ``connection_pool_class`` specifies the connection pool to use.
410 The SentinelConnectionPool will be used by default.
412 All other keyword arguments are merged with any connection_kwargs
413 passed to this class and passed to the connection pool as keyword
414 arguments to be used to initialize Redis connections.
415 """
416 kwargs["is_master"] = False
417 connection_kwargs = dict(self.connection_kwargs)
418 connection_kwargs.update(kwargs)
419 return redis_class.from_pool(
420 connection_pool_class(service_name, self, **connection_kwargs)
421 )