Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/cache.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from abc import ABC, abstractmethod
2from collections import OrderedDict
3from dataclasses import dataclass
4from enum import Enum
5from typing import Any, List, Optional, Union
7from redis.observability.attributes import CSCReason
class CacheEntryStatus(Enum):
    """Status tag carried by a cache entry (stored in ``CacheEntry.status``)."""

    # Each member's value is simply its own name as a string.
    VALID = "VALID"
    # NOTE(review): presumably marks an entry whose value is still being
    # fetched -- confirm against the code that creates entries.
    IN_PROGRESS = "IN_PROGRESS"
class EvictionPolicyType(Enum):
    """Classification an eviction policy reports through its ``type`` property."""

    # Each member's value mirrors its name.
    time_based = "time_based"
    frequency_based = "frequency_based"
@dataclass(frozen=True)
class CacheKey:
    """
    Immutable, hashable identity of one cached command invocation.

    Attributes:
        command (str): Name of the Redis command whose reply is cached.
        redis_keys (tuple): Redis keys the command operates on.
        redis_args (tuple): Remaining arguments of the command. They take
            part in equality and hashing, so two invocations that share the
            command and keys but differ in arguments map to distinct cache
            entries. Changing this field therefore changes which entry a
            lookup resolves to.
    """

    command: str
    redis_keys: tuple
    # Optional; an empty tuple means "no additional arguments".
    redis_args: tuple = ()
class CacheEntry:
    """
    A cached value together with the metadata stored alongside it.

    Attributes:
        cache_key: The ``CacheKey`` this entry is stored under.
        cache_value: The cached value (annotated as ``bytes``).
        status: The ``CacheEntryStatus`` of the entry.
        connection_ref: Reference supplied by the creator of the entry.
            NOTE(review): presumably the connection that produced the
            value -- confirm against callers.

    Two entries are equal when all four attributes are equal. Hashing uses
    the same four attributes, so an entry is only hashable when each of
    them is hashable.
    """

    def __init__(
        self,
        cache_key: CacheKey,
        cache_value: bytes,
        status: CacheEntryStatus,
        connection_ref,
    ):
        self.cache_key = cache_key
        self.cache_value = cache_value
        self.status = status
        self.connection_ref = connection_ref

    def _identity(self) -> tuple:
        """Return the attributes that define equality and hashing."""
        return (self.cache_key, self.cache_value, self.status, self.connection_ref)

    def __hash__(self):
        """Hash the identifying attributes.

        Raises:
            TypeError: If any of the attributes is unhashable.
        """
        return hash(self._identity())

    def __eq__(self, other):
        """Compare attribute by attribute.

        Comparing the attributes directly (instead of comparing hashes)
        avoids false positives on hash collisions, never treats an unrelated
        object as equal, and works even when an attribute is unhashable.
        """
        if not isinstance(other, CacheEntry):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        return self._identity() == other._identity()
class EvictionPolicyInterface(ABC):
    """Abstract contract for objects that decide which cache entries to evict."""

    @property
    @abstractmethod
    def cache(self):
        """The cache this policy is attached to."""

    @cache.setter
    @abstractmethod
    def cache(self, value):
        """Attach the policy to ``value``."""

    @property
    @abstractmethod
    def type(self) -> EvictionPolicyType:
        """The ``EvictionPolicyType`` category of this policy."""

    @abstractmethod
    def evict_next(self) -> CacheKey:
        """Evict a single entry and return its ``CacheKey``."""

    @abstractmethod
    def evict_many(self, count: int) -> List[CacheKey]:
        """Evict ``count`` entries and return their ``CacheKey`` objects."""

    @abstractmethod
    def touch(self, cache_key: CacheKey) -> None:
        """Notify the policy that the entry under ``cache_key`` was used."""
class CacheConfigurationInterface(ABC):
    """Abstract contract for the settings object a cache is built from."""

    @abstractmethod
    def get_cache_class(self):
        """Return the cache class that should be instantiated."""

    @abstractmethod
    def get_max_size(self) -> int:
        """Return the configured maximum cache size."""

    @abstractmethod
    def get_eviction_policy(self):
        """Return the configured eviction policy."""

    @abstractmethod
    def is_exceeds_max_size(self, count: int) -> bool:
        """Tell whether ``count`` is over the configured maximum size."""

    @abstractmethod
    def is_allowed_to_cache(self, command: str) -> bool:
        """Tell whether replies of ``command`` may be cached."""
