Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/cache/backends/filebased.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1"File-based cache backend" 

2import glob 

3import os 

4import pickle 

5import random 

6import tempfile 

7import time 

8import zlib 

9from hashlib import md5 

10 

11from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache 

12from django.core.files import locks 

13from django.core.files.move import file_move_safe 

14 

15 

class FileBasedCache(BaseCache):
    """
    Cache backend that stores each entry as one file in a directory.

    Every cache file consists of a pickled expiry value followed by the
    zlib-compressed pickle of the cached value. File names are the MD5 hex
    digest of the validated key plus ``cache_suffix``.

    NOTE(security): entries are read back with ``pickle``; the cache
    directory must therefore only be writable by trusted users.
    """

    # Suffix that identifies files owned by this backend inside the cache dir.
    cache_suffix = ".djcache"
    # Pickle protocol used for both the expiry header and the cached value.
    pickle_protocol = pickle.HIGHEST_PROTOCOL

    def __init__(self, dir, params):
        """Store the absolute cache directory path and make sure it exists."""
        super().__init__(params)
        self._dir = os.path.abspath(dir)
        self._createdir()

    def add(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Store `value` only if `key` is not already present and unexpired.

        Return True if the value was stored, False otherwise. The existence
        check and the write are two separate steps.
        """
        if self.has_key(key, version):
            return False
        self.set(key, value, timeout, version)
        return True

    def get(self, key, default=None, version=None):
        """
        Return the cached value for `key`, or `default` if the cache file is
        missing or expired.
        """
        fname = self._key_to_file(key, version)
        try:
            with open(fname, "rb") as f:
                # _is_expired() consumes the expiry header, so the remainder
                # of the file is the compressed payload.
                if not self._is_expired(f):
                    return pickle.loads(zlib.decompress(f.read()))
        except FileNotFoundError:
            pass
        return default

    def _write_content(self, file, timeout, value):
        """
        Write the pickled expiry followed by the compressed, pickled `value`
        to the open binary file object `file`.
        """
        expiry = self.get_backend_timeout(timeout)
        file.write(pickle.dumps(expiry, self.pickle_protocol))
        file.write(zlib.compress(pickle.dumps(value, self.pickle_protocol)))

    def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Store `value` under `key`.

        The content is written to a temporary file in the cache directory and
        then moved over the final path, so a partially written entry is never
        visible under the final name.
        """
        self._createdir()  # Cache dir can be deleted at any time.
        fname = self._key_to_file(key, version)
        self._cull()  # make some room if necessary
        fd, tmp_path = tempfile.mkstemp(dir=self._dir)
        renamed = False
        try:
            with open(fd, "wb") as f:
                self._write_content(f, timeout, value)
            file_move_safe(tmp_path, fname, allow_overwrite=True)
            renamed = True
        finally:
            # Don't leave the temporary file behind if writing or moving failed.
            if not renamed:
                os.remove(tmp_path)

    def touch(self, key, timeout=DEFAULT_TIMEOUT, version=None):
        """
        Rewrite the entry for `key` with a new expiry derived from `timeout`.

        Return True if the entry existed and was updated, False if the cache
        file is missing or already expired.
        """
        try:
            with open(self._key_to_file(key, version), "r+b") as f:
                try:
                    locks.lock(f, locks.LOCK_EX)
                    if self._is_expired(f):
                        return False
                    previous_value = pickle.loads(zlib.decompress(f.read()))
                    f.seek(0)
                    self._write_content(f, timeout, previous_value)
                    return True
                finally:
                    # _is_expired() closes the file before deleting an expired
                    # entry. Unlocking needs the file's descriptor, which is
                    # gone once the file is closed, so skip it in that case
                    # instead of raising from the cleanup path.
                    if not f.closed:
                        locks.unlock(f)
        except FileNotFoundError:
            return False

    def delete(self, key, version=None):
        """Delete the entry for `key`. Return True if a file was removed."""
        return self._delete(self._key_to_file(key, version))

    def _delete(self, fname):
        """
        Remove the file `fname` if its path starts with the cache directory.

        Return True if the file was removed, False if the path is outside the
        cache directory or the file does not exist.
        """
        if not fname.startswith(self._dir) or not os.path.exists(fname):
            return False
        try:
            os.remove(fname)
        except FileNotFoundError:
            # The file may have been removed by another process.
            return False
        return True

    def has_key(self, key, version=None):
        """Return True if an unexpired cache file exists for `key`."""
        fname = self._key_to_file(key, version)
        try:
            with open(fname, "rb") as f:
                return not self._is_expired(f)
        except FileNotFoundError:
            return False

    def _cull(self):
        """
        Remove random cache entries if max_entries is reached at a ratio
        of num_entries / cull_frequency. A value of 0 for CULL_FREQUENCY means
        that the entire cache will be purged.
        """
        filelist = self._list_cache_files()
        num_entries = len(filelist)
        if num_entries < self._max_entries:
            return  # return early if no culling is required
        if self._cull_frequency == 0:
            return self.clear()  # Clear the cache when CULL_FREQUENCY = 0
        # Delete a random selection of entries
        filelist = random.sample(filelist, int(num_entries / self._cull_frequency))
        for fname in filelist:
            self._delete(fname)

    def _createdir(self):
        """Create the cache directory (mode 0o700) if it doesn't exist."""
        # Set the umask because os.makedirs() doesn't apply the "mode" argument
        # to intermediate-level directories.
        old_umask = os.umask(0o077)
        try:
            os.makedirs(self._dir, 0o700, exist_ok=True)
        finally:
            os.umask(old_umask)

    def _key_to_file(self, key, version=None):
        """
        Convert a key into a cache file path. Basically this is the
        root cache path joined with the md5sum of the key and a suffix.
        """
        key = self.make_and_validate_key(key, version=version)
        return os.path.join(
            self._dir,
            "".join(
                [
                    md5(key.encode(), usedforsecurity=False).hexdigest(),
                    self.cache_suffix,
                ]
            ),
        )

    def clear(self):
        """
        Remove all the cache files.
        """
        for fname in self._list_cache_files():
            self._delete(fname)

    def _is_expired(self, f):
        """
        Take an open cache file `f` and delete it if it's expired.

        Return True if the entry was expired (in which case `f` has been
        closed and its file deleted), False otherwise. On a False return the
        expiry header has been consumed and `f` is positioned at the payload.
        An expiry of None never expires.
        """
        try:
            exp = pickle.load(f)
        except EOFError:
            exp = 0  # An empty file is considered expired.
        if exp is not None and exp < time.time():
            f.close()  # On Windows a file has to be closed before deleting
            self._delete(f.name)
            return True
        return False

    def _list_cache_files(self):
        """
        Get a list of paths to all the cache files. These are all the files
        in the root cache dir that end on the cache_suffix.
        """
        return [
            os.path.join(self._dir, fname)
            for fname in glob.glob(f"*{self.cache_suffix}", root_dir=self._dir)
        ]
79 return self._delete(self._key_to_file(key, version)) 

80 

81 def _delete(self, fname): 

82 if not fname.startswith(self._dir) or not os.path.exists(fname): 

83 return False 

84 try: 

85 os.remove(fname) 

86 except FileNotFoundError: 

87 # The file may have been removed by another process. 

88 return False 

89 return True 

90 

91 def has_key(self, key, version=None): 

92 fname = self._key_to_file(key, version) 

93 try: 

94 with open(fname, "rb") as f: 

95 return not self._is_expired(f) 

96 except FileNotFoundError: 

97 return False 

98 

99 def _cull(self): 

100 """ 

101 Remove random cache entries if max_entries is reached at a ratio 

102 of num_entries / cull_frequency. A value of 0 for CULL_FREQUENCY means 

103 that the entire cache will be purged. 

104 """ 

105 filelist = self._list_cache_files() 

106 num_entries = len(filelist) 

107 if num_entries < self._max_entries: 

108 return # return early if no culling is required 

109 if self._cull_frequency == 0: 

110 return self.clear() # Clear the cache when CULL_FREQUENCY = 0 

111 # Delete a random selection of entries 

112 filelist = random.sample(filelist, int(num_entries / self._cull_frequency)) 

113 for fname in filelist: 

114 self._delete(fname) 

115 

116 def _createdir(self): 

117 # Set the umask because os.makedirs() doesn't apply the "mode" argument 

118 # to intermediate-level directories. 

119 old_umask = os.umask(0o077) 

120 try: 

121 os.makedirs(self._dir, 0o700, exist_ok=True) 

122 finally: 

123 os.umask(old_umask) 

124 

125 def _key_to_file(self, key, version=None): 

126 """ 

127 Convert a key into a cache file path. Basically this is the 

128 root cache path joined with the md5sum of the key and a suffix. 

129 """ 

130 key = self.make_and_validate_key(key, version=version) 

131 return os.path.join( 

132 self._dir, 

133 "".join( 

134 [ 

135 md5(key.encode(), usedforsecurity=False).hexdigest(), 

136 self.cache_suffix, 

137 ] 

138 ), 

139 ) 

140 

141 def clear(self): 

142 """ 

143 Remove all the cache files. 

144 """ 

145 for fname in self._list_cache_files(): 

146 self._delete(fname) 

147 

148 def _is_expired(self, f): 

149 """ 

150 Take an open cache file `f` and delete it if it's expired. 

151 """ 

152 try: 

153 exp = pickle.load(f) 

154 except EOFError: 

155 exp = 0 # An empty file is considered expired. 

156 if exp is not None and exp < time.time(): 

157 f.close() # On Windows a file has to be closed before deleting 

158 self._delete(f.name) 

159 return True 

160 return False 

161 

162 def _list_cache_files(self): 

163 """ 

164 Get a list of paths to all the cache files. These are all the files 

165 in the root cache dir that end on the cache_suffix. 

166 """ 

167 return [ 

168 os.path.join(self._dir, fname) 

169 for fname in glob.glob(f"*{self.cache_suffix}", root_dir=self._dir) 

170 ]