Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/limits/aio/storage/redis.py: 14%

143 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import time 

2import urllib 

3from typing import TYPE_CHECKING, cast 

4 

5from deprecated.sphinx import versionadded 

6from packaging.version import Version 

7 

8from limits.aio.storage.base import MovingWindowSupport, Storage 

9from limits.errors import ConfigurationError 

10from limits.typing import AsyncRedisClient, Dict, Optional, Tuple, Union 

11from limits.util import get_package_data 

12 

13if TYPE_CHECKING: 

14 import coredis 

15 import coredis.commands 

16 

17 

18class RedisInteractor: 

19 RES_DIR = "resources/redis/lua_scripts" 

20 

21 SCRIPT_MOVING_WINDOW = get_package_data(f"{RES_DIR}/moving_window.lua") 

22 SCRIPT_ACQUIRE_MOVING_WINDOW = get_package_data( 

23 f"{RES_DIR}/acquire_moving_window.lua" 

24 ) 

25 SCRIPT_CLEAR_KEYS = get_package_data(f"{RES_DIR}/clear_keys.lua") 

26 SCRIPT_INCR_EXPIRE = get_package_data(f"{RES_DIR}/incr_expire.lua") 

27 

28 lua_moving_window: "coredis.commands.Script[bytes]" 

29 lua_acquire_window: "coredis.commands.Script[bytes]" 

30 lua_clear_keys: "coredis.commands.Script[bytes]" 

31 lua_incr_expire: "coredis.commands.Script[bytes]" 

32 

33 async def _incr( 

34 self, 

35 key: str, 

36 expiry: int, 

37 connection: AsyncRedisClient, 

38 elastic_expiry: bool = False, 

39 amount: int = 1, 

40 ) -> int: 

41 """ 

42 increments the counter for a given rate limit key 

43 

44 :param connection: Redis connection 

45 :param key: the key to increment 

46 :param expiry: amount in seconds for the key to expire in 

47 :param amount: the number to increment by 

48 """ 

49 value = await connection.incrby(key, amount) 

50 

51 if elastic_expiry or value == amount: 

52 await connection.expire(key, expiry) 

53 

54 return value 

55 

56 async def _get(self, key: str, connection: AsyncRedisClient) -> int: 

57 """ 

58 :param connection: Redis connection 

59 :param key: the key to get the counter value for 

60 """ 

61 

62 return int(await connection.get(key) or 0) 

63 

64 async def _clear(self, key: str, connection: AsyncRedisClient) -> None: 

65 """ 

66 :param key: the key to clear rate limits for 

67 :param connection: Redis connection 

68 """ 

69 await connection.delete([key]) 

70 

71 async def get_moving_window( 

72 self, key: str, limit: int, expiry: int 

73 ) -> Tuple[int, int]: 

74 """ 

75 returns the starting point and the number of entries in the moving 

76 window 

77 

78 :param key: rate limit key 

79 :param expiry: expiry of entry 

80 :return: (start of window, number of acquired entries) 

81 """ 

82 timestamp = int(time.time()) 

83 window = await self.lua_moving_window.execute( 

84 [key], [int(timestamp - expiry), limit] 

85 ) 

86 if window: 

87 return tuple(window) # type: ignore 

88 return timestamp, 0 

89 

90 async def _acquire_entry( 

91 self, 

92 key: str, 

93 limit: int, 

94 expiry: int, 

95 connection: AsyncRedisClient, 

96 amount: int = 1, 

97 ) -> bool: 

98 """ 

99 :param key: rate limit key to acquire an entry in 

100 :param limit: amount of entries allowed 

101 :param expiry: expiry of the entry 

102 :param connection: Redis connection 

103 """ 

104 timestamp = time.time() 

105 acquired = await self.lua_acquire_window.execute( 

106 [key], [timestamp, limit, expiry, amount] 

107 ) 

108 

109 return bool(acquired) 

110 

111 async def _get_expiry(self, key: str, connection: AsyncRedisClient) -> int: 