class CacheInterface(ABC):
    """Abstract contract every cache implementation has to fulfil."""

    @property
    @abstractmethod
    def collection(self) -> OrderedDict:
        """The underlying ordered mapping that holds the entries."""

    @property
    @abstractmethod
    def config(self) -> CacheConfigurationInterface:
        """The configuration object of this cache."""

    @property
    @abstractmethod
    def eviction_policy(self) -> EvictionPolicyInterface:
        """The eviction policy attached to this cache."""

    @property
    @abstractmethod
    def size(self) -> int:
        """The current size of the cache."""

    @abstractmethod
    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """Return the entry stored under ``key``, or ``None``."""

    @abstractmethod
    def set(self, entry: CacheEntry) -> bool:
        """Store ``entry``; the boolean reports whether it was stored."""

    @abstractmethod
    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """Delete the entries identified by the given cache keys."""

    @abstractmethod
    def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
        """Delete the entries that involve any of the given Redis keys."""

    @abstractmethod
    def flush(self) -> int:
        """Empty the cache and return an integer result."""

    @abstractmethod
    def is_cachable(self, key: CacheKey) -> bool:
        """Tell whether an entry with ``key`` is eligible for caching."""
class DefaultCache(CacheInterface):
    """
    Cache implementation that keeps its entries in an ``OrderedDict``.

    The eviction policy is created from the configuration and bound to this
    cache; it is notified (``touch``) on every successful ``set`` and ``get``.
    This class does not enforce a maximum size by itself.
    """

    def __init__(
        self,
        cache_config: CacheConfigurationInterface,
    ) -> None:
        """
        Args:
            cache_config: Configuration supplying the eviction policy and the
                rule deciding which commands may be cached.
        """
        self._cache = OrderedDict()
        self._cache_config = cache_config
        # get_eviction_policy() returns an object whose ``value`` is the
        # policy class; instantiate it and attach it to this cache.
        self._eviction_policy = self._cache_config.get_eviction_policy().value()
        self._eviction_policy.cache = self

    @property
    def collection(self) -> OrderedDict:
        """The ``OrderedDict`` mapping ``CacheKey`` to ``CacheEntry``."""
        return self._cache

    @property
    def config(self) -> CacheConfigurationInterface:
        """The configuration this cache was created with."""
        return self._cache_config

    @property
    def eviction_policy(self) -> EvictionPolicyInterface:
        """The eviction policy instance bound to this cache."""
        return self._eviction_policy

    @property
    def size(self) -> int:
        """Number of entries currently stored."""
        return len(self._cache)

    def set(self, entry: CacheEntry) -> bool:
        """
        Store ``entry`` under ``entry.cache_key``.

        Returns:
            ``False`` (and nothing is stored) when the entry's command is not
            allowed to be cached, ``True`` otherwise.
        """
        if not self.is_cachable(entry.cache_key):
            return False

        self._cache[entry.cache_key] = entry
        self._eviction_policy.touch(entry.cache_key)

        return True

    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """
        Return the entry stored under ``key`` or ``None`` when absent.

        A hit is reported to the eviction policy via ``touch``.
        """
        entry = self._cache.get(key, None)

        if entry is None:
            return None

        self._eviction_policy.touch(key)
        return entry

    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """
        Remove the entries stored under the given cache keys.

        Returns:
            One boolean per requested key, in order: ``True`` when an entry
            was removed, ``False`` when no entry existed for that key.
        """
        response = []

        for key in cache_keys:
            if self.get(key) is not None:
                self._cache.pop(key)
                response.append(True)
            else:
                response.append(False)

        return response

    def delete_by_redis_keys(
        self, redis_keys: Union[List[bytes], List[str]]
    ) -> List[bool]:
        """
        Remove every entry whose ``CacheKey.redis_keys`` contains one of the
        given Redis keys.

        Each key is matched both in the form it was passed and in its
        UTF-8 ``str``/``bytes`` counterpart, when that conversion is possible.

        Returns:
            A list holding one ``True`` per removed entry (empty when nothing
            matched).
        """
        response = []
        # A dict is used as an insertion-ordered set: an entry that matches
        # several of the given Redis keys must be removed exactly once,
        # otherwise the second removal would raise KeyError.
        keys_to_delete = {}

        for redis_key in redis_keys:
            # Prepare both the str and the bytes version for lookup.
            candidates = [redis_key]
            if isinstance(redis_key, str):
                try:
                    candidates.append(redis_key.encode("utf-8"))
                except UnicodeEncodeError:
                    pass  # Not encodable as UTF-8, skip the bytes version
            elif isinstance(redis_key, bytes):
                try:
                    candidates.append(redis_key.decode("utf-8"))
                except UnicodeDecodeError:
                    pass  # Non-UTF-8 bytes, skip the str version

            for cache_key in self._cache:
                if cache_key in keys_to_delete:
                    continue  # already scheduled by an earlier Redis key
                if any(candidate in cache_key.redis_keys for candidate in candidates):
                    keys_to_delete[cache_key] = None
                    response.append(True)

        # Deletion happens after the scan so the mapping is never mutated
        # while it is being iterated.
        for key in keys_to_delete:
            self._cache.pop(key)

        return response

    def flush(self) -> int:
        """Remove all entries and return how many were stored before."""
        elem_count = len(self._cache)
        self._cache.clear()
        return elem_count

    def is_cachable(self, key: CacheKey) -> bool:
        """Tell whether the configuration allows caching ``key.command``."""
        return self._cache_config.is_allowed_to_cache(key.command)
