1import asyncio
2import random
3import weakref
4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
5
6from redis.asyncio.client import Redis
7from redis.asyncio.connection import (
8 Connection,
9 ConnectionPool,
10 EncodableT,
11 SSLConnection,
12)
13from redis.commands import AsyncSentinelCommands
14from redis.exceptions import (
15 ConnectionError,
16 ReadOnlyError,
17 ResponseError,
18 TimeoutError,
19)
20
21
class MasterNotFoundError(ConnectionError):
    """Raised when no queried sentinel reports a usable master for a service."""
24
25
class SlaveNotFoundError(ConnectionError):
    """Raised when neither a slave nor a fallback master can be offered."""
28
29
class SentinelManagedConnection(Connection):
    """
    Connection whose target address is resolved through its owning
    ``SentinelConnectionPool`` every time it (re)connects.
    """

    def __init__(self, **kwargs):
        """
        ``connection_pool`` is a required keyword argument holding the pool
        that owns this connection (``KeyError`` is raised if it is missing).
        All remaining keyword arguments are forwarded to the parent
        ``Connection`` constructor.
        """
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        # Always emit the opening parenthesis so that it balances the closing
        # ")>" whether or not a host has been assigned yet.
        cls = self.__class__
        host_info = f"host={self.host},port={self.port}" if self.host else ""
        return f"<{cls.__module__}.{cls.__name__}({host_info})>"

    async def connect_to(self, address):
        """
        Point this connection at ``address`` (a ``(host, port)`` pair) and
        connect. The health check is run only if the owning pool's
        ``check_connection`` flag is set; socket-level connect retries are
        disabled for this call.
        """
        self.host, self.port = address
        await self.connect_check_health(
            check_health=self.connection_pool.check_connection,
            retry_socket_connect=False,
        )

    async def _connect_retry(self):
        """
        Perform a single connect attempt.

        A master pool connects to the address returned by the pool's
        ``get_master_address()``. A slave pool tries each address yielded by
        the pool's ``rotate_slaves()`` in turn, skipping those that raise
        ``ConnectionError``.
        """
        if self._reader:
            return  # already connected
        if self.connection_pool.is_master:
            await self.connect_to(await self.connection_pool.get_master_address())
        else:
            async for slave in self.connection_pool.rotate_slaves():
                try:
                    return await self.connect_to(slave)
                except ConnectionError:
                    continue
            # rotate_slaves() raises SlaveNotFoundError itself once it runs
            # out of candidates, so this line is only a safety net.
            raise SlaveNotFoundError  # Never be here

    async def connect(self):
        """
        Connect by running ``_connect_retry`` through ``self.retry``. The
        failure callback handed to the retry helper returns
        ``asyncio.sleep(0)``.
        """
        return await self.retry.call_with_retry(
            self._connect_retry,
            lambda error: asyncio.sleep(0),
        )

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: Optional[bool] = True,
        push_request: Optional[bool] = False,
    ):
        """
        Read a response through the parent implementation, forwarding every
        argument unchanged.

        Raises ``ConnectionError`` (after disconnecting) when a connection
        belonging to a master pool receives a ``ReadOnlyError``; for a slave
        pool the ``ReadOnlyError`` is re-raised as is.
        """
        try:
            return await super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                await self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise
