Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/limits/aio/storage/redis.py: 14%
143 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import time
2import urllib
3from typing import TYPE_CHECKING, cast
5from deprecated.sphinx import versionadded
6from packaging.version import Version
8from limits.aio.storage.base import MovingWindowSupport, Storage
9from limits.errors import ConfigurationError
10from limits.typing import AsyncRedisClient, Dict, Optional, Tuple, Union
11from limits.util import get_package_data
13if TYPE_CHECKING:
14 import coredis
15 import coredis.commands
18class RedisInteractor:
19 RES_DIR = "resources/redis/lua_scripts"
21 SCRIPT_MOVING_WINDOW = get_package_data(f"{RES_DIR}/moving_window.lua")
22 SCRIPT_ACQUIRE_MOVING_WINDOW = get_package_data(
23 f"{RES_DIR}/acquire_moving_window.lua"
24 )
25 SCRIPT_CLEAR_KEYS = get_package_data(f"{RES_DIR}/clear_keys.lua")
26 SCRIPT_INCR_EXPIRE = get_package_data(f"{RES_DIR}/incr_expire.lua")
28 lua_moving_window: "coredis.commands.Script[bytes]"
29 lua_acquire_window: "coredis.commands.Script[bytes]"
30 lua_clear_keys: "coredis.commands.Script[bytes]"
31 lua_incr_expire: "coredis.commands.Script[bytes]"
33 async def _incr(
34 self,
35 key: str,
36 expiry: int,
37 connection: AsyncRedisClient,
38 elastic_expiry: bool = False,
39 amount: int = 1,
40 ) -> int:
41 """
42 increments the counter for a given rate limit key
44 :param connection: Redis connection
45 :param key: the key to increment
46 :param expiry: amount in seconds for the key to expire in
47 :param amount: the number to increment by
48 """
49 value = await connection.incrby(key, amount)
51 if elastic_expiry or value == amount:
52 await connection.expire(key, expiry)
54 return value
56 async def _get(self, key: str, connection: AsyncRedisClient) -> int:
57 """
58 :param connection: Redis connection
59 :param key: the key to get the counter value for
60 """
62 return int(await connection.get(key) or 0)
64 async def _clear(self, key: str, connection: AsyncRedisClient) -> None:
65 """
66 :param key: the key to clear rate limits for
67 :param connection: Redis connection
68 """
69 await connection.delete([key])
71 async def get_moving_window(
72 self, key: str, limit: int, expiry: int
73 ) -> Tuple[int, int]:
74 """
75 returns the starting point and the number of entries in the moving
76 window
78 :param key: rate limit key
79 :param expiry: expiry of entry
80 :return: (start of window, number of acquired entries)
81 """
82 timestamp = int(time.time())
83 window = await self.lua_moving_window.execute(
84 [key], [int(timestamp - expiry), limit]
85 )
86 if window:
87 return tuple(window) # type: ignore
88 return timestamp, 0
90 async def _acquire_entry(
91 self,
92 key: str,
93 limit: int,
94 expiry: int,
95 connection: AsyncRedisClient,
96 amount: int = 1,
97 ) -> bool:
98 """
99 :param key: rate limit key to acquire an entry in
100 :param limit: amount of entries allowed
101 :param expiry: expiry of the entry
102 :param connection: Redis connection
103 """
104 timestamp = time.time()
105 acquired = await self.lua_acquire_window.execute(
106 [key], [timestamp, limit, expiry, amount]
107 )
109 return bool(acquired)
111 async def _get_expiry(self, key: str, connection: AsyncRedisClient) -> int:
112 """
113 :param key: the key to get the expiry for
114 :param connection: Redis connection
115 """
117 return int(max(await connection.ttl(key), 0) + time.time())
119 async def _check(self, connection: AsyncRedisClient) -> bool:
120 """
121 check if storage is healthy
123 :param connection: Redis connection
124 """
125 try:
126 await connection.ping()
128 return True
129 except: # noqa
130 return False
133@versionadded(version="2.1")
134class RedisStorage(RedisInteractor, Storage, MovingWindowSupport):
135 """
136 Rate limit storage with redis as backend.
138 Depends on :pypi:`coredis`
139 """
141 STORAGE_SCHEME = ["async+redis", "async+rediss", "async+redis+unix"]
142 """
143 The storage schemes for redis to be used in an async context
144 """
145 DEPENDENCIES = {"coredis": Version("3.4.0")}
147 def __init__(
148 self,
149 uri: str,
150 connection_pool: Optional["coredis.ConnectionPool"] = None,
151 **options: Union[float, str, bool],
152 ) -> None:
153 """
154 :param uri: uri of the form:
156 - ``async+redis://[:password]@host:port``
157 - ``async+redis://[:password]@host:port/db``
158 - ``async+rediss://[:password]@host:port``
159 - ``async+unix:///path/to/sock`` etc...
161 This uri is passed directly to :meth:`coredis.Redis.from_url` with
162 the initial ``async`` removed, except for the case of ``async+redis+unix``
163 where it is replaced with ``unix``.
164 :param connection_pool: if provided, the redis client is initialized with
165 the connection pool and any other params passed as :paramref:`options`
166 :param options: all remaining keyword arguments are passed
167 directly to the constructor of :class:`coredis.Redis`
168 :raise ConfigurationError: when the redis library is not available
169 """
170 uri = uri.replace("async+redis", "redis", 1)
171 uri = uri.replace("redis+unix", "unix")
173 super().__init__(uri, **options)
175 self.dependency = self.dependencies["coredis"].module
177 if connection_pool:
178 self.storage = self.dependency.Redis(
179 connection_pool=connection_pool, **options
180 )
181 else:
182 self.storage = self.dependency.Redis.from_url(uri, **options)
184 self.initialize_storage(uri)
186 def initialize_storage(self, _uri: str) -> None:
187 # all these methods are coroutines, so must be called with await
188 self.lua_moving_window = self.storage.register_script(self.SCRIPT_MOVING_WINDOW)
189 self.lua_acquire_window = self.storage.register_script(
190 self.SCRIPT_ACQUIRE_MOVING_WINDOW
191 )
192 self.lua_clear_keys = self.storage.register_script(self.SCRIPT_CLEAR_KEYS)
193 self.lua_incr_expire = self.storage.register_script(
194 RedisStorage.SCRIPT_INCR_EXPIRE
195 )
197 async def incr(
198 self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
199 ) -> int:
200 """
201 increments the counter for a given rate limit key
203 :param key: the key to increment
204 :param expiry: amount in seconds for the key to expire in
205 :param amount: the number to increment by
206 """
208 if elastic_expiry:
209 return await super()._incr(
210 key, expiry, self.storage, elastic_expiry, amount
211 )
212 else:
213 return cast(
214 int, await self.lua_incr_expire.execute([key], [expiry, amount])
215 )
217 async def get(self, key: str) -> int:
218 """
219 :param key: the key to get the counter value for
220 """
222 return await super()._get(key, self.storage)
224 async def clear(self, key: str) -> None:
225 """
226 :param key: the key to clear rate limits for
227 """
229 return await super()._clear(key, self.storage)
231 async def acquire_entry(
232 self, key: str, limit: int, expiry: int, amount: int = 1
233 ) -> bool:
234 """
235 :param key: rate limit key to acquire an entry in
236 :param limit: amount of entries allowed
237 :param expiry: expiry of the entry
238 :param amount: the number of entries to acquire
239 """
241 return await super()._acquire_entry(key, limit, expiry, self.storage, amount)
243 async def get_expiry(self, key: str) -> int:
244 """
245 :param key: the key to get the expiry for
246 """
248 return await super()._get_expiry(key, self.storage)
250 async def check(self) -> bool:
251 """
252 Check if storage is healthy by calling :meth:`coredis.Redis.ping`
253 """
255 return await super()._check(self.storage)
257 async def reset(self) -> Optional[int]:
258 """
259 This function calls a Lua Script to delete keys prefixed with 'LIMITER'
260 in block of 5000.
262 .. warning:: This operation was designed to be fast, but was not tested
263 on a large production based system. Be careful with its usage as it
264 could be slow on very large data sets.
265 """
267 return cast(int, await self.lua_clear_keys.execute(["LIMITER*"]))
270@versionadded(version="2.1")
271class RedisClusterStorage(RedisStorage):
272 """
273 Rate limit storage with redis cluster as backend
275 Depends on :pypi:`coredis`
276 """
278 STORAGE_SCHEME = ["async+redis+cluster"]
279 """
280 The storage schemes for redis cluster to be used in an async context
281 """
283 DEFAULT_OPTIONS: Dict[str, Union[float, str, bool]] = {
284 "max_connections": 1000,
285 }
286 "Default options passed to :class:`coredis.RedisCluster`"
288 def __init__(self, uri: str, **options: Union[float, str, bool]) -> None:
289 """
290 :param uri: url of the form
291 ``async+redis+cluster://[:password]@host:port,host:port``
292 :param options: all remaining keyword arguments are passed
293 directly to the constructor of :class:`coredis.RedisCluster`
294 :raise ConfigurationError: when the coredis library is not
295 available or if the redis host cannot be pinged.
296 """
297 parsed = urllib.parse.urlparse(uri)
298 parsed_auth: Dict[str, Union[float, str, bool]] = {}
300 if parsed.username:
301 parsed_auth["username"] = parsed.username
302 if parsed.password:
303 parsed_auth["password"] = parsed.password
305 sep = parsed.netloc.find("@") + 1
306 cluster_hosts = []
308 for loc in parsed.netloc[sep:].split(","):
309 host, port = loc.split(":")
310 cluster_hosts.append({"host": host, "port": int(port)})
312 super(RedisStorage, self).__init__(uri, **options)
314 self.dependency = self.dependencies["coredis"].module
316 self.storage: "coredis.RedisCluster[str]" = self.dependency.RedisCluster(
317 startup_nodes=cluster_hosts,
318 **{**self.DEFAULT_OPTIONS, **parsed_auth, **options},
319 )
320 self.initialize_storage(uri)
322 async def reset(self) -> Optional[int]:
323 """
324 Redis Clusters are sharded and deleting across shards
325 can't be done atomically. Because of this, this reset loops over all
326 keys that are prefixed with 'LIMITER' and calls delete on them, one at
327 a time.
329 .. warning:: This operation was not tested with extremely large data sets.
330 On a large production based system, care should be taken with its
331 usage as it could be slow on very large data sets
332 """
334 keys = await self.storage.keys("LIMITER*")
335 count = 0
336 for key in keys:
337 count += await self.storage.delete([key])
338 return count
341@versionadded(version="2.1")
342class RedisSentinelStorage(RedisStorage):
343 """
344 Rate limit storage with redis sentinel as backend
346 Depends on :pypi:`coredis`
347 """
349 STORAGE_SCHEME = ["async+redis+sentinel"]
350 """The storage scheme for redis accessed via a redis sentinel installation"""
352 DEPENDENCIES = {"coredis.sentinel": Version("3.4.0")}
354 def __init__(
355 self,
356 uri: str,
357 service_name: Optional[str] = None,
358 use_replicas: bool = True,
359 sentinel_kwargs: Optional[Dict[str, Union[float, str, bool]]] = None,
360 **options: Union[float, str, bool],
361 ):
362 """
363 :param uri: url of the form
364 ``async+redis+sentinel://host:port,host:port/service_name``
365 :param service_name, optional: sentinel service name
366 (if not provided in `uri`)
367 :param use_replicas: Whether to use replicas for read only operations
368 :param sentinel_kwargs, optional: kwargs to pass as
369 ``sentinel_kwargs`` to :class:`coredis.sentinel.Sentinel`
370 :param options: all remaining keyword arguments are passed
371 directly to the constructor of :class:`coredis.sentinel.Sentinel`
372 :raise ConfigurationError: when the coredis library is not available
373 or if the redis primary host cannot be pinged.
374 """
376 parsed = urllib.parse.urlparse(uri)
377 sentinel_configuration = []
378 connection_options = options.copy()
379 sentinel_options = sentinel_kwargs.copy() if sentinel_kwargs else {}
380 parsed_auth: Dict[str, Union[float, str, bool]] = {}
382 if parsed.username:
383 parsed_auth["username"] = parsed.username
385 if parsed.password:
386 parsed_auth["password"] = parsed.password
388 sep = parsed.netloc.find("@") + 1
390 for loc in parsed.netloc[sep:].split(","):
391 host, port = loc.split(":")
392 sentinel_configuration.append((host, int(port)))
393 self.service_name = (
394 parsed.path.replace("/", "") if parsed.path else service_name
395 )
397 if self.service_name is None:
398 raise ConfigurationError("'service_name' not provided")
400 super(RedisStorage, self).__init__()
402 self.dependency = self.dependencies["coredis.sentinel"].module
404 self.sentinel = self.dependency.Sentinel(
405 sentinel_configuration,
406 sentinel_kwargs={**parsed_auth, **sentinel_options},
407 **{**parsed_auth, **connection_options},
408 )
409 self.storage = self.sentinel.primary_for(self.service_name)
410 self.storage_replica = self.sentinel.replica_for(self.service_name)
411 self.use_replicas = use_replicas
412 self.initialize_storage(uri)
414 async def get(self, key: str) -> int:
415 """
416 :param key: the key to get the counter value for
417 """
419 return await super()._get(
420 key, self.storage_replica if self.use_replicas else self.storage
421 )
423 async def get_expiry(self, key: str) -> int:
424 """
425 :param key: the key to get the expiry for
426 """
428 return await super()._get_expiry(
429 key, self.storage_replica if self.use_replicas else self.storage
430 )
432 async def check(self) -> bool:
433 """
434 Check if storage is healthy by calling :meth:`coredis.Redis.ping`
435 on the replica.
436 """
438 return await super()._check(
439 self.storage_replica if self.use_replicas else self.storage
440 )