Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cache.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

261 statements  

1from abc import ABC, abstractmethod 

2from collections import OrderedDict 

3from dataclasses import dataclass 

4from enum import Enum 

5from typing import Any, List, Optional, Union 

6 

7from redis.observability.attributes import CSCReason 

8 

9 

class CacheEntryStatus(Enum):
    """Status values that a ``CacheEntry`` can carry in its ``status`` field."""

    VALID = "VALID"
    IN_PROGRESS = "IN_PROGRESS"

13 

14 

class EvictionPolicyType(Enum):
    """Categories an eviction policy reports through its ``type`` property."""

    time_based = "time_based"
    frequency_based = "frequency_based"

18 

19 

@dataclass(frozen=True)
class CacheKey:
    """
    Immutable identifier of a single cache entry.

    Because the dataclass is frozen, instances are hashable and can be used
    directly as dictionary keys.

    Attributes:
        command (str): Name of the Redis command whose result is cached.
        redis_keys (tuple): Redis keys the command operates on.
        redis_args (tuple): Extra command arguments. They take part in
            equality and hashing, so two commands on the same keys with
            different arguments get distinct cache keys. Defaults to an
            empty tuple.
    """

    command: str
    redis_keys: tuple
    # Extra command arguments; part of the key's identity.
    redis_args: tuple = ()

37 

38 

class CacheEntry:
    """
    A cached value together with the metadata stored alongside it.

    Attributes:
        cache_key: Key identifying the entry.
        cache_value: The cached payload.
        status: Status of the entry.
        connection_ref: Stored as given; presumably the connection the entry
            is associated with (confirm against callers).
    """

    def __init__(
        self,
        cache_key: "CacheKey",
        cache_value: bytes,
        status: "CacheEntryStatus",
        connection_ref,
    ):
        self.cache_key = cache_key
        self.cache_value = cache_value
        self.status = status
        self.connection_ref = connection_ref

    def _identity(self) -> tuple:
        """Return the fields that define this entry's hash and equality."""
        return (self.cache_key, self.cache_value, self.status, self.connection_ref)

    def __hash__(self):
        """
        Hash the entry from all of its fields.

        Raises:
            TypeError: If any field (for example a list payload) is unhashable.
        """
        return hash(self._identity())

    def __eq__(self, other):
        """
        Compare two entries field by field.

        Fields are compared directly rather than through their hashes, so
        distinct entries whose hashes collide are not reported as equal, and
        entries holding unhashable payloads can still be compared.

        Returns:
            ``NotImplemented`` when ``other`` is not a ``CacheEntry`` (Python
            then falls back to its default comparison), otherwise a bool.
        """
        if not isinstance(other, CacheEntry):
            return NotImplemented
        return self._identity() == other._identity()

59 

60 

class EvictionPolicyInterface(ABC):
    """Abstract contract for objects that decide which cache entries to evict."""

    @property
    @abstractmethod
    def cache(self):
        """Return the cache this policy is attached to."""

    @cache.setter
    @abstractmethod
    def cache(self, value):
        """Attach the policy to the given cache."""

    @property
    @abstractmethod
    def type(self) -> EvictionPolicyType:
        """Return the category of this policy."""

    @abstractmethod
    def evict_next(self) -> CacheKey:
        """Evict one entry and return its key."""

    @abstractmethod
    def evict_many(self, count: int) -> List[CacheKey]:
        """Evict ``count`` entries and return their keys."""

    @abstractmethod
    def touch(self, cache_key: CacheKey) -> None:
        """Notify the policy that the entry under ``cache_key`` was used."""

88 

89 

class CacheConfigurationInterface(ABC):
    """Abstract contract for the settings object a cache is built from."""

    @abstractmethod
    def get_cache_class(self):
        """Return the cache class that should be instantiated."""

    @abstractmethod
    def get_max_size(self) -> int:
        """Return the configured maximum cache size."""

    @abstractmethod
    def get_eviction_policy(self):
        """Return the configured eviction policy."""

    @abstractmethod
    def is_exceeds_max_size(self, count: int) -> bool:
        """Tell whether ``count`` is over the configured maximum size."""

    @abstractmethod
    def is_allowed_to_cache(self, command: str) -> bool:
        """Tell whether results of ``command`` may be cached."""

110 

111 

