Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/sentinel.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import random
2import weakref
3from typing import Optional, Union
5from redis._parsers.socket import SENTINEL
6from redis.client import Redis
7from redis.commands import SentinelCommands
8from redis.connection import Connection, ConnectionPool, SSLConnection
9from redis.exceptions import (
10 ConnectionError,
11 ReadOnlyError,
12 ResponseError,
13 TimeoutError,
14)
class MasterNotFoundError(ConnectionError):
    """Raised when no sentinel reports a usable master for a service."""
class SlaveNotFoundError(ConnectionError):
    """Raised when neither a slave nor a fallback master can be offered."""
class SentinelManagedConnection(Connection):
    """
    Connection whose target address is resolved through Sentinel.

    The ``connection_pool`` keyword argument is required. The object passed
    there is asked for the current master address, or for the slaves to try,
    each time the connection has to be (re)established.
    """

    def __init__(self, **kwargs):
        # ``connection_pool`` is consumed here and is not forwarded to the
        # parent constructor.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        # Build the optional host part up front so that the result never
        # contains a stray "%s" placeholder when no host is set, and so that
        # a "%" inside the service name cannot break the formatting.
        host_info = f",host={self.host},port={self.port}" if self.host else ""
        return (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"(service={pool.service_name}{host_info})>"
        )

    def connect_to(self, address):
        """
        Point this connection at ``address`` and connect.

        ``address`` is a ``(host, port)`` pair. Whether a health check is run
        is taken from the pool's ``check_connection`` flag.
        """
        self.host, self.port = address

        self.connect_check_health(
            check_health=self.connection_pool.check_connection,
            retry_socket_connect=False,
        )

    def _connect_retry(self):
        """
        Connect to the master, or to the first slave that accepts.

        Does nothing if a socket is already open. For slave pools, every
        address yielded by ``rotate_slaves()`` is tried in turn and a
        ``ConnectionError`` simply moves on to the next candidate.
        """
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    continue
            # rotate_slaves() raises SlaveNotFoundError itself once it runs
            # out of candidates, so this line is only a safety net.
            raise SlaveNotFoundError  # Never be here

    def connect(self):
        """Run ``_connect_retry`` through ``self.retry`` with a callback that ignores the error."""
        return self.retry.call_with_retry(self._connect_retry, lambda error: None)

    def read_response(
        self,
        disable_decoding=False,
        *,
        timeout: Union[float, object] = SENTINEL,
        disconnect_on_error: Optional[bool] = False,
        push_request: Optional[bool] = False,
    ):
        """
        Read a response, translating ``ReadOnlyError`` on a master connection.

        All arguments are forwarded unchanged to the parent implementation.

        Raises ``ConnectionError`` (after disconnecting) when the pool is a
        master pool and the server answered with ``ReadOnlyError``. For slave
        pools the ``ReadOnlyError`` is re-raised as is.
        """
        try:
            return super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise
class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """Sentinel-managed connection combined with ``SSLConnection``."""
class SentinelConnectionPoolProxy:
    """
    Stand-in for a connection pool that performs the Sentinel lookups.

    Only a weak reference to the real pool is kept (presumably so that the
    connections holding this proxy do not keep the pool alive). The proxy
    caches the last master address seen and the round-robin position used
    when handing out slaves.
    """

    def __init__(
        self,
        connection_pool,
        is_master,
        check_connection,
        service_name,
        sentinel_manager,
    ):
        self.connection_pool_ref = weakref.ref(connection_pool)
        self.is_master = is_master
        self.check_connection = check_connection
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.reset()

    def reset(self):
        """Forget the cached master address and the slave rotation position."""
        self.master_address = None
        self.slave_rr_counter = None

    def get_master_address(self):
        """
        Ask the sentinel manager for the current master address.

        For a master pool, a changed address is cached and the idle
        connections of the real pool (if it still exists) are disconnected so
        that they reconnect to the new master the next time they are used.
        """
        discovered = self.sentinel_manager.discover_master(self.service_name)
        if not self.is_master or self.master_address == discovered:
            return discovered

        self.master_address = discovered
        pool = self.connection_pool_ref()
        if pool is not None:
            pool.disconnect(inuse_connections=False)
        return discovered

    def rotate_slaves(self):
        """
        Yield candidate addresses: every alive slave once, then the master.

        The starting slave is picked at random on first use and the position
        is remembered across calls. Raises ``SlaveNotFoundError`` once every
        candidate has been yielded.
        """
        candidates = self.sentinel_manager.discover_slaves(self.service_name)
        if candidates:
            total = len(candidates)
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, total - 1)
            for _ in range(total):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % total
                yield candidates[self.slave_rr_counter]
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
class SentinelConnectionPool(ConnectionPool):
    """
    Connection pool whose connections are located through Sentinel.

    If the ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        # ``ssl`` is always removed from kwargs; it only selects the default
        # connection class and is ignored when the caller supplies one.
        use_ssl = kwargs.pop("ssl", False)
        default_class = (
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection
        )
        kwargs.setdefault("connection_class", default_class)

        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        # The proxy is created before the parent constructor runs and is what
        # every connection receives as its ``connection_pool``.
        self.proxy = SentinelConnectionPoolProxy(
            connection_pool=self,
            is_master=self.is_master,
            check_connection=self.check_connection,
            service_name=service_name,
            sentinel_manager=sentinel_manager,
        )
        super().__init__(**kwargs)
        self.connection_kwargs["connection_pool"] = self.proxy
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager

    def __repr__(self):
        if self.is_master:
            role = "master"
        else:
            role = "slave"
        cls = type(self)
        return f"<{cls.__module__}.{cls.__name__}(service={self.service_name}({role}))>"

    def reset(self):
        """Reset the pool and then the proxy's cached Sentinel state."""
        super().reset()
        self.proxy.reset()

    @property
    def master_address(self):
        """Master address last cached by the proxy (``None`` until one is cached)."""
        return self.proxy.master_address

    def owns_connection(self, connection):
        """
        Tell whether ``connection`` belongs to this pool.

        For a master pool the connection must also point at the cached
        master address; slave pools defer entirely to the parent check.
        """
        if self.is_master and self.master_address != (
            connection.host,
            connection.port,
        ):
            return False
        return super().owns_connection(connection)

    def get_master_address(self):
        """Return the current master address as resolved by the proxy."""
        return self.proxy.get_master_address()

    def rotate_slaves(self):
        "Round-robin slave balancer"
        return self.proxy.rotate_slaves()
