Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/sentinel.py: 23%
159 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 07:09 +0000
1import random
2import weakref
4from redis.client import Redis
5from redis.commands import SentinelCommands
6from redis.connection import Connection, ConnectionPool, SSLConnection
7from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
8from redis.utils import str_if_bytes
11class MasterNotFoundError(ConnectionError):
12 pass
15class SlaveNotFoundError(ConnectionError):
16 pass
19class SentinelManagedConnection(Connection):
20 def __init__(self, **kwargs):
21 self.connection_pool = kwargs.pop("connection_pool")
22 super().__init__(**kwargs)
24 def __repr__(self):
25 pool = self.connection_pool
26 s = f"{type(self).__name__}<service={pool.service_name}%s>"
27 if self.host:
28 host_info = f",host={self.host},port={self.port}"
29 s = s % host_info
30 return s
32 def connect_to(self, address):
33 self.host, self.port = address
34 super().connect()
35 if self.connection_pool.check_connection:
36 self.send_command("PING")
37 if str_if_bytes(self.read_response()) != "PONG":
38 raise ConnectionError("PING failed")
40 def _connect_retry(self):
41 if self._sock:
42 return # already connected
43 if self.connection_pool.is_master:
44 self.connect_to(self.connection_pool.get_master_address())
45 else:
46 for slave in self.connection_pool.rotate_slaves():
47 try:
48 return self.connect_to(slave)
49 except ConnectionError:
50 continue
51 raise SlaveNotFoundError # Never be here
53 def connect(self):
54 return self.retry.call_with_retry(self._connect_retry, lambda error: None)
56 def read_response(self, disable_decoding=False):
57 try:
58 return super().read_response(disable_decoding=disable_decoding)
59 except ReadOnlyError:
60 if self.connection_pool.is_master:
61 # When talking to a master, a ReadOnlyError when likely
62 # indicates that the previous master that we're still connected
63 # to has been demoted to a slave and there's a new master.
64 # calling disconnect will force the connection to re-query
65 # sentinel during the next connect() attempt.
66 self.disconnect()
67 raise ConnectionError("The previous master is now a slave")
68 raise
71class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
72 pass
75class SentinelConnectionPool(ConnectionPool):
76 """
77 Sentinel backed connection pool.
79 If ``check_connection`` flag is set to True, SentinelManagedConnection
80 sends a PING command right after establishing the connection.
81 """
83 def __init__(self, service_name, sentinel_manager, **kwargs):
84 kwargs["connection_class"] = kwargs.get(
85 "connection_class",
86 SentinelManagedSSLConnection
87 if kwargs.pop("ssl", False)
88 else SentinelManagedConnection,
89 )
90 self.is_master = kwargs.pop("is_master", True)
91 self.check_connection = kwargs.pop("check_connection", False)
92 super().__init__(**kwargs)
93 self.connection_kwargs["connection_pool"] = weakref.proxy(self)
94 self.service_name = service_name
95 self.sentinel_manager = sentinel_manager
97 def __repr__(self):
98 role = "master" if self.is_master else "slave"
99 return f"{type(self).__name__}<service={self.service_name}({role})"
101 def reset(self):
102 super().reset()
103 self.master_address = None
104 self.slave_rr_counter = None
106 def owns_connection(self, connection):
107 check = not self.is_master or (
108 self.is_master and self.master_address == (connection.host, connection.port)
109 )
110 parent = super()
111 return check and parent.owns_connection(connection)
113 def get_master_address(self):
114 master_address = self.sentinel_manager.discover_master(self.service_name)
115 if self.is_master:
116 if self.master_address != master_address:
117 self.master_address = master_address
118 # disconnect any idle connections so that they reconnect
119 # to the new master the next time that they are used.
120 self.disconnect(inuse_connections=False)
121 return master_address
123 def rotate_slaves(self):
124 "Round-robin slave balancer"
125 slaves = self.sentinel_manager.discover_slaves(self.service_name)
126 if slaves:
127 if self.slave_rr_counter is None:
128 self.slave_rr_counter = random.randint(0, len(slaves) - 1)
129 for _ in range(len(slaves)):
130 self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
131 slave = slaves[self.slave_rr_counter]
132 yield slave
133 # Fallback to the master connection
134 try:
135 yield self.get_master_address()
136 except MasterNotFoundError:
137 pass
138 raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
141class Sentinel(SentinelCommands):
142 """
143 Redis Sentinel cluster client
145 >>> from redis.sentinel import Sentinel
146 >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
147 >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
148 >>> master.set('foo', 'bar')
149 >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
150 >>> slave.get('foo')
151 b'bar'
153 ``sentinels`` is a list of sentinel nodes. Each node is represented by
154 a pair (hostname, port).
156 ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
157 When querying a sentinel, if it doesn't meet this threshold, responses
158 from that sentinel won't be considered valid.
160 ``sentinel_kwargs`` is a dictionary of connection arguments used when
161 connecting to sentinel instances. Any argument that can be passed to
162 a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
163 not specified, any socket_timeout and socket_keepalive options specified
164 in ``connection_kwargs`` will be used.
166 ``connection_kwargs`` are keyword arguments that will be used when
167 establishing a connection to a Redis server.
168 """
170 def __init__(
171 self,
172 sentinels,
173 min_other_sentinels=0,
174 sentinel_kwargs=None,
175 **connection_kwargs,
176 ):
177 # if sentinel_kwargs isn't defined, use the socket_* options from
178 # connection_kwargs
179 if sentinel_kwargs is None:
180 sentinel_kwargs = {
181 k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
182 }
183 self.sentinel_kwargs = sentinel_kwargs
185 self.sentinels = [
186 Redis(hostname, port, **self.sentinel_kwargs)
187 for hostname, port in sentinels
188 ]
189 self.min_other_sentinels = min_other_sentinels
190 self.connection_kwargs = connection_kwargs
192 def execute_command(self, *args, **kwargs):
193 """
194 Execute Sentinel command in sentinel nodes.
195 once - If set to True, then execute the resulting command on a single
196 node at random, rather than across the entire sentinel cluster.
197 """
198 once = bool(kwargs.get("once", False))
199 if "once" in kwargs.keys():
200 kwargs.pop("once")
202 if once:
203 random.choice(self.sentinels).execute_command(*args, **kwargs)
204 else:
205 for sentinel in self.sentinels:
206 sentinel.execute_command(*args, **kwargs)
207 return True
209 def __repr__(self):
210 sentinel_addresses = []
211 for sentinel in self.sentinels:
212 sentinel_addresses.append(
213 "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
214 )
215 return f'{type(self).__name__}<sentinels=[{",".join(sentinel_addresses)}]>'
217 def check_master_state(self, state, service_name):
218 if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
219 return False
220 # Check if our sentinel doesn't see other nodes
221 if state["num-other-sentinels"] < self.min_other_sentinels:
222 return False
223 return True
225 def discover_master(self, service_name):
226 """
227 Asks sentinel servers for the Redis master's address corresponding
228 to the service labeled ``service_name``.
230 Returns a pair (address, port) or raises MasterNotFoundError if no
231 master is found.
232 """
233 for sentinel_no, sentinel in enumerate(self.sentinels):
234 try:
235 masters = sentinel.sentinel_masters()
236 except (ConnectionError, TimeoutError):
237 continue
238 state = masters.get(service_name)
239 if state and self.check_master_state(state, service_name):
240 # Put this sentinel at the top of the list
241 self.sentinels[0], self.sentinels[sentinel_no] = (
242 sentinel,
243 self.sentinels[0],
244 )
245 return state["ip"], state["port"]
246 raise MasterNotFoundError(f"No master found for {service_name!r}")
248 def filter_slaves(self, slaves):
249 "Remove slaves that are in an ODOWN or SDOWN state"
250 slaves_alive = []
251 for slave in slaves:
252 if slave["is_odown"] or slave["is_sdown"]:
253 continue
254 slaves_alive.append((slave["ip"], slave["port"]))
255 return slaves_alive
257 def discover_slaves(self, service_name):
258 "Returns a list of alive slaves for service ``service_name``"
259 for sentinel in self.sentinels:
260 try:
261 slaves = sentinel.sentinel_slaves(service_name)
262 except (ConnectionError, ResponseError, TimeoutError):
263 continue
264 slaves = self.filter_slaves(slaves)
265 if slaves:
266 return slaves
267 return []
269 def master_for(
270 self,
271 service_name,
272 redis_class=Redis,
273 connection_pool_class=SentinelConnectionPool,
274 **kwargs,
275 ):
276 """
277 Returns a redis client instance for the ``service_name`` master.
279 A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
280 used to retrieve the master's address before establishing a new
281 connection.
283 NOTE: If the master's address has changed, any cached connections to
284 the old master are closed.
286 By default clients will be a :py:class:`~redis.Redis` instance.
287 Specify a different class to the ``redis_class`` argument if you
288 desire something different.
290 The ``connection_pool_class`` specifies the connection pool to
291 use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
292 will be used by default.
294 All other keyword arguments are merged with any connection_kwargs
295 passed to this class and passed to the connection pool as keyword
296 arguments to be used to initialize Redis connections.
297 """
298 kwargs["is_master"] = True
299 connection_kwargs = dict(self.connection_kwargs)
300 connection_kwargs.update(kwargs)
301 return redis_class(
302 connection_pool=connection_pool_class(
303 service_name, self, **connection_kwargs
304 )
305 )
307 def slave_for(
308 self,
309 service_name,
310 redis_class=Redis,
311 connection_pool_class=SentinelConnectionPool,
312 **kwargs,
313 ):
314 """
315 Returns redis client instance for the ``service_name`` slave(s).
317 A SentinelConnectionPool class is used to retrieve the slave's
318 address before establishing a new connection.
320 By default clients will be a :py:class:`~redis.Redis` instance.
321 Specify a different class to the ``redis_class`` argument if you
322 desire something different.
324 The ``connection_pool_class`` specifies the connection pool to use.
325 The SentinelConnectionPool will be used by default.
327 All other keyword arguments are merged with any connection_kwargs
328 passed to this class and passed to the connection pool as keyword
329 arguments to be used to initialize Redis connections.
330 """
331 kwargs["is_master"] = False
332 connection_kwargs = dict(self.connection_kwargs)
333 connection_kwargs.update(kwargs)
334 return redis_class(
335 connection_pool=connection_pool_class(
336 service_name, self, **connection_kwargs
337 )
338 )