class CacheProxy(CacheInterface):
    """
    Wrapper around another cache implementation that layers extra logic on top.

    Every member delegates to the wrapped cache; ``set`` additionally evicts
    one entry when the configured maximum size is exceeded.
    """

    def __init__(self, cache: CacheInterface):
        """Wrap ``cache``; all operations are forwarded to it."""
        self._cache = cache

    @property
    def collection(self) -> OrderedDict:
        """The wrapped cache's underlying collection."""
        return self._cache.collection

    @property
    def config(self) -> CacheConfigurationInterface:
        """The wrapped cache's configuration."""
        return self._cache.config

    @property
    def eviction_policy(self) -> EvictionPolicyInterface:
        """The wrapped cache's eviction policy."""
        return self._cache.eviction_policy

    @property
    def size(self) -> int:
        """The wrapped cache's size."""
        return self._cache.size

    def get(self, key: CacheKey) -> Union[CacheEntry, None]:
        """Forward the lookup to the wrapped cache."""
        return self._cache.get(key)

    def set(self, entry: CacheEntry) -> bool:
        """
        Store ``entry`` in the wrapped cache, then enforce the size limit.

        When the configuration reports the size as exceeded, one eviction is
        recorded with reason ``CSCReason.FULL`` and the eviction policy
        removes its next entry.

        Returns:
            Whatever the wrapped cache's ``set`` returned.
        """
        stored = self._cache.set(entry)

        if not self.config.is_exceeds_max_size(self.size):
            return stored

        # Lazy import to avoid circular dependency
        from redis.observability.recorder import record_csc_eviction

        record_csc_eviction(count=1, reason=CSCReason.FULL)
        self.eviction_policy.evict_next()

        return stored

    def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
        """Forward the deletion to the wrapped cache."""
        return self._cache.delete_by_cache_keys(cache_keys)

    def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
        """Forward the deletion to the wrapped cache."""
        return self._cache.delete_by_redis_keys(redis_keys)

    def flush(self) -> int:
        """Forward the flush to the wrapped cache."""
        return self._cache.flush()

    def is_cachable(self, key: CacheKey) -> bool:
        """Forward the eligibility check to the wrapped cache."""
        return self._cache.is_cachable(key)
class LRUPolicy(EvictionPolicyInterface):
    """
    Eviction policy that removes entries from the front of the cache's
    ordered collection and moves touched entries to its end.
    """

    def __init__(self):
        # Goes through the property setter; the policy starts detached.
        self.cache = None

    @property
    def cache(self):
        """The cache this policy is attached to (``None`` when detached)."""
        return self._cache

    @cache.setter
    def cache(self, cache: CacheInterface):
        self._cache = cache

    @property
    def type(self) -> EvictionPolicyType:
        """Always ``EvictionPolicyType.time_based``."""
        return EvictionPolicyType.time_based

    def evict_next(self) -> CacheKey:
        """
        Remove the first entry of the collection and return its key.

        Raises:
            ValueError: If the policy is not attached to a valid cache.
        """
        self._assert_cache()
        evicted_key, _ = self._cache.collection.popitem(last=False)
        return evicted_key

    def evict_many(self, count: int) -> List[CacheKey]:
        """
        Remove the first ``count`` entries and return their keys in order.

        Raises:
            ValueError: If the policy is not attached to a valid cache, or
                ``count`` is larger than the number of stored entries.
        """
        self._assert_cache()
        if count > len(self._cache.collection):
            raise ValueError("Evictions count is above cache size")

        return [
            self._cache.collection.popitem(last=False)[0] for _ in range(count)
        ]

    def touch(self, cache_key: CacheKey) -> None:
        """
        Move the entry under ``cache_key`` to the end of the collection.

        Raises:
            ValueError: If the policy is not attached to a valid cache, or
                the collection holds no entry for ``cache_key``.
        """
        self._assert_cache()

        if self._cache.collection.get(cache_key) is None:
            raise ValueError("Given entry does not belong to the cache")

        self._cache.collection.move_to_end(cache_key)

    def _assert_cache(self):
        """Raise ``ValueError`` unless a ``CacheInterface`` is attached."""
        if not (self.cache is not None and isinstance(self.cache, CacheInterface)):
            raise ValueError("Eviction policy should be associated with valid cache.")
