Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 23%
165 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:16 +0000
1import random
2import weakref
3from typing import Optional
5from redis.client import Redis
6from redis.commands import SentinelCommands
7from redis.connection import Connection, ConnectionPool, SSLConnection
8from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
9from redis.utils import str_if_bytes
class MasterNotFoundError(ConnectionError):
    """Raised when no usable master can be found for a service."""
class SlaveNotFoundError(ConnectionError):
    """Raised when no slave can be found for a service."""
class SentinelManagedConnection(Connection):
    """Connection whose target address is supplied by its owning pool.

    The pool (passed as the ``connection_pool`` keyword argument) decides
    whether this connection talks to the master or to one of the slaves and
    provides the address to connect to.
    """

    def __init__(self, **kwargs):
        """Store the owning pool and forward the remaining kwargs upward.

        Raises ``KeyError`` if ``connection_pool`` is not supplied.
        """
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        """Return ``Name<service=...>`` with host/port appended when known."""
        pool = self.connection_pool
        # Build the text directly rather than via a "%s" template: the
        # template left a literal "%s" in the output when no host was set,
        # and broke on service names containing "%".
        host_info = f",host={self.host},port={self.port}" if self.host else ""
        return f"{type(self).__name__}<service={pool.service_name}{host_info}>"

    def connect_to(self, address):
        """Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent
        right after connecting and ``ConnectionError`` is raised unless the
        reply is ``PONG``.
        """
        self.host, self.port = address
        super().connect()
        if self.connection_pool.check_connection:
            self.send_command("PING")
            if str_if_bytes(self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    def _connect_retry(self):
        """Resolve the target through the pool and connect to it.

        For a master pool the master address is used. Otherwise every
        candidate yielded by the pool's ``rotate_slaves()`` is tried in turn
        until one accepts the connection.
        """
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    # This candidate is unreachable; try the next one.
                    continue
            raise SlaveNotFoundError  # Never be here

    def connect(self):
        """Run ``_connect_retry`` through the connection's retry object."""
        # The second argument is a callback taking the error; here it is a
        # no-op. NOTE(review): presumably the retry object's failure hook.
        return self.retry.call_with_retry(self._connect_retry, lambda error: None)

    def read_response(
        self, disable_decoding=False, *, disconnect_on_error: Optional[bool] = False
    ):
        """Read a response, turning a master's ``ReadOnlyError`` into a
        ``ConnectionError``.

        On a slave pool the ``ReadOnlyError`` is re-raised unchanged.
        """
        try:
            return super().read_response(
                disable_decoding=disable_decoding,
                disconnect_on_error=disconnect_on_error,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still connected
                # to has been demoted to a slave and there's a new master.
                # Calling disconnect will force the connection to re-query
                # sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise
class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """Combination of ``SentinelManagedConnection`` and ``SSLConnection``."""
class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """Create a pool for ``service_name`` resolved via ``sentinel_manager``.

        Recognised keyword arguments (removed before the rest is forwarded
        to the parent pool):

        ``ssl`` - when true and no ``connection_class`` is given, use
        ``SentinelManagedSSLConnection``.
        ``is_master`` - whether the pool targets the master (default True).
        ``check_connection`` - whether new connections are PING-checked
        (default False).
        """
        # "ssl" is always consumed, even when an explicit connection_class
        # makes it irrelevant, so that it is not forwarded to the parent.
        use_ssl = kwargs.pop("ssl", False)
        kwargs["connection_class"] = kwargs.get(
            "connection_class",
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection,
        )
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy, so they hold no strong
        # reference back to the pool.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager

    def __repr__(self):
        """Return ``Name<service=NAME(role)>``."""
        role = "master" if self.is_master else "slave"
        # The closing ">" was previously missing, leaving the bracket
        # unbalanced and inconsistent with the other reprs in this module.
        return f"{type(self).__name__}<service={self.service_name}({role})>"

    def reset(self):
        """Reset the parent pool and forget the cached master address and
        the round-robin position."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection):
        """Tell whether ``connection`` belongs to this pool.

        A master pool additionally requires the connection to point at the
        currently known master address.
        """
        address_ok = not self.is_master or self.master_address == (
            connection.host,
            connection.port,
        )
        # The parent is consulted only when the address check passes.
        return address_ok and super().owns_connection(connection)

    def get_master_address(self):
        """Ask the sentinel manager for the master address and return it.

        On a master pool, a changed address is cached and idle connections
        are disconnected.
        """
        master_address = self.sentinel_manager.discover_master(self.service_name)
        if self.is_master:
            if self.master_address != master_address:
                self.master_address = master_address
                # disconnect any idle connections so that they reconnect
                # to the new master the next time that they are used.
                self.disconnect(inuse_connections=False)
        return master_address

    def rotate_slaves(self):
        """Round-robin slave balancer.

        Yields each discovered slave address once, starting after a
        position that is chosen at random on first use, then yields the
        master address as a fallback. Raises ``SlaveNotFoundError`` if the
        consumer exhausts every candidate.
        """
        slaves = self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                slave = slaves[self.slave_rr_counter]
                yield slave
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
class Sentinel(SentinelCommands):
    """
    Client for a group of Redis Sentinel nodes.

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes, each given as a
    (hostname, port) pair.

    ``min_other_sentinels`` is the minimum number of peers a sentinel must
    report. Answers from a sentinel below this threshold are not
    considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to the sentinel instances themselves. Any argument accepted
    by a normal Redis connection can be given. When it is not specified,
    the ``socket_*`` options found in ``connection_kwargs`` are used.

    ``connection_kwargs`` are keyword arguments used when establishing a
    connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        **connection_kwargs,
    ):
        """Create one ``Redis`` client per sentinel address."""
        if sentinel_kwargs is None:
            # No explicit sentinel options: inherit only the socket_*
            # settings from the regular connection options.
            sentinel_kwargs = {
                key: value
                for key, value in connection_kwargs.items()
                if key.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        self.sentinels = [
            Redis(hostname, port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

    def execute_command(self, *args, **kwargs):
        """
        Run a Sentinel command on the sentinel nodes and return True.

        once - when truthy, the command is sent to a single randomly
        chosen node instead of to every sentinel.
        """
        # "once" is consumed here and never forwarded to the nodes.
        once = bool(kwargs.pop("once", False))

        if once:
            random.choice(self.sentinels).execute_command(*args, **kwargs)
        else:
            for node in self.sentinels:
                node.execute_command(*args, **kwargs)
        return True

    def __repr__(self):
        """Return ``Name<sentinels=[host:port,...]>``."""
        addresses = [
            "{host}:{port}".format_map(node.connection_pool.connection_kwargs)
            for node in self.sentinels
        ]
        return f'{type(self).__name__}<sentinels=[{",".join(addresses)}]>'

    def check_master_state(self, state, service_name):
        """Tell whether ``state`` describes a master that can be used.

        ``service_name`` is accepted but not consulted.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Reject answers from a sentinel that sees too few other sentinels.
        return not state["num-other-sentinels"] < self.min_other_sentinels

    def discover_master(self, service_name):
        """
        Query the sentinels, in order, for the master of ``service_name``.

        Returns an (address, port) pair, or raises MasterNotFoundError when
        no sentinel reports a usable master. Connection and timeout errors
        met along the way are included in the exception message.
        """
        failures = []
        for index, node in enumerate(self.sentinels):
            try:
                masters = node.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                failures.append(f"{node} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Move the sentinel that answered to the front of the list
                # so that it is asked first next time.
                previous_first = self.sentinels[0]
                self.sentinels[0] = node
                self.sentinels[index] = previous_first
                return state["ip"], state["port"]

        error_info = f" : {', '.join(failures)}" if failures else ""
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(self, slaves):
        """Return (ip, port) pairs of the slaves that are neither ODOWN
        nor SDOWN."""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    def discover_slaves(self, service_name):
        """Return the alive slaves of ``service_name``.

        The first non-empty answer wins. An empty list is returned when no
        sentinel yields any alive slave.
        """
        for node in self.sentinels:
            try:
                reported = node.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            alive = self.filter_slaves(reported)
            if alive:
                return alive
        return []

    def master_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Return a client bound to the master of ``service_name``.

        The client is a ``redis_class`` instance (:py:class:`~redis.Redis`
        by default) built on a ``connection_pool_class`` pool
        (:py:class:`~redis.sentinel.SentinelConnectionPool` by default).
        The pool looks up the master's address before a new connection is
        established.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        All other keyword arguments are merged over the connection_kwargs
        given to this class and handed to the connection pool, which uses
        them to initialize Redis connections.
        """
        # Per-call kwargs override the instance-wide ones; is_master is
        # always forced to True.
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": True}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)

    def slave_for(
        self,
        service_name,
        redis_class=Redis,
        connection_pool_class=SentinelConnectionPool,
        **kwargs,
    ):
        """
        Return a client bound to the slave(s) of ``service_name``.

        The client is a ``redis_class`` instance (:py:class:`~redis.Redis`
        by default) built on a ``connection_pool_class`` pool
        (SentinelConnectionPool by default). The pool looks up a slave's
        address before a new connection is established.

        All other keyword arguments are merged over the connection_kwargs
        given to this class and handed to the connection pool, which uses
        them to initialize Redis connections.
        """
        # Per-call kwargs override the instance-wide ones; is_master is
        # always forced to False.
        pool_kwargs = {**self.connection_kwargs, **kwargs, "is_master": False}
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)