class CacheInterface(ABC):
    """Abstract contract every cache implementation has to fulfil."""

    @property
    @abstractmethod
    def collection(self) -> OrderedDict:
        """Return the underlying ordered mapping holding the entries."""

    @property
    @abstractmethod
    def config(self) -> CacheConfigurationInterface:
        """Return the configuration the cache was created with."""

    @property
    @abstractmethod
    def eviction_policy(self) -> EvictionPolicyInterface:
        """Return the eviction policy used by the cache."""

    @property
    @abstractmethod
    def size(self) -> int:
        """Return the number of entries currently stored."""

    @abstractmethod
    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """Return the entry stored under ``key``, or ``None`` if there is none."""

    @abstractmethod
    def set(self, entry: CacheEntry) -> bool:
        """Store ``entry`` and report whether it was stored."""

    @abstractmethod
    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """Remove the entries stored under the given cache keys."""

    @abstractmethod
    def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
        """Remove every entry that involves any of the given Redis keys."""

    @abstractmethod
    def flush(self) -> int:
        """Remove all entries and return how many were removed."""

    @abstractmethod
    def is_cachable(self, key: CacheKey) -> bool:
        """Tell whether an entry with the given key may be cached."""

156 

157 

class DefaultCache(CacheInterface):
    """
    In-memory cache that keeps entries in an ``OrderedDict`` keyed by
    ``CacheKey``.

    The eviction policy obtained from the configuration is instantiated and
    attached to this cache at construction time.
    """

    def __init__(
        self,
        cache_config: CacheConfigurationInterface,
    ) -> None:
        """
        Args:
            cache_config: Configuration providing the eviction policy and the
                rules deciding which commands may be cached.
        """
        self._cache = OrderedDict()
        self._cache_config = cache_config
        # The configured policy exposes the policy class through ``.value``;
        # calling it creates the policy instance used by this cache.
        self._eviction_policy = self._cache_config.get_eviction_policy().value()
        self._eviction_policy.cache = self

    @property
    def collection(self) -> OrderedDict:
        """Return the ordered mapping that stores the entries."""
        return self._cache

    @property
    def config(self) -> CacheConfigurationInterface:
        """Return the configuration passed to the constructor."""
        return self._cache_config

    @property
    def eviction_policy(self) -> EvictionPolicyInterface:
        """Return the eviction policy instance attached to this cache."""
        return self._eviction_policy

    @property
    def size(self) -> int:
        """Return the number of stored entries."""
        return len(self._cache)

    def set(self, entry: CacheEntry) -> bool:
        """
        Store ``entry`` under its cache key and mark it as used.

        Returns:
            ``False`` (nothing stored) when the entry's command is not
            cachable, ``True`` otherwise.
        """
        if not self.is_cachable(entry.cache_key):
            return False

        self._cache[entry.cache_key] = entry
        self._eviction_policy.touch(entry.cache_key)

        return True

    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """
        Return the entry stored under ``key`` and mark it as used.

        Returns:
            The stored entry, or ``None`` when nothing is stored for ``key``.
        """
        entry = self._cache.get(key, None)

        if entry is None:
            return None

        self._eviction_policy.touch(key)
        return entry

    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """
        Remove the entries stored under ``cache_keys``.

        Returns:
            One bool per requested key, in order: ``True`` when an entry was
            removed, ``False`` when there was nothing to remove.
        """
        response = []

        for key in cache_keys:
            # Look the key up directly instead of going through ``get``: an
            # entry that is about to be removed should not be reported to the
            # eviction policy as freshly used.
            if self._cache.get(key) is not None:
                del self._cache[key]
                response.append(True)
            else:
                response.append(False)

        return response

    def delete_by_redis_keys(
        self, redis_keys: Union[List[bytes], List[str]]
    ) -> List[bool]:
        """
        Remove every entry whose cache key involves one of ``redis_keys``.

        Each Redis key is matched in both its ``str`` and ``bytes`` form
        (UTF-8), so the lookup works regardless of which form the cache keys
        were built with.

        Returns:
            A list containing one ``True`` per removed entry.
        """
        response = []
        # A dict is used as an insertion-ordered set: a cache key that matches
        # several of the given Redis keys (or a Redis key passed twice) must be
        # queued only once, otherwise removing it a second time would raise
        # ``KeyError``.
        keys_to_delete = {}

        for redis_key in redis_keys:
            # Prepare both versions for lookup
            candidates = [redis_key]
            if isinstance(redis_key, str):
                candidates.append(redis_key.encode("utf-8"))
            elif isinstance(redis_key, bytes):
                try:
                    candidates.append(redis_key.decode("utf-8"))
                except UnicodeDecodeError:
                    pass  # Non-UTF-8 bytes, skip str version

            for cache_key in self._cache:
                if cache_key in keys_to_delete:
                    continue
                if any(candidate in cache_key.redis_keys for candidate in candidates):
                    keys_to_delete[cache_key] = None
                    response.append(True)

        # Removal happens after the scan so the mapping is never mutated while
        # it is being iterated.
        for key in keys_to_delete:
            self._cache.pop(key, None)

        return response

    def flush(self) -> int:
        """Remove all entries and return how many there were."""
        elem_count = len(self._cache)
        self._cache.clear()
        return elem_count

    def is_cachable(self, key: CacheKey) -> bool:
        """Tell whether the configuration allows caching ``key.command``."""
        return self._cache_config.is_allowed_to_cache(key.command)