112 """ 

113 :param key: the key to get the expiry for 

114 :param connection: Redis connection 

115 """ 

116 

117 return int(max(await connection.ttl(key), 0) + time.time()) 

118 

119 async def _check(self, connection: AsyncRedisClient) -> bool: 

120 """ 

121 check if storage is healthy 

122 

123 :param connection: Redis connection 

124 """ 

125 try: 

126 await connection.ping() 

127 

128 return True 

129 except: # noqa 

130 return False 

131 

132 

133@versionadded(version="2.1") 

134class RedisStorage(RedisInteractor, Storage, MovingWindowSupport): 

135 """ 

136 Rate limit storage with redis as backend. 

137 

138 Depends on :pypi:`coredis` 

139 """ 

140 

141 STORAGE_SCHEME = ["async+redis", "async+rediss", "async+redis+unix"] 

142 """ 

143 The storage schemes for redis to be used in an async context 

144 """ 

145 DEPENDENCIES = {"coredis": Version("3.4.0")} 

146 

147 def __init__( 

148 self, 

149 uri: str, 

150 connection_pool: Optional["coredis.ConnectionPool"] = None, 

151 **options: Union[float, str, bool], 

152 ) -> None: 

153 """ 

154 :param uri: uri of the form: 

155 

156 - ``async+redis://[:password]@host:port`` 

157 - ``async+redis://[:password]@host:port/db`` 

158 - ``async+rediss://[:password]@host:port`` 

159 - ``async+unix:///path/to/sock`` etc... 

160 

161 This uri is passed directly to :meth:`coredis.Redis.from_url` with 

162 the initial ``async`` removed, except for the case of ``async+redis+unix`` 

163 where it is replaced with ``unix``. 

164 :param connection_pool: if provided, the redis client is initialized with 

165 the connection pool and any other params passed as :paramref:`options` 

166 :param options: all remaining keyword arguments are passed 

167 directly to the constructor of :class:`coredis.Redis` 

168 :raise ConfigurationError: when the redis library is not available 

169 """ 

170 uri = uri.replace("async+redis", "redis", 1) 

171 uri = uri.replace("redis+unix", "unix") 

172 

173 super().__init__(uri, **options) 

174 

175 self.dependency = self.dependencies["coredis"].module 

176 

177 if connection_pool: 

178 self.storage = self.dependency.Redis( 

179 connection_pool=connection_pool, **options 

180 ) 

181 else: 

182 self.storage = self.dependency.Redis.from_url(uri, **options) 

183 

184 self.initialize_storage(uri) 

185 

186 def initialize_storage(self, _uri: str) -> None: 

187 # all these methods are coroutines, so must be called with await 

188 self.lua_moving_window = self.storage.register_script(self.SCRIPT_MOVING_WINDOW) 

189 self.lua_acquire_window = self.storage.register_script( 

190 self.SCRIPT_ACQUIRE_MOVING_WINDOW 

191 ) 

192 self.lua_clear_keys = self.storage.register_script(self.SCRIPT_CLEAR_KEYS) 

193 self.lua_incr_expire = self.storage.register_script( 

194 RedisStorage.SCRIPT_INCR_EXPIRE 

195 ) 

196 

197 async def incr( 

198 self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1 

199 ) -> int: 

200 """ 

201 increments the counter for a given rate limit key 

202 

203 :param key: the key to increment 

204 :param expiry: amount in seconds for the key to expire in 

205 :param amount: the number to increment by 

206 """ 

207 

208 if elastic_expiry: 

209 return await super()._incr( 

210 key, expiry, self.storage, elastic_expiry, amount 

211 ) 

212 else: 

213 return cast( 

214 int, await self.lua_incr_expire.execute([key], [expiry, amount]) 

215 ) 

216 

217 async def get(self, key: str) -> int: 

218 """ 

219 :param key: the key to get the counter value for 

220 """ 

221 

222 return await super()._get(key, self.storage) 

223 

224 async def clear(self, key: str) -> None: 

