Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import random
2import weakref
3from typing import Optional
5from redis.client import Redis
6from redis.commands import SentinelCommands
7from redis.connection import Connection, ConnectionPool, SSLConnection
8from redis.exceptions import (
9 ConnectionError,
10 ReadOnlyError,
11 ResponseError,
12 TimeoutError,
13)
16class MasterNotFoundError(ConnectionError):
17 pass
20class SlaveNotFoundError(ConnectionError):
21 pass
24class SentinelManagedConnection(Connection):
25 def __init__(self, **kwargs):
26 self.connection_pool = kwargs.pop("connection_pool")
27 super().__init__(**kwargs)
29 def __repr__(self):
30 pool = self.connection_pool
31 s = (
32 f"<{type(self).__module__}.{type(self).__name__}"
33 f"(service={pool.service_name}%s)>"
34 )
35 if self.host:
36 host_info = f",host={self.host},port={self.port}"
37 s = s % host_info
38 return s
40 def connect_to(self, address):
41 self.host, self.port = address
43 self.connect_check_health(
44 check_health=self.connection_pool.check_connection,
45 retry_socket_connect=False,
46 )
48 def _connect_retry(self):
49 if self._sock:
50 return # already connected
51 if self.connection_pool.is_master:
52 self.connect_to(self.connection_pool.get_master_address())
53 else:
54 for slave in self.connection_pool.rotate_slaves():
55 try:
56 return self.connect_to(slave)
57 except ConnectionError:
58 continue
59 raise SlaveNotFoundError # Never be here
61 def connect(self):
62 return self.retry.call_with_retry(self._connect_retry, lambda error: None)
64 def read_response(
65 self,
66 disable_decoding=False,
67 *,
68 disconnect_on_error: Optional[bool] = False,
69 push_request: Optional[bool] = False,
70 ):
71 try:
72 return super().read_response(
73 disable_decoding=disable_decoding,
74 disconnect_on_error=disconnect_on_error,
75 push_request=push_request,
76 )
77 except ReadOnlyError:
78 if self.connection_pool.is_master:
79 # When talking to a master, a ReadOnlyError when likely
80 # indicates that the previous master that we're still connected
81 # to has been demoted to a slave and there's a new master.
82 # calling disconnect will force the connection to re-query
83 # sentinel during the next connect() attempt.
84 self.disconnect()
85 raise ConnectionError("The previous master is now a slave")
86 raise
89class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
90 pass
93class SentinelConnectionPoolProxy:
94 def __init__(
95 self,
96 connection_pool,
97 is_master,
98 check_connection,
99 service_name,
100 sentinel_manager,
101 ):
102 self.connection_pool_ref = weakref.ref(connection_pool)
103 self.is_master = is_master
104 self.check_connection = check_connection
105 self.service_name = service_name
106 self.sentinel_manager = sentinel_manager
107 self.reset()
109 def reset(self):
110 self.master_address = None
111 self.slave_rr_counter = None
113 def get_master_address(self):
114 master_address = self.sentinel_manager.discover_master(self.service_name)
115 if self.is_master and self.master_address != master_address:
116 self.master_address = master_address
117 # disconnect any idle connections so that they reconnect
118 # to the new master the next time that they are used.
119 connection_pool = self.connection_pool_ref()
120 if connection_pool is not None:
121 connection_pool.disconnect(inuse_connections=False)
122 return master_address
124 def rotate_slaves(self):
125 slaves = self.sentinel_manager.discover_slaves(self.service_name)
126 if slaves:
127 if self.slave_rr_counter is None:
128 self.slave_rr_counter = random.randint(0, len(slaves) - 1)
129 for _ in range(len(slaves)):
130 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
131 slave = slaves[self.slave_rr_counter]
132 yield slave
133 # Fallback to the master connection
134 try:
135 yield self.get_master_address()
136 except MasterNotFoundError:
137 pass
138 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
141class SentinelConnectionPool(ConnectionPool):
142 """
143 Sentinel backed connection pool.
145 If ``check_connection`` flag is set to True, SentinelManagedConnection
146 sends a PING command right after establishing the connection.
147 """
149 def __init__(self, service_name, sentinel_manager, **kwargs):
150 kwargs["connection_class"] = kwargs.get(
151 "connection_class",
152 (
153 SentinelManagedSSLConnection
154 if kwargs.pop("ssl", False)
155 else SentinelManagedConnection
156 ),
157 )
158 self.is_master = kwargs.pop("is_master", True)
159 self.check_connection = kwargs.pop("check_connection", False)
160 self.proxy = SentinelConnectionPoolProxy(
161 connection_pool=self,
162 is_master=self.is_master,
163 check_connection=self.check_connection,
164 service_name=service_name,
165 sentinel_manager=sentinel_manager,
166 )
167 super().__init__(**kwargs)
168 self.connection_kwargs["connection_pool"] = self.proxy
169 self.service_name = service_name
170 self.sentinel_manager = sentinel_manager
172 def __repr__(self):
173 role = "master" if self.is_master else "slave"
174 return (
175 f"<{type(self).__module__}.{type(self).__name__}"
176 f"(service={self.service_name}({role}))>"
177 )
179 def reset(self):
180 super().reset()
181 self.proxy.reset()
183 @property
184 def master_address(self):
185 return self.proxy.master_address
187 def owns_connection(self, connection):
188 check = not self.is_master or (
189 self.is_master and self.master_address == (connection.host, connection.port)
190 )
191 parent = super()
192 return check and parent.owns_connection(connection)
194 def get_master_address(self):
195 return self.proxy.get_master_address()
197 def rotate_slaves(self):
198 "Round-robin slave balancer"
199 return self.proxy.rotate_slaves()
202class Sentinel(SentinelCommands):
203 """
204 Redis Sentinel cluster client
206 >>> from redis.sentinel import Sentinel
207 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
208 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
209 >>> master.set('foo', 'bar')
210 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
211 >>> slave.get('foo')
212 b'bar'
214 ``sentinels`` is a list of sentinel nodes. Each node is represented by
215 a pair (hostname, port).
217 ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
218 When querying a sentinel, if it doesn't meet this threshold, responses
219 from that sentinel won't be considered valid.
221 ``sentinel_kwargs`` is a dictionary of connection arguments used when
222 connecting to sentinel instances. Any argument that can be passed to
223 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
224 not specified, any socket_timeout and socket_keepalive options specified
225 in ``connection_kwargs`` will be used.
227 ``connection_kwargs`` are keyword arguments that will be used when
228 establishing a connection to a Redis server.
229 """
231 def __init__(
232 self,
233 sentinels,
234 min_other_sentinels=0,
235 sentinel_kwargs=None,
236 force_master_ip=None,
237 **connection_kwargs,
238 ):
239 # if sentinel_kwargs isn't defined, use the socket_* options from
240 # connection_kwargs
241 if sentinel_kwargs is None:
242 sentinel_kwargs = {
243 k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
244 }
245 self.sentinel_kwargs = sentinel_kwargs
247 self.sentinels = [
248 Redis(hostname, port, **self.sentinel_kwargs)
249 for hostname, port in sentinels
250 ]
251 self.min_other_sentinels = min_other_sentinels
252 self.connection_kwargs = connection_kwargs
253 self._force_master_ip = force_master_ip
255 def execute_command(self, *args, **kwargs):
256 """
257 Execute Sentinel command in sentinel nodes.
258 once - If set to True, then execute the resulting command on a single
259 node at random, rather than across the entire sentinel cluster.
260 """
261 once = bool(kwargs.pop("once", False))
263 # Check if command is supposed to return the original
264 # responses instead of boolean value.
265 return_responses = bool(kwargs.pop("return_responses", False))
267 if once:
268 response = random.choice(self.sentinels).execute_command(*args, **kwargs)
269 if return_responses:
270 return [response]
271 else:
272 return True if response else False
274 responses = []
275 for sentinel in self.sentinels:
276 responses.append(sentinel.execute_command(*args, **kwargs))
278 if return_responses:
279 return responses
281 return all(responses)
283 def __repr__(self):
284 sentinel_addresses = []
285 for sentinel in self.sentinels:
286 sentinel_addresses.append(
287 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
288 )
289 return (
290 f"<{type(self).__module__}.{type(self).__name__}"
291 f"(sentinels=[{','.join(sentinel_addresses)}])>"
292 )
294 def check_master_state(self, state, service_name):
295 if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
296 return False
297 # Check if our sentinel doesn't see other nodes
298 if state["num-other-sentinels"] < self.min_other_sentinels:
299 return False
300 return True
302 def discover_master(self, service_name):
303 """
304 Asks sentinel servers for the Redis master's address corresponding
305 to the service labeled ``service_name``.
307 Returns a pair (address, port) or raises MasterNotFoundError if no
308 master is found.
309 """
310 collected_errors = list()
311 for sentinel_no, sentinel in enumerate(self.sentinels):
312 try:
313 masters = sentinel.sentinel_masters()
314 except (ConnectionError, TimeoutError) as e:
315 collected_errors.append(f"{sentinel} - {e!r}")
316 continue
317 state = masters.get(service_name)
318 if state and self.check_master_state(state, service_name):
319 # Put this sentinel at the top of the list
320 self.sentinels[0], self.sentinels[sentinel_no] = (
321 sentinel,
322 self.sentinels[0],
323 )
325 ip = (
326 self._force_master_ip
327 if self._force_master_ip is not None
328 else state["ip"]
329 )
330 return ip, state["port"]
332 error_info = ""
333 if len(collected_errors) > 0:
334 error_info = f" : {', '.join(collected_errors)}"
335 raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
337 def filter_slaves(self, slaves):
338 "Remove slaves that are in an ODOWN or SDOWN state"
339 slaves_alive = []
340 for slave in slaves:
341 if slave["is_odown"] or slave["is_sdown"]:
342 continue
343 slaves_alive.append((slave["ip"], slave["port"]))
344 return slaves_alive
346 def discover_slaves(self, service_name):
347 "Returns a list of alive slaves for service ``service_name``"
348 for sentinel in self.sentinels:
349 try:
350 slaves = sentinel.sentinel_slaves(service_name)
351 except (ConnectionError, ResponseError, TimeoutError):
352 continue
353 slaves = self.filter_slaves(slaves)
354 if slaves:
355 return slaves
356 return []
358 def master_for(
359 self,
360 service_name,
361 redis_class=Redis,
362 connection_pool_class=SentinelConnectionPool,
363 **kwargs,
364 ):
365 """
366 Returns a redis client instance for the ``service_name`` master.
367 Sentinel client will detect failover and reconnect Redis clients
368 automatically.
370 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
371 used to retrieve the master's address before establishing a new
372 connection.
374 NOTE: If the master's address has changed, any cached connections to
375 the old master are closed.
377 By default clients will be a :py:class:`~redis.Redis` instance.
378 Specify a different class to the ``redis_class`` argument if you
379 desire something different.
381 The ``connection_pool_class`` specifies the connection pool to
382 use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
383 will be used by default.
385 All other keyword arguments are merged with any connection_kwargs
386 passed to this class and passed to the connection pool as keyword
387 arguments to be used to initialize Redis connections.
388 """
389 kwargs["is_master"] = True
390 connection_kwargs = dict(self.connection_kwargs)
391 connection_kwargs.update(kwargs)
392 return redis_class.from_pool(
393 connection_pool_class(service_name, self, **connection_kwargs)
394 )
396 def slave_for(
397 self,
398 service_name,
399 redis_class=Redis,
400 connection_pool_class=SentinelConnectionPool,
401 **kwargs,
402 ):
403 """
404 Returns redis client instance for the ``service_name`` slave(s).
406 A SentinelConnectionPool class is used to retrieve the slave's
407 address before establishing a new connection.
409 By default clients will be a :py:class:`~redis.Redis` instance.
410 Specify a different class to the ``redis_class`` argument if you
411 desire something different.
413 The ``connection_pool_class`` specifies the connection pool to use.
414 The SentinelConnectionPool will be used by default.
416 All other keyword arguments are merged with any connection_kwargs
417 passed to this class and passed to the connection pool as keyword
418 arguments to be used to initialize Redis connections.
419 """
420 kwargs["is_master"] = False
421 connection_kwargs = dict(self.connection_kwargs)
422 connection_kwargs.update(kwargs)
423 return redis_class.from_pool(
424 connection_pool_class(service_name, self, **connection_kwargs)
425 )