Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/asyncio/sentinel.py: 23%
169 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import asyncio
2import random
3import weakref
4from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
6from redis.asyncio.client import Redis
7from redis.asyncio.connection import (
8 Connection,
9 ConnectionPool,
10 EncodableT,
11 SSLConnection,
12)
13from redis.commands import AsyncSentinelCommands
14from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
15from redis.utils import str_if_bytes
class MasterNotFoundError(ConnectionError):
    """Raised when no sentinel reports a usable master for a service."""
class SlaveNotFoundError(ConnectionError):
    """Raised when no slave could be found for a service."""
class SentinelManagedConnection(Connection):
    """Connection whose target address is supplied by its sentinel pool.

    The owning pool must be passed as the ``connection_pool`` keyword
    argument. On ``connect()`` the pool is asked for either the current
    master address or a rotation of slave addresses, depending on the pool's
    ``is_master`` flag.
    """

    def __init__(self, **kwargs):
        # ``connection_pool`` is consumed here so that it is not forwarded to
        # the base connection constructor.
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        s = (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(service={pool.service_name}"
        )
        # Host/port are only included once an address has been assigned.
        if self.host:
            host_info = f",host={self.host},port={self.port}"
            s += host_info
        return s + ")>"

    async def connect_to(self, address):
        """Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent
        after connecting and ``ConnectionError`` is raised unless the reply
        is ``PONG``.
        """
        self.host, self.port = address
        await super().connect()
        if self.connection_pool.check_connection:
            await self.send_command("PING")
            if str_if_bytes(await self.read_response()) != "PONG":
                # Drop the socket before reporting the failure:
                # ``_connect_retry`` returns early when ``_reader`` is set,
                # so a connection that failed its health check must not be
                # left open across a retry.
                # NOTE(review): assumes the base ``connect()`` sets
                # ``_reader`` and ``disconnect()`` clears it - confirm
                # against the base Connection class.
                await self.disconnect()
                raise ConnectionError("PING failed")

    async def _connect_retry(self):
        """Single connection attempt; invoked through the retry helper."""
        if self._reader:
            return  # already connected
        if self.connection_pool.is_master:
            await self.connect_to(await self.connection_pool.get_master_address())
        else:
            async for slave in self.connection_pool.rotate_slaves():
                try:
                    return await self.connect_to(slave)
                except ConnectionError:
                    # This candidate is unreachable; try the next one.
                    continue
            # ``rotate_slaves`` raises SlaveNotFoundError itself once its
            # candidates are exhausted, so this is only a safeguard.
            raise SlaveNotFoundError  # Never be here

    async def connect(self):
        """Connect via ``_connect_retry`` under this connection's retry policy."""
        return await self.retry.call_with_retry(
            self._connect_retry,
            # Failure callback: just yield control before the next attempt.
            lambda error: asyncio.sleep(0),
        )

    async def read_response(
        self,
        disable_decoding: bool = False,
        timeout: Optional[float] = None,
        *,
        disconnect_on_error: Optional[bool] = True,
        push_request: Optional[bool] = False,
    ):
        """Read a response, forwarding all arguments to the base class.

        Raises ``ConnectionError`` instead of ``ReadOnlyError`` when the pool
        is a master pool (after disconnecting); any other ``ReadOnlyError``
        is re-raised unchanged.
        """
        try:
            return await super().read_response(
                disable_decoding=disable_decoding,
                timeout=timeout,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                await self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise
class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    """``SentinelManagedConnection`` combined with ``SSLConnection``."""
class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """Create a pool for ``service_name`` resolved via ``sentinel_manager``.

        Recognised keyword arguments (removed before the rest is passed to
        the base pool): ``ssl`` (selects the SSL connection class when no
        ``connection_class`` is given), ``is_master`` (default True) and
        ``check_connection`` (default False).
        """
        # ``ssl`` is always consumed, even when an explicit
        # ``connection_class`` is supplied and therefore wins.
        use_ssl = kwargs.pop("ssl", False)
        kwargs.setdefault(
            "connection_class",
            SentinelManagedSSLConnection if use_ssl else SentinelManagedConnection,
        )
        self.is_master = kwargs.pop("is_master", True)
        self.check_connection = kwargs.pop("check_connection", False)
        super().__init__(**kwargs)
        # Connections receive a weak proxy to the pool rather than a strong
        # reference.
        self.connection_kwargs["connection_pool"] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.master_address = None
        self.slave_rr_counter = None

    def __repr__(self):
        role = "master" if self.is_master else "slave"
        return (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(service={self.service_name}({role}))>"
        )

    def reset(self):
        """Reset the base pool and forget the cached master and slave cursor."""
        super().reset()
        self.master_address = None
        self.slave_rr_counter = None

    def owns_connection(self, connection: Connection):
        """Return whether ``connection`` belongs to this pool.

        For a master pool the connection must also point at the currently
        cached master address.
        """
        check = not self.is_master or self.master_address == (
            connection.host,
            connection.port,
        )
        return check and super().owns_connection(connection)

    async def get_master_address(self):
        """Ask the sentinel manager for the master address and return it.

        For a master pool, a changed address is cached and idle connections
        are disconnected.
        """
        master_address = await self.sentinel_manager.discover_master(self.service_name)
        if self.is_master:
            if self.master_address != master_address:
                self.master_address = master_address
                # disconnect any idle connections so that they reconnect
                # to the new master the next time that they are used.
                await self.disconnect(inuse_connections=False)
        return master_address

    async def rotate_slaves(self) -> AsyncIterator:
        """Round-robin slave balancer"""
        slaves = await self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            # The first rotation starts at a random position.
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            # Yield every slave once, continuing from the stored position.
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                slave = slaves[self.slave_rr_counter]
                yield slave
        # Fallback to the master connection
        try:
            yield await self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
class Sentinel(AsyncSentinelCommands):
    """
    Redis Sentinel cluster client

    >>> from redis.asyncio.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> await master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> await slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, any ``socket_*`` options specified in
    ``connection_kwargs`` will be used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(
        self,
        sentinels,
        min_other_sentinels=0,
        sentinel_kwargs=None,
        **connection_kwargs,
    ):
        # if sentinel_kwargs isn't defined, use the socket_* options from
        # connection_kwargs
        if sentinel_kwargs is None:
            sentinel_kwargs = {
                k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
            }
        self.sentinel_kwargs = sentinel_kwargs

        # One client per sentinel node.
        self.sentinels = [
            Redis(host=hostname, port=port, **self.sentinel_kwargs)
            for hostname, port in sentinels
        ]
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

    async def execute_command(self, *args, **kwargs):
        """
        Execute Sentinel command in sentinel nodes.
        once - If set to True, then execute the resulting command on a single
               node at random, rather than across the entire sentinel cluster.

        Always returns True; the replies of the sentinels are discarded.
        """
        kwargs.pop("keys", None)  # the keys are used only for client side caching
        # ``once`` is consumed here so it is not forwarded to the sentinels.
        once = bool(kwargs.pop("once", False))

        if once:
            await random.choice(self.sentinels).execute_command(*args, **kwargs)
        else:
            # gather() schedules each coroutine as a task and runs them
            # concurrently.
            await asyncio.gather(
                *(
                    sentinel.execute_command(*args, **kwargs)
                    for sentinel in self.sentinels
                )
            )
        return True

    def __repr__(self):
        sentinel_addresses = [
            f"{sentinel.connection_pool.connection_kwargs['host']}:"
            f"{sentinel.connection_pool.connection_kwargs['port']}"
            for sentinel in self.sentinels
        ]
        return (
            f"<{self.__class__.__module__}.{self.__class__.__name__}"
            f"(sentinels=[{','.join(sentinel_addresses)}])>"
        )

    def check_master_state(self, state: dict, service_name: str) -> bool:
        """Return whether ``state`` describes a usable master.

        ``service_name`` is accepted but not consulted by this
        implementation.
        """
        if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
            return False
        # Check if our sentinel doesn't see other nodes
        if state["num-other-sentinels"] < self.min_other_sentinels:
            return False
        return True

    async def discover_master(self, service_name: str):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        """
        collected_errors = []
        for sentinel_no, sentinel in enumerate(self.sentinels):
            try:
                masters = await sentinel.sentinel_masters()
            except (ConnectionError, TimeoutError) as e:
                # Remember why this sentinel was skipped for the final error.
                collected_errors.append(f"{sentinel} - {e!r}")
                continue
            state = masters.get(service_name)
            if state and self.check_master_state(state, service_name):
                # Put this sentinel at the top of the list
                self.sentinels[0], self.sentinels[sentinel_no] = (
                    sentinel,
                    self.sentinels[0],
                )
                return state["ip"], state["port"]

        error_info = ""
        if collected_errors:
            error_info = f" : {', '.join(collected_errors)}"
        raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")

    def filter_slaves(
        self, slaves: Iterable[Mapping]
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Remove slaves that are in an ODOWN or SDOWN state"""
        return [
            (slave["ip"], slave["port"])
            for slave in slaves
            if not (slave["is_odown"] or slave["is_sdown"])
        ]

    async def discover_slaves(
        self, service_name: str
    ) -> Sequence[Tuple[EncodableT, EncodableT]]:
        """Returns a list of alive slaves for service ``service_name``"""
        for sentinel in self.sentinels:
            try:
                slaves = await sentinel.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                # This sentinel could not answer; ask the next one.
                continue
            slaves = self.filter_slaves(slaves)
            if slaves:
                return slaves
        return []

    def master_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns a redis client instance for the ``service_name`` master.

        A :py:class:`~redis.asyncio.sentinel.SentinelConnectionPool` class is
        used to retrieve the master's address before establishing a new
        connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a :py:class:`~redis.asyncio.client.Redis`
        instance. Specify a different class to the ``redis_class`` argument
        if you desire something different.

        The ``connection_pool_class`` specifies the connection pool to
        use.  The :py:class:`~redis.asyncio.sentinel.SentinelConnectionPool`
        will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call kwargs override the instance-wide ones; ``is_master`` is
        # forced last so that a caller-supplied value cannot change it.
        connection_kwargs = {**self.connection_kwargs, **kwargs, "is_master": True}

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)

    def slave_for(
        self,
        service_name: str,
        redis_class: Type[Redis] = Redis,
        connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
        **kwargs,
    ):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a :py:class:`~redis.asyncio.client.Redis`
        instance. Specify a different class to the ``redis_class`` argument
        if you desire something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call kwargs override the instance-wide ones; ``is_master`` is
        # forced last so that a caller-supplied value cannot change it.
        connection_kwargs = {**self.connection_kwargs, **kwargs, "is_master": False}

        connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
        # The Redis object "owns" the pool
        return redis_class.from_pool(connection_pool)