225 """ 

226 :param key: the key to clear rate limits for 

227 """ 

228 

229 return await super()._clear(key, self.storage) 

230 

231 async def acquire_entry( 

232 self, key: str, limit: int, expiry: int, amount: int = 1 

233 ) -> bool: 

234 """ 

235 :param key: rate limit key to acquire an entry in 

236 :param limit: amount of entries allowed 

237 :param expiry: expiry of the entry 

238 :param amount: the number of entries to acquire 

239 """ 

240 

241 return await super()._acquire_entry(key, limit, expiry, self.storage, amount) 

242 

243 async def get_expiry(self, key: str) -> int: 

244 """ 

245 :param key: the key to get the expiry for 

246 """ 

247 

248 return await super()._get_expiry(key, self.storage) 

249 

250 async def check(self) -> bool: 

251 """ 

252 Check if storage is healthy by calling :meth:`coredis.Redis.ping` 

253 """ 

254 

255 return await super()._check(self.storage) 

256 

257 async def reset(self) -> Optional[int]: 

258 """ 

259 This function calls a Lua Script to delete keys prefixed with 'LIMITER' 

260 in block of 5000. 

261 

262 .. warning:: This operation was designed to be fast, but was not tested 

263 on a large production based system. Be careful with its usage as it 

264 could be slow on very large data sets. 

265 """ 

266 

267 return cast(int, await self.lua_clear_keys.execute(["LIMITER*"])) 

268 

269 

270@versionadded(version="2.1") 

271class RedisClusterStorage(RedisStorage): 

272 """ 

273 Rate limit storage with redis cluster as backend 

274 

275 Depends on :pypi:`coredis` 

276 """ 

277 

278 STORAGE_SCHEME = ["async+redis+cluster"] 

279 """ 

280 The storage schemes for redis cluster to be used in an async context 

281 """ 

282 

283 DEFAULT_OPTIONS: Dict[str, Union[float, str, bool]] = { 

284 "max_connections": 1000, 

285 } 

286 "Default options passed to :class:`coredis.RedisCluster`" 

287 

288 def __init__(self, uri: str, **options: Union[float, str, bool]) -> None: 

289 """ 

290 :param uri: url of the form 

291 ``async+redis+cluster://[:password]@host:port,host:port`` 

292 :param options: all remaining keyword arguments are passed 

293 directly to the constructor of :class:`coredis.RedisCluster` 

294 :raise ConfigurationError: when the coredis library is not 

295 available or if the redis host cannot be pinged. 

296 """ 

297 parsed = urllib.parse.urlparse(uri) 

298 parsed_auth: Dict[str, Union[float, str, bool]] = {} 

299 

300 if parsed.username: 

301 parsed_auth["username"] = parsed.username 

302 if parsed.password: 

303 parsed_auth["password"] = parsed.password 

304 

305 sep = parsed.netloc.find("@") + 1 

306 cluster_hosts = [] 

307 

308 for loc in parsed.netloc[sep:].split(","): 

309 host, port = loc.split(":") 

310 cluster_hosts.append({"host": host, "port": int(port)}) 

311 

312 super(RedisStorage, self).__init__(uri, **options) 

313 

314 self.dependency = self.dependencies["coredis"].module 

315 

316 self.storage: "coredis.RedisCluster[str]" = self.dependency.RedisCluster( 

317 startup_nodes=cluster_hosts, 

318 **{**self.DEFAULT_OPTIONS, **parsed_auth, **options}, 

319 ) 

320 self.initialize_storage(uri) 

321 

322 async def reset(self) -> Optional[int]: 

323 """ 

324 Redis Clusters are sharded and deleting across shards 

325 can't be done atomically. Because of this, this reset loops over all 

326 keys that are prefixed with 'LIMITER' and calls delete on them, one at 

327 a time. 

328 

329 .. warning:: This operation was not tested with extremely large data sets. 

330 On a large production based system, care should be taken with its 

331 usage as it could be slow on very large data sets 

332 """ 

