1import asyncio
2import random
3import weakref
4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
5
6from redis.asyncio.client import Redis
7from redis.asyncio.connection import (
8 Connection,
9 ConnectionPool,
10 EncodableT,
11 SSLConnection,
12)
13from redis.commands import AsyncSentinelCommands
14from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
15from redis.utils import str_if_bytes
16
17
class MasterNotFoundError(ConnectionError):
    """Raised when no sentinel reports a usable master for a service."""
20
21
class SlaveNotFoundError(ConnectionError):
    """Raised when neither a slave nor a fallback master can be connected to."""
24
25
class SentinelManagedConnection(Connection):
    """
    Connection whose target address is supplied by its owning pool.

    The ``connection_pool`` keyword argument is required; the pool is asked
    for the master address or for a rotation of slave addresses every time
    the connection has to be (re-)established.
    """

    def __init__(self, **kwargs):
        # The pool reference is consumed here; everything else is forwarded
        # untouched to the base connection class.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        cls = self.__class__
        try:
            service_name = self.connection_pool.service_name
        except (AttributeError, ReferenceError):
            # The pool may be a weak proxy whose referent is already gone,
            # or a pool object without a service name; repr must not raise.
            service_name = None
        s = f"<{cls.__module__}.{cls.__name__}(service={service_name}"
        if self.host:
            s += f",host={self.host},port={self.port}"
        return s + ")>"

    async def connect_to(self, address):
        """
        Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent right
        after connecting and ``ConnectionError`` is raised unless the reply is
        ``PONG``.
        """
        self.host, self.port = address
        await super().connect()
        if self.connection_pool.check_connection:
            await self.send_command("PING")
            if str_if_bytes(await self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    async def _connect_retry(self):
        """Perform one connection attempt using addresses from the pool."""
        if self._reader:
            return  # already connected
        if self.connection_pool.is_master:
            await self.connect_to(await self.connection_pool.get_master_address())
        else:
            # Try every candidate the pool yields and stop at the first one
            # that accepts the connection.
            async for slave in self.connection_pool.rotate_slaves():
                try:
                    return await self.connect_to(slave)
                except ConnectionError:
                    continue
            raise SlaveNotFoundError  # Never be here

    async def connect(self):
        """Connect through the configured retry helper."""
        return await self.retry.call_with_retry(
            self._connect_retry,
            # Failure callback: hands back a zero-length sleep coroutine.
            lambda error: asyncio.sleep(0),
        )

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: Optional[bool] = True,
        push_request: Optional[bool] = False,
    ):
        """
        Read a response, translating ``ReadOnlyError`` on a master connection.

        All arguments are forwarded unchanged to the base implementation.

        Raises ``ConnectionError`` (after disconnecting) when the pool is a
        master pool and the server answers with ``ReadOnlyError``; for a slave
        pool the ``ReadOnlyError`` is re-raised as is.
        """
        try:
            return await super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still connected
                # to has been demoted to a slave and there's a new master.
                # Calling disconnect will force the connection to re-query
                # sentinel during the next connect() attempt.
                await self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise
90
91
class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """SSL flavour of :class:`SentinelManagedConnection`; adds no behaviour of its own."""
94
95
class SentinelConnectionPool(ConnectionPool):
    """
    Connection pool that resolves server addresses through a sentinel manager.

    With ``check_connection`` set to True, SentinelManagedConnection issues a
    PING immediately after a connection has been established.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        # ``ssl`` is always consumed; it only picks the default connection
        # class and never overrides an explicitly supplied one.
        use_ssl = kwargs.pop("ssl", False)
        kwargs.setdefault(
            "connection_class",
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection,
        )
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy to this pool.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.master_address = None
        self.slave_rr_counter = None

    def __repr__(self):
        role = "master" if self.is_master else "slave"
        prefix = f"<{self.__class__.__module__}.{self.__class__.__name__}"
        return prefix + f"(service={self.service_name}({role}))>"

    def reset(self):
        """Reset the base pool and forget the cached master and slave cursor."""
        super().reset()
        self.master_address = self.slave_rr_counter = None

    def owns_connection(self, connection: Connection):
        """
        Tell whether ``connection`` belongs to this pool.

        A master pool disowns connections that point anywhere but the
        currently cached master address.
        """
        if self.is_master and not (
            self.master_address == (connection.host, connection.port)
        ):
            return False
        return super().owns_connection(connection)

    async def get_master_address(self):
        """Ask the sentinel manager for the master address and return it."""
        discovered = await self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != discovered:
            self.master_address = discovered
            # Drop idle connections so they reconnect to the new master the
            # next time they are used.
            await self.disconnect(inuse_connections=False)
        return discovered

    async def rotate_slaves(self) -> AsyncIterator:
        """Round-robin slave balancer"""
        candidates = await self.sentinel_manager.discover_slaves(self.service_name)
        if candidates:
            total = len(candidates)
            if self.slave_rr_counter is None:
                # First use: start the rotation at a random position.
                self.slave_rr_counter = random.randint(0, total - 1)
            for _ in range(total):
                # The shared counter is re-read on every step on purpose.
                self.slave_rr_counter = (self.slave_rr_counter + 1) % total
                yield candidates[self.slave_rr_counter]
        # Fallback to the master connection
        try:
            yield await self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
165
166
class Sentinel(AsyncSentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, any ``socket_*`` options specified in
    ``connection_kwargs`` will be used.

    ``force_master_ip``, when not None, replaces the IP reported by the
    sentinels in the address returned by ``discover_master``.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        force_master_ip=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        # One Redis client per sentinel node.
        self.sentinels = [
            Redis(host=hostname, port=port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self._force_master_ip = force_master_ip

    async def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.
        return_responses - If set to True, return the list of raw responses
        instead of a single boolean.
        """
        once = bool(kwargs.pop("once", False))

        # Check if command is supposed to return the original
        # responses instead of boolean value.
        return_responses = bool(kwargs.pop("return_responses", False))

        if once:
            response = await random.choice(self.sentinels).execute_command(
                *args, **kwargs
            )
            if return_responses:
                return [response]
            return bool(response)

        # Hand the coroutines straight to gather(); it schedules them itself,
        # so Task objects do not have to be instantiated manually.
        responses = await asyncio.gather(
            *(sentinel.execute_command(*args, **kwargs) for sentinel in self.sentinels)
        )

        if return_responses:
            return responses

        return all(responses)

    def __repr__(self):
        sentinel_addresses = [
            f"{sentinel.connection_pool.connection_kwargs['host']}:"
            f"{sentinel.connection_pool.connection_kwargs['port']}"
            for sentinel in self.sentinels
        ]
        return (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(sentinels=[{','.join(sentinel_addresses)}])>"
        )

    def check_master_state(self, state: dict, service_name: str) -> bool:
        """
        Return True when ``state`` describes a usable master.

        ``service_name`` is accepted for interface compatibility and is not
        consulted here.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        if state["num-other-sentinels"] < self.min_other_sentinels:
            return False
        return True

    async def discover_master(self, service_name: str):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )

                ip = (
                    self._force_master_ip
                    if self._force_master_ip is not None
                    else state["ip"]
                )
                return ip, state["port"]

        # No sentinel produced a valid master; report what went wrong on the
        # sentinels that could not be queried.
        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(
        self, slaves: Iterable[Mapping]
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Remove slaves that are in an ODOWN or SDOWN state"""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    async def discover_slaves(
        self, service_name: str
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Returns a list of alive slaves for service ``service_name``"""
        for sentinel in self.sentinels:
            try:
                slaves = await sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                # This sentinel could not answer; try the next one.
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        return []

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.
        Sentinel client will detect failover and reconnect Redis clients
        automatically.

        A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = True
        # Per-call kwargs take precedence over the instance-wide ones.
        connection_kwargs = dict(self.connection_kwargs)
        connection_kwargs.update(kwargs)

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = False
        # Per-call kwargs take precedence over the instance-wide ones.
        connection_kwargs = dict(self.connection_kwargs)
        connection_kwargs.update(kwargs)

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)