93
94
class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """Sentinel-managed connection combined with ``SSLConnection``."""
97
98
class SentinelConnectionPool(ConnectionPool):
    """
    Connection pool whose server addresses are supplied by a sentinel manager.

    When the ``check_connection`` flag is True, SentinelManagedConnection
    requests a health check as part of establishing each connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """
        ``service_name`` is the monitored service this pool serves and
        ``sentinel_manager`` is the object asked for master/slave addresses.

        The ``ssl``, ``is_master`` and ``check_connection`` keyword arguments
        are consumed here; everything else goes to the parent pool. Unless
        ``connection_class`` is given explicitly, ``ssl`` selects between the
        SSL and the plain sentinel-managed connection class.
        """
        # "ssl" is always removed from kwargs, even when the caller provides
        # an explicit connection_class.
        use_ssl = kwargs.pop("ssl", False)
        if use_ssl:
            default_connection_class = SentinelManagedSSLConnection
        else:
            default_connection_class = SentinelManagedConnection
        kwargs.setdefault("connection_class", default_connection_class)

        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)

        # Connections receive a weak proxy to this pool rather than a strong
        # reference.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.master_address = None
        self.slave_rr_counter = None

    def __repr__(self):
        cls = self.__class__
        role = "master" if self.is_master else "slave"
        prefix = f"<{cls.__module__}.{cls.__name__}"
        return prefix + f"(service={self.service_name}({role}))>"

    def reset(self):
        """Reset the parent pool and forget the cached master and slave cursor."""
        super().reset()
        self.slave_rr_counter = None
        self.master_address = None

    def owns_connection(self, connection: Connection):
        """
        Tell whether ``connection`` belongs to this pool. A master pool also
        requires the connection to point at the currently cached master
        address before deferring to the parent check.
        """
        if self.is_master:
            target = (connection.host, connection.port)
            if self.master_address != target:
                return False
        return super().owns_connection(connection)

    async def get_master_address(self):
        """
        Ask the sentinel manager for the current master address and return it.

        In a master pool, a changed address is cached and idle connections are
        disconnected so that they reconnect to the new master on next use.
        """
        discovered = await self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != discovered:
            self.master_address = discovered
            # Only idle connections are dropped; in-use ones are left alone.
            await self.disconnect(inuse_connections=False)
        return discovered

    async def rotate_slaves(self) -> AsyncIterator:
        """
        Yield candidate slave addresses round-robin, then the master address
        as a fallback, and finally raise ``SlaveNotFoundError`` once the
        consumer has exhausted every candidate.
        """
        candidates = await self.sentinel_manager.discover_slaves(self.service_name)
        if candidates:
            total = len(candidates)
            if self.slave_rr_counter is None:
                # The first rotation starts from a random position.
                self.slave_rr_counter = random.randint(0, total - 1)
            for _ in range(total):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % total
                yield candidates[self.slave_rr_counter]
        # No slave was accepted by the consumer: offer the master instead.
        try:
            yield await self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
168
169
class Sentinel(AsyncSentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.asyncio.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, every option in ``connection_kwargs`` whose name starts
    with ``socket_`` (e.g. socket_timeout, socket_keepalive) will be used.

    ``force_master_ip``, when not None, replaces the IP reported by the
    sentinel in the address returned by ``discover_master``; the reported
    port is kept.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        force_master_ip=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        # One Redis client per sentinel node.
        self.sentinels = [
            Redis(host=hostname, port=port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs
        self._force_master_ip = force_master_ip

    async def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.

        once - If set to True, then execute the resulting command on a single
        node at random, rather than across the entire sentinel cluster.

        return_responses - If set to True, return the list of raw responses
        (a one-element list when ``once`` is set) instead of a boolean.

        Without ``return_responses`` the result is True only if every
        response collected is truthy.
        """
        once = bool(kwargs.pop("once", False))

        # Check if command is supposed to return the original
        # responses instead of boolean value.
        return_responses = bool(kwargs.pop("return_responses", False))

        if once:
            response = await random.choice(self.sentinels).execute_command(
                *args, **kwargs
            )
            return [response] if return_responses else bool(response)

        # gather() schedules the coroutines concurrently by itself, so there
        # is no need to instantiate asyncio.Task objects by hand.
        responses = await asyncio.gather(
            *(sentinel.execute_command(*args, **kwargs) for sentinel in self.sentinels)
        )

        return responses if return_responses else all(responses)

    def __repr__(self):
        sentinel_addresses = [
            f"{sentinel.connection_pool.connection_kwargs['host']}:"
            f"{sentinel.connection_pool.connection_kwargs['port']}"
            for sentinel in self.sentinels
        ]
        cls = self.__class__
        return (
            f"<{cls.__module__}.{cls.__name__}"
            f"(sentinels=[{','.join(sentinel_addresses)}])>"
        )

    def check_master_state(self, state: dict, service_name: str) -> bool:
        """
        Return True if ``state`` describes a master that is neither SDOWN nor
        ODOWN and whose reporting sentinel sees at least
        ``min_other_sentinels`` peers. ``service_name`` is not used here.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        if state["num-other-sentinels"] < self.min_other_sentinels:
            return False
        return True

    async def discover_master(self, service_name: str):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found. Connection and timeout errors from individual
        sentinels are collected and appended to the error message.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )

                ip = (
                    self._force_master_ip
                    if self._force_master_ip is not None
                    else state["ip"]
                )
                return ip, state["port"]

        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(
        self, slaves: Iterable[Mapping]
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Remove slaves that are in an ODOWN or SDOWN state"""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    async def discover_slaves(
        self, service_name: str
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """
        Returns a list of alive slaves for service ``service_name``.

        Sentinels are queried in order; the first non-empty filtered list is
        returned. Sentinels that raise a connection, response or timeout
        error are skipped. An empty list is returned if none has live slaves.
        """
        for sentinel in self.sentinels:
            try:
                slaves = await sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        return []

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.
        Sentinel client will detect failover and reconnect Redis clients
        automatically.

        A SentinelConnectionPool class is used to retrieve the master's
        address before establishing a new connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a
        :py:class:`~redis.asyncio.client.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use. The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = True
        # Per-call keyword arguments take precedence over the defaults given
        # to the Sentinel constructor.
        connection_kwargs = {**self.connection_kwargs, **kwargs}

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a
        :py:class:`~redis.asyncio.client.Redis` instance.
        Specify a different class to the ``redis_class`` argument if you
        desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        kwargs["is_master"] = False
        # Per-call keyword arguments take precedence over the defaults given
        # to the Sentinel constructor.
        connection_kwargs = {**self.connection_kwargs, **kwargs}

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)