333 

334 keys = await self.storage.keys("LIMITER*") 

335 count = 0 

336 for key in keys: 

337 count += await self.storage.delete([key]) 

338 return count 

339 

340 

341@versionadded(version="2.1") 

342class RedisSentinelStorage(RedisStorage): 

343 """ 

344 Rate limit storage with redis sentinel as backend 

345 

346 Depends on :pypi:`coredis` 

347 """ 

348 

349 STORAGE_SCHEME = ["async+redis+sentinel"] 

350 """The storage scheme for redis accessed via a redis sentinel installation""" 

351 

352 DEPENDENCIES = {"coredis.sentinel": Version("3.4.0")} 

353 

354 def __init__( 

355 self, 

356 uri: str, 

357 service_name: Optional[str] = None, 

358 use_replicas: bool = True, 

359 sentinel_kwargs: Optional[Dict[str, Union[float, str, bool]]] = None, 

360 **options: Union[float, str, bool], 

361 ): 

362 """ 

363 :param uri: url of the form 

364 ``async+redis+sentinel://host:port,host:port/service_name`` 

365 :param service_name, optional: sentinel service name 

366 (if not provided in `uri`) 

367 :param use_replicas: Whether to use replicas for read only operations 

368 :param sentinel_kwargs, optional: kwargs to pass as 

369 ``sentinel_kwargs`` to :class:`coredis.sentinel.Sentinel` 

370 :param options: all remaining keyword arguments are passed 

371 directly to the constructor of :class:`coredis.sentinel.Sentinel` 

372 :raise ConfigurationError: when the coredis library is not available 

373 or if the redis primary host cannot be pinged. 

374 """ 

375 

376 parsed = urllib.parse.urlparse(uri) 

377 sentinel_configuration = [] 

378 connection_options = options.copy() 

379 sentinel_options = sentinel_kwargs.copy() if sentinel_kwargs else {} 

380 parsed_auth: Dict[str, Union[float, str, bool]] = {} 

381 

382 if parsed.username: 

383 parsed_auth["username"] = parsed.username 

384 

385 if parsed.password: 

386 parsed_auth["password"] = parsed.password 

387 

388 sep = parsed.netloc.find("@") + 1 

389 

390 for loc in parsed.netloc[sep:].split(","): 

391 host, port = loc.split(":") 

392 sentinel_configuration.append((host, int(port))) 

393 self.service_name = ( 

394 parsed.path.replace("/", "") if parsed.path else service_name 

395 ) 

396 

397 if self.service_name is None: 

398 raise ConfigurationError("'service_name' not provided") 

399 

400 super(RedisStorage, self).__init__() 

401 

402 self.dependency = self.dependencies["coredis.sentinel"].module 

403 

404 self.sentinel = self.dependency.Sentinel( 

405 sentinel_configuration, 

406 sentinel_kwargs={**parsed_auth, **sentinel_options}, 

407 **{**parsed_auth, **connection_options}, 

408 ) 

409 self.storage = self.sentinel.primary_for(self.service_name) 

410 self.storage_replica = self.sentinel.replica_for(self.service_name) 

411 self.use_replicas = use_replicas 

412 self.initialize_storage(uri) 

413 

414 async def get(self, key: str) -> int: 

415 """ 

416 :param key: the key to get the counter value for 

417 """ 

418 

419 return await super()._get( 

420 key, self.storage_replica if self.use_replicas else self.storage 

421 ) 

422 

423 async def get_expiry(self, key: str) -> int: 

424 """ 

425 :param key: the key to get the expiry for 

426 """ 

427 

428 return await super()._get_expiry( 

429 key, self.storage_replica if self.use_replicas else self.storage 

430 ) 

431 

432 async def check(self) -> bool: 

433 """ 

434 Check if storage is healthy by calling :meth:`coredis.Redis.ping` 

435 on the replica. 

436 """ 

437 

438 return await super()._check( 

439 self.storage_replica if self.use_replicas else self.storage 

440 )