class EvictionPolicy(Enum):
    """Available eviction policies; each member's value is the policy class."""

    # Callers obtain a policy instance by calling the member's ``value``.
    LRU = LRUPolicy
class CacheConfig(CacheConfigurationInterface):
    """
    Concrete cache configuration: cache class, maximum size, eviction policy
    and the list of commands whose replies may be cached.
    """

    # Defaults used when the corresponding constructor argument is omitted.
    DEFAULT_CACHE_CLASS = DefaultCache
    DEFAULT_EVICTION_POLICY = EvictionPolicy.LRU
    DEFAULT_MAX_SIZE = 10000

    # Commands accepted by ``is_allowed_to_cache`` (exact, case-sensitive match).
    DEFAULT_ALLOW_LIST = [
        "BITCOUNT",
        "BITFIELD_RO",
        "BITPOS",
        "EXISTS",
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEORADIUSBYMEMBER_RO",
        "GEORADIUS_RO",
        "GEOSEARCH",
        "GET",
        "GETBIT",
        "GETRANGE",
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "JSON.ARRINDEX",
        "JSON.ARRLEN",
        "JSON.GET",
        "JSON.MGET",
        "JSON.OBJKEYS",
        "JSON.OBJLEN",
        "JSON.RESP",
        "JSON.STRLEN",
        "JSON.TYPE",
        "LCS",
        "LINDEX",
        "LLEN",
        "LPOS",
        "LRANGE",
        "MGET",
        "SCARD",
        "SDIFF",
        "SINTER",
        "SINTERCARD",
        "SISMEMBER",
        "SMEMBERS",
        "SMISMEMBER",
        "SORT_RO",
        "STRLEN",
        "SUBSTR",
        "SUNION",
        "TS.GET",
        "TS.INFO",
        "TS.RANGE",
        "TS.REVRANGE",
        "TYPE",
        "XLEN",
        "XPENDING",
        "XRANGE",
        "XREAD",
        "XREVRANGE",
        "ZCARD",
        "ZCOUNT",
        "ZDIFF",
        "ZINTER",
        "ZINTERCARD",
        "ZLEXCOUNT",
        "ZMSCORE",
        "ZRANGE",
        "ZRANGEBYLEX",
        "ZRANGEBYSCORE",
        "ZRANK",
        "ZREVRANGE",
        "ZREVRANGEBYLEX",
        "ZREVRANGEBYSCORE",
        "ZREVRANK",
        "ZSCORE",
        "ZUNION",
    ]

    def __init__(
        self,
        max_size: int = DEFAULT_MAX_SIZE,
        cache_class: Any = DEFAULT_CACHE_CLASS,
        eviction_policy: EvictionPolicy = DEFAULT_EVICTION_POLICY,
    ):
        """
        Args:
            max_size: Size threshold used by ``is_exceeds_max_size``.
            cache_class: Class returned by ``get_cache_class``.
            eviction_policy: ``EvictionPolicy`` member returned by
                ``get_eviction_policy``.
        """
        self._max_size = max_size
        self._eviction_policy = eviction_policy
        self._cache_class = cache_class

    def get_cache_class(self):
        """Return the configured cache class."""
        return self._cache_class

    def get_max_size(self) -> int:
        """Return the configured maximum size."""
        return self._max_size

    def get_eviction_policy(self) -> EvictionPolicy:
        """Return the configured ``EvictionPolicy`` member."""
        return self._eviction_policy

    def is_exceeds_max_size(self, count: int) -> bool:
        """Return ``True`` when ``count`` is strictly greater than the maximum size."""
        return count > self._max_size

    def is_allowed_to_cache(self, command: str) -> bool:
        """Return ``True`` when ``command`` appears in ``DEFAULT_ALLOW_LIST``."""
        return command in self.DEFAULT_ALLOW_LIST
class CacheFactoryInterface(ABC):
    """Abstract contract for objects that produce cache instances."""

    @abstractmethod
    def get_cache(self) -> CacheInterface:
        """Return a cache instance."""
class CacheFactory(CacheFactoryInterface):
    """Factory that builds a cache from a configuration and wraps it in ``CacheProxy``."""

    def __init__(self, cache_config: Optional[CacheConfig] = None):
        """
        Args:
            cache_config: Configuration to build caches from; a default
                ``CacheConfig()`` is created when ``None`` is given.
        """
        self._config = cache_config if cache_config is not None else CacheConfig()

    def get_cache(self) -> CacheInterface:
        """
        Instantiate the configured cache class with this factory's
        configuration and return it wrapped in a ``CacheProxy``.
        """
        concrete_cache = self._config.get_cache_class()(cache_config=self._config)
        return CacheProxy(concrete_cache)