248 

249 

class CacheProxy(CacheInterface):
    """
    Wrapper around a cache implementation that layers extra behaviour on top.

    Every operation is delegated to the wrapped cache. In addition, ``set``
    enforces the configured size limit by evicting one entry when the limit
    is exceeded.
    """

    def __init__(self, cache: CacheInterface):
        self._cache = cache

    @property
    def collection(self) -> OrderedDict:
        """Return the wrapped cache's underlying collection."""
        return self._cache.collection

    @property
    def config(self) -> CacheConfigurationInterface:
        """Return the wrapped cache's configuration."""
        return self._cache.config

    @property
    def eviction_policy(self) -> EvictionPolicyInterface:
        """Return the wrapped cache's eviction policy."""
        return self._cache.eviction_policy

    @property
    def size(self) -> int:
        """Return the wrapped cache's current size."""
        return self._cache.size

    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """Delegate the lookup to the wrapped cache."""
        return self._cache.get(key)

    def set(self, entry: CacheEntry) -> bool:
        """
        Store ``entry`` in the wrapped cache, then enforce the size limit.

        When the configuration reports that the current size exceeds the
        maximum, one eviction is recorded and the eviction policy removes
        the next entry.

        Returns:
            Whatever the wrapped cache's ``set`` returned.
        """
        stored = self._cache.set(entry)

        if not self.config.is_exceeds_max_size(self.size):
            return stored

        # Lazy import to avoid circular dependency
        from redis.observability.recorder import record_csc_eviction

        record_csc_eviction(count=1, reason=CSCReason.FULL)
        self.eviction_policy.evict_next()

        return stored

    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """Delegate the removal by cache keys to the wrapped cache."""
        return self._cache.delete_by_cache_keys(cache_keys)

    def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
        """Delegate the removal by Redis keys to the wrapped cache."""
        return self._cache.delete_by_redis_keys(redis_keys)

    def flush(self) -> int:
        """Delegate the flush to the wrapped cache."""
        return self._cache.flush()

    def is_cachable(self, key: CacheKey) -> bool:
        """Delegate the cachability check to the wrapped cache."""
        return self._cache.is_cachable(key)

303 

304 

class LRUPolicy(EvictionPolicyInterface):
    """
    Eviction policy that treats the front of the cache's ordered collection
    as the next entry to evict and moves used entries to the back.
    """

    def __init__(self):
        # No cache is attached yet; the owner assigns one through the setter.
        self.cache = None

    @property
    def cache(self):
        """Return the cache this policy is attached to (``None`` if unset)."""
        return self._cache

    @cache.setter
    def cache(self, cache: CacheInterface):
        self._cache = cache

    @property
    def type(self) -> EvictionPolicyType:
        """Return ``EvictionPolicyType.time_based``."""
        return EvictionPolicyType.time_based

    def evict_next(self) -> CacheKey:
        """
        Remove the entry at the front of the collection and return its key.

        Raises:
            ValueError: If no valid cache is attached.
        """
        self._assert_cache()
        evicted_key, _ = self._cache.collection.popitem(last=False)
        return evicted_key

    def evict_many(self, count: int) -> List[CacheKey]:
        """
        Remove ``count`` entries from the front of the collection.

        Returns:
            The keys of the removed entries, in removal order.

        Raises:
            ValueError: If no valid cache is attached, or if ``count`` is
                larger than the number of stored entries.
        """
        self._assert_cache()
        if count > len(self._cache.collection):
            raise ValueError("Evictions count is above cache size")

        return [
            self._cache.collection.popitem(last=False)[0] for _ in range(count)
        ]

    def touch(self, cache_key: CacheKey) -> None:
        """
        Mark the entry under ``cache_key`` as used by moving it to the back.

        Raises:
            ValueError: If no valid cache is attached, or if the collection
                holds no entry for ``cache_key``.
        """
        self._assert_cache()

        entries = self._cache.collection
        if entries.get(cache_key) is None:
            raise ValueError("Given entry does not belong to the cache")

        entries.move_to_end(cache_key)

    def _assert_cache(self):
        """Raise ``ValueError`` unless a ``CacheInterface`` is attached."""
        attached = self.cache is not None and isinstance(self.cache, CacheInterface)
        if not attached:
            raise ValueError("Eviction policy should be associated with valid cache.")