class Sentinel(SentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, every ``socket_*`` option found in ``connection_kwargs``
    will be used.

    ``force_master_ip``, when not None, is returned by ``discover_master``
    in place of the IP reported by the sentinel.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        force_master_ip=None,
        **connection_kwargs,
    ):
        # Without explicit sentinel_kwargs, reuse the socket_* options given
        # for the Redis connections.
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                name: value
                for name, value in connection_kwargs.items()
                if name.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        nodes = []
        for hostname, port in sentinels:
            nodes.append(Redis(hostname, port, **self.sentinel_kwargs))
        self.sentinels = nodes
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self._force_master_ip = force_master_ip

    def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.
        return_responses - If set to True, return the raw responses as a list
        instead of a single boolean.
        """
        once = bool(kwargs.pop("once", False))
        return_responses = bool(kwargs.pop("return_responses", False))

        if once:
            node = random.choice(self.sentinels)
            response = node.execute_command(*args, **kwargs)
            return [response] if return_responses else bool(response)

        responses = [node.execute_command(*args, **kwargs) for node in self.sentinels]
        return responses if return_responses else all(responses)

    def __repr__(self):
        addresses = ",".join(
            "{host}:{port}".format_map(node.connection_pool.connection_kwargs)
            for node in self.sentinels
        )
        cls = type(self)
        return f"<{cls.__module__}.{cls.__name__}(sentinels=[{addresses}])>"

    def check_master_state(self, state, service_name):
        """
        Tell whether ``state`` describes a master that can be trusted.

        The node must be flagged as master, be neither SDOWN nor ODOWN, and
        the reporting sentinel must see at least ``min_other_sentinels``
        peers. ``service_name`` is accepted but not used here.
        """
        usable = (
            state["is_master"] and not state["is_sdown"] and not state["is_odown"]
        )
        if not usable:
            return False
        # Reject answers from a sentinel that doesn't see enough other nodes.
        return not (state["num-other-sentinels"] < self.min_other_sentinels)

    def discover_master(self, service_name):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found. Connection and timeout errors met along the way are
        appended to the error message.
        """
        failures = []
        for index, sentinel in enumerate(self.sentinels):
            try:
                masters = sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as exc:
                failures.append(f"{sentinel} - {exc!r}")
                continue

            state = masters.get(service_name)
            if not state or not self.check_master_state(state, service_name):
                continue

            # Put this sentinel at the top of the list
            self.sentinels[0], self.sentinels[index] = sentinel, self.sentinels[0]

            forced_ip = self._force_master_ip
            ip = state["ip"] if forced_ip is None else forced_ip
            return ip, state["port"]

        error_info = f" : {', '.join(failures)}" if failures else ""
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(self, slaves):
        "Remove slaves that are in an ODOWN or SDOWN state"
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    def discover_slaves(self, service_name):
        "Returns a list of alive slaves for service ``service_name``"
        for sentinel in self.sentinels:
            try:
                reported = sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                # This sentinel could not answer; ask the next one.
                continue
            alive = self.filter_slaves(reported)
            if alive:
                return alive
        return []

    def master_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.
        Sentinel client will detect failover and reconnect Redis clients
        automatically.

        A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Caller kwargs override the instance defaults; is_master always wins.
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": True}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class.from_pool(pool)

    def slave_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Caller kwargs override the instance defaults; is_master always wins.
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": False}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class.from_pool(pool)