Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/lru_cache.py: 30%

203 statements  

# lru_cache.py -- Simple LRU cache for dulwich
# Copyright (C) 2006, 2008 Canonical Ltd
# Copyright (C) 2022 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

23"""A simple least-recently-used (LRU) cache.""" 

24 

25from collections.abc import Iterable, Iterator 

26from typing import Callable, Generic, Optional, TypeVar, Union, cast 

27 

28_null_key = object() 

29 

30 

31K = TypeVar("K") 

32V = TypeVar("V") 

33 

34 

class _LRUNode(Generic[K, V]):
    """This maintains the linked-list which is the lru internals."""

    __slots__ = ("cleanup", "key", "next_key", "prev", "size", "value")

    prev: Optional["_LRUNode[K, V]"]
    next_key: Union[K, object]
    size: Optional[int]

    def __init__(
        self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None
    ) -> None:
        self.prev = None
        self.next_key = _null_key
        self.key = key
        self.value = value
        self.cleanup = cleanup
        # TODO: We could compute this 'on-the-fly' like we used to, and remove
        #       one pointer from this object, we just need to decide if it
        #       actually costs us much of anything in normal usage
        self.size = None

    def __repr__(self) -> str:
        if self.prev is None:
            prev_key = None
        else:
            prev_key = self.prev.key
        return f"{self.__class__.__name__}({self.key!r} n:{self.next_key!r} p:{prev_key!r})"

    def run_cleanup(self) -> None:
        if self.cleanup is not None:
            self.cleanup(self.key, self.value)
        self.cleanup = None
        # Just make sure to break any refcycles, etc
        del self.value
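

# The sketch below is illustrative only and is not part of dulwich's API: it
# shows how the cache threads _LRUNode objects into a doubly-linked list,
# where `prev` holds a direct node reference while `next_key` holds the *key*
# of the next node (or the `_null_key` sentinel at the tail), so the next
# node is found through the cache dict. The helper name `_demo_node_chain`
# is made up for illustration.
def _demo_node_chain() -> None:
    # Build a two-node chain by hand: "a" is most recently used, "b" is least.
    node_a = _LRUNode("a", 1)
    node_b = _LRUNode("b", 2)
    nodes = {"a": node_a, "b": node_b}
    node_a.next_key = "b"  # head records the tail's key, not the tail itself
    node_b.prev = node_a   # tail points back at the head node directly
    # Walking forward goes through the dict, exactly as LRUCache does.
    assert nodes[cast(str, node_a.next_key)] is node_b
    # The tail keeps the _null_key sentinel, marking the end of the list.
    assert node_b.next_key is _null_key
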

class LRUCache(Generic[K, V]):
    """A class which manages a cache of entries, removing unused ones."""

    _least_recently_used: Optional[_LRUNode[K, V]]
    _most_recently_used: Optional[_LRUNode[K, V]]

    def __init__(
        self, max_cache: int = 100, after_cleanup_count: Optional[int] = None
    ) -> None:
        self._cache: dict[K, _LRUNode[K, V]] = {}
        # The "HEAD" of the lru linked list
        self._most_recently_used = None
        # The "TAIL" of the lru linked list
        self._least_recently_used = None
        self._update_max_cache(max_cache, after_cleanup_count)

    def __contains__(self, key: K) -> bool:
        return key in self._cache

    def __getitem__(self, key: K) -> V:
        cache = self._cache
        node = cache[key]
        # Inlined from _record_access to decrease the overhead of __getitem__
        # We also have more knowledge about structure if __getitem__ is
        # succeeding, then we know that self._most_recently_used must not be
        # None, etc.
        mru = self._most_recently_used
        if node is mru:
            # Nothing to do, this node is already at the head of the queue
            return node.value
        # Remove this node from the old location
        node_prev = node.prev
        next_key = node.next_key
        # benchmarking shows that the lookup of _null_key in globals is faster
        # than the attribute lookup for (node is self._least_recently_used)
        if next_key is _null_key:
            # 'node' is the _least_recently_used, because it doesn't have a
            # 'next' item. So move the current lru to the previous node.
            self._least_recently_used = node_prev
        else:
            node_next = cache[cast(K, next_key)]
            node_next.prev = node_prev
        assert node_prev
        assert mru
        node_prev.next_key = next_key
        # Insert this node at the front of the list
        node.next_key = mru.key
        mru.prev = node
        self._most_recently_used = node
        node.prev = None
        return node.value

    def __len__(self) -> int:
        return len(self._cache)

    def _walk_lru(self) -> Iterator[_LRUNode[K, V]]:
        """Walk the LRU list, only meant to be used in tests."""
        node = self._most_recently_used
        if node is not None:
            if node.prev is not None:
                raise AssertionError(
                    "the _most_recently_used entry is not"
                    " supposed to have a previous entry"
                    f" {node}"
                )
        while node is not None:
            if node.next_key is _null_key:
                if node is not self._least_recently_used:
                    raise AssertionError(
                        f"only the last node should have no next value: {node}"
                    )
                node_next = None
            else:
                node_next = self._cache[cast(K, node.next_key)]
                if node_next.prev is not node:
                    raise AssertionError(
                        f"inconsistency found, node.next.prev != node: {node}"
                    )
            if node.prev is None:
                if node is not self._most_recently_used:
                    raise AssertionError(
                        "only the _most_recently_used should"
                        f" not have a previous node: {node}"
                    )
            else:
                if node.prev.next_key != node.key:
                    raise AssertionError(
                        f"inconsistency found, node.prev.next != node: {node}"
                    )
            yield node
            node = node_next

    def add(
        self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        if key in self._cache:
            node = self._cache[key]
            node.run_cleanup()
            node.value = value
            node.cleanup = cleanup
        else:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        self._record_access(node)

        if len(self._cache) > self._max_cache:
            # Trigger the cleanup
            self.cleanup()

    def cache_size(self) -> int:
        """Get the number of entries we will cache."""
        return self._max_cache

    def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
        node = self._cache.get(key, None)
        if node is None:
            return default
        self._record_access(node)
        return node.value

    def keys(self) -> Iterable[K]:
        """Get the list of keys currently cached.

        Note that values returned here may not be available by the time you
        request them later. This is simply meant as a peek into the current
        state.

        Returns: An unordered list of keys that are currently cached.
        """
        return self._cache.keys()

    def items(self) -> dict[K, V]:
        """Get the key:value pairs as a dict."""
        return {k: n.value for k, n in self._cache.items()}

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_count.
        """
        # Make sure the cache is shrunk to the correct size
        while len(self._cache) > self._after_cleanup_count:
            self._remove_lru()

    def __setitem__(self, key: K, value: V) -> None:
        """Add a value to the cache, there will be no cleanup function."""
        self.add(key, value, cleanup=None)

    def _record_access(self, node: _LRUNode[K, V]) -> None:
        """Record that key was accessed."""
        # Move 'node' to the front of the queue
        if self._most_recently_used is None:
            self._most_recently_used = node
            self._least_recently_used = node
            return
        elif node is self._most_recently_used:
            # Nothing to do, this node is already at the head of the queue
            return
        # We've taken care of the tail pointer, remove the node, and insert it
        # at the front
        # REMOVE
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # INSERT
        node.next_key = self._most_recently_used.key
        self._most_recently_used.prev = node
        self._most_recently_used = node
        node.prev = None

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        self._cache.pop(node.key)
        # If we have removed all entries, remove the head pointer as well
        if self._least_recently_used is None:
            self._most_recently_used = None
        node.run_cleanup()
        # Now remove this node from the linked list
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # And remove this node's pointers
        node.prev = None
        node.next_key = _null_key

    def _remove_lru(self) -> None:
        """Remove one entry from the lru, and handle consequences.

        If there are no more references to the lru, then this entry should be
        removed from the cache.
        """
        assert self._least_recently_used
        self._remove_node(self._least_recently_used)

    def clear(self) -> None:
        """Clear out all of the cache."""
        # Clean up in LRU order
        while self._cache:
            self._remove_lru()

    def resize(self, max_cache: int, after_cleanup_count: Optional[int] = None) -> None:
        """Change the number of entries that will be cached."""
        self._update_max_cache(max_cache, after_cleanup_count=after_cleanup_count)

    def _update_max_cache(
        self, max_cache: int, after_cleanup_count: Optional[int] = None
    ) -> None:
        self._max_cache = max_cache
        if after_cleanup_count is None:
            self._after_cleanup_count = self._max_cache * 8 / 10
        else:
            self._after_cleanup_count = min(after_cleanup_count, self._max_cache)
        self.cleanup()
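

# The following usage sketch is illustrative only and not part of dulwich's
# module: it exercises the public LRUCache API defined above (add, get,
# __setitem__, __getitem__, __contains__) and the eviction behaviour driven
# by max_cache and after_cleanup_count. The function name
# _demo_lru_cache_usage is made up for illustration.
def _demo_lru_cache_usage() -> None:
    evicted: list[str] = []
    # Keep at most 3 entries; after an overflow, shrink back down to 2.
    cache: LRUCache[str, int] = LRUCache(max_cache=3, after_cleanup_count=2)
    cache.add("a", 1)
    cache.add("b", 2, cleanup=lambda k, v: evicted.append(k))
    cache["c"] = 3          # __setitem__ stores without a cleanup callback
    _ = cache["a"]          # looking "a" up marks it most recently used
    cache.add("d", 4)       # overflow past max_cache triggers cleanup()
    assert len(cache) == 2  # shrunk back to after_cleanup_count
    assert "a" in cache and "d" in cache  # the most recently used survive
    assert evicted == ["b"]               # "b" was evicted; its callback ran
    assert cache.get("c") is None         # "c" was evicted too (no callback)
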

class LRUSizeCache(LRUCache[K, V]):
    """An LRUCache that removes things based on the size of the values.

    This differs in that it doesn't care how many actual items there are,
    it just restricts the cache to be cleaned up after so much data is stored.

    The size of items added will be computed using compute_size(value), which
    defaults to len() if not supplied.
    """

    _compute_size: Callable[[V], int]

    def __init__(
        self,
        max_size: int = 1024 * 1024,
        after_cleanup_size: Optional[int] = None,
        compute_size: Optional[Callable[[V], int]] = None,
    ) -> None:
        """Create a new LRUSizeCache.

        Args:
          max_size: The max number of bytes to store before we start
            clearing out entries.
          after_cleanup_size: After cleaning up, shrink everything to this
            size.
          compute_size: A function to compute the size of the values. We
            use a function here, so that you can pass 'len' if you are just
            using simple strings, or a more complex function if you are using
            something like a list of strings, or even a custom object.
            The function should take the form "compute_size(value) => integer".
            If not supplied, it defaults to 'len()'
        """
        self._value_size = 0
        if compute_size is None:
            self._compute_size = cast(Callable[[V], int], len)
        else:
            self._compute_size = compute_size
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        LRUCache.__init__(self, max_cache=max(int(max_size / 512), 1))

    def add(
        self, key: K, value: V, cleanup: Optional[Callable[[K, V], None]] = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        node = self._cache.get(key, None)
        value_len = self._compute_size(value)
        if value_len >= self._after_cleanup_size:
            # The new value is 'too big to fit', as it would fill up/overflow
            # the cache all by itself
            if node is not None:
                # We won't be replacing the old node, so just remove it
                self._remove_node(node)
            if cleanup is not None:
                cleanup(key, value)
            return
        if node is None:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        else:
            assert node.size is not None
            self._value_size -= node.size
        node.size = value_len
        self._value_size += value_len
        self._record_access(node)

        if self._value_size > self._max_size:
            # Time to cleanup
            self.cleanup()

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_size.
        """
        # Make sure the cache is shrunk to the correct size
        while self._value_size > self._after_cleanup_size:
            self._remove_lru()

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        assert node.size is not None
        self._value_size -= node.size
        LRUCache._remove_node(self, node)

    def resize(self, max_size: int, after_cleanup_size: Optional[int] = None) -> None:
        """Change the number of bytes that will be cached."""
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        max_cache = max(int(max_size / 512), 1)
        self._update_max_cache(max_cache)

    def _update_max_size(
        self, max_size: int, after_cleanup_size: Optional[int] = None
    ) -> None:
        self._max_size = max_size
        if after_cleanup_size is None:
            self._after_cleanup_size = self._max_size * 8 // 10
        else:
            self._after_cleanup_size = min(after_cleanup_size, self._max_size)
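

# The following usage sketch is illustrative only and not part of dulwich's
# module: it shows LRUSizeCache evicting by total value size rather than by
# entry count, using the default compute_size (len). The function name
# _demo_lru_size_cache_usage is made up for illustration.
def _demo_lru_size_cache_usage() -> None:
    # Keep at most ~100 bytes of values; shrink to 60 bytes after a cleanup.
    cache: LRUSizeCache[str, bytes] = LRUSizeCache(
        max_size=100, after_cleanup_size=60
    )
    cache.add("blob1", b"x" * 40)
    cache.add("blob2", b"y" * 40)
    cache.add("blob3", b"z" * 40)  # total 120 > max_size, so cleanup() runs
    # The least recently used entries were evicted to get back under 60 bytes.
    assert "blob1" not in cache and "blob2" not in cache
    assert "blob3" in cache
    # Values at least as large as after_cleanup_size are never stored at all.
    cache.add("huge", b"!" * 200)
    assert "huge" not in cache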