350 

351 

class EvictionPolicy(Enum):
    """Selectable eviction policies; each member's value is the policy class."""

    LRU = LRUPolicy

354 

355 

class CacheConfig(CacheConfigurationInterface):
    """
    Default cache configuration: cache class, eviction policy, size limit and
    the list of commands whose results may be cached.
    """

    DEFAULT_CACHE_CLASS = DefaultCache
    DEFAULT_EVICTION_POLICY = EvictionPolicy.LRU
    DEFAULT_MAX_SIZE = 10000

    # Commands whose results are allowed to be cached.
    DEFAULT_ALLOW_LIST = [
        "BITCOUNT",
        "BITFIELD_RO",
        "BITPOS",
        "EXISTS",
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEORADIUSBYMEMBER_RO",
        "GEORADIUS_RO",
        "GEOSEARCH",
        "GET",
        "GETBIT",
        "GETRANGE",
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "JSON.ARRINDEX",
        "JSON.ARRLEN",
        "JSON.GET",
        "JSON.MGET",
        "JSON.OBJKEYS",
        "JSON.OBJLEN",
        "JSON.RESP",
        "JSON.STRLEN",
        "JSON.TYPE",
        "LCS",
        "LINDEX",
        "LLEN",
        "LPOS",
        "LRANGE",
        "MGET",
        "SCARD",
        "SDIFF",
        "SINTER",
        "SINTERCARD",
        "SISMEMBER",
        "SMEMBERS",
        "SMISMEMBER",
        "SORT_RO",
        "STRLEN",
        "SUBSTR",
        "SUNION",
        "TS.GET",
        "TS.INFO",
        "TS.RANGE",
        "TS.REVRANGE",
        "TYPE",
        "XLEN",
        "XPENDING",
        "XRANGE",
        "XREAD",
        "XREVRANGE",
        "ZCARD",
        "ZCOUNT",
        "ZDIFF",
        "ZINTER",
        "ZINTERCARD",
        "ZLEXCOUNT",
        "ZMSCORE",
        "ZRANGE",
        "ZRANGEBYLEX",
        "ZRANGEBYSCORE",
        "ZRANK",
        "ZREVRANGE",
        "ZREVRANGEBYLEX",
        "ZREVRANGEBYSCORE",
        "ZREVRANK",
        "ZSCORE",
        "ZUNION",
    ]

    def __init__(
        self,
        max_size: int = DEFAULT_MAX_SIZE,
        cache_class: Any = DEFAULT_CACHE_CLASS,
        eviction_policy: EvictionPolicy = DEFAULT_EVICTION_POLICY,
    ):
        """
        Args:
            max_size: Size limit used by ``is_exceeds_max_size``.
            cache_class: Cache class returned by ``get_cache_class``.
            eviction_policy: Policy returned by ``get_eviction_policy``.
        """
        self._max_size = max_size
        self._cache_class = cache_class
        self._eviction_policy = eviction_policy

    def get_cache_class(self):
        """Return the configured cache class."""
        return self._cache_class

    def get_max_size(self) -> int:
        """Return the configured size limit."""
        return self._max_size

    def get_eviction_policy(self) -> EvictionPolicy:
        """Return the configured eviction policy member."""
        return self._eviction_policy

    def is_exceeds_max_size(self, count: int) -> bool:
        """Return ``True`` when ``count`` is strictly above the size limit."""
        return count > self._max_size

    def is_allowed_to_cache(self, command: str) -> bool:
        """Return ``True`` when ``command`` appears in ``DEFAULT_ALLOW_LIST``."""
        return command in self.DEFAULT_ALLOW_LIST

462 

463 

class CacheFactoryInterface(ABC):
    """Abstract contract for objects that build cache instances."""

    @abstractmethod
    def get_cache(self) -> CacheInterface:
        """Build and return a cache."""

468 

469 

class CacheFactory(CacheFactoryInterface):
    """Builds caches from a ``CacheConfig``, wrapping each in a ``CacheProxy``."""

    def __init__(self, cache_config: Optional[CacheConfig] = None):
        """
        Args:
            cache_config: Configuration to build caches from. When omitted,
                a ``CacheConfig`` with default settings is created.
        """
        self._config = CacheConfig() if cache_config is None else cache_config

    def get_cache(self) -> CacheInterface:
        """
        Instantiate the configured cache class with this factory's
        configuration and return it wrapped in a ``CacheProxy``.
        """
        backend = self._config.get_cache_class()(cache_config=self._config)
        return CacheProxy(backend)