Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/lru_cache.py: 22%

204 statements  

# lru_cache.py -- Simple LRU cache for dulwich
# Copyright (C) 2006, 2008 Canonical Ltd
# Copyright (C) 2022 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""A simple least-recently-used (LRU) cache."""

__all__ = [
    "LRUCache",
    "LRUSizeCache",
]

from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar, cast

_null_key = object()


K = TypeVar("K")
V = TypeVar("V")


class _LRUNode(Generic[K, V]):
    """This maintains the linked-list which is the lru internals."""

    __slots__ = ("cleanup", "key", "next_key", "prev", "size", "value")

    prev: "_LRUNode[K, V] | None"
    next_key: K | object
    size: int | None

    def __init__(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        self.prev = None
        self.next_key = _null_key
        self.key = key
        self.value = value
        self.cleanup = cleanup
        # TODO: We could compute this 'on-the-fly' like we used to, and remove
        #       one pointer from this object, we just need to decide if it
        #       actually costs us much of anything in normal usage
        self.size = None

    def __repr__(self) -> str:
        if self.prev is None:
            prev_key = None
        else:
            prev_key = self.prev.key
        return f"{self.__class__.__name__}({self.key!r} n:{self.next_key!r} p:{prev_key!r})"

    def run_cleanup(self) -> None:
        if self.cleanup is not None:
            self.cleanup(self.key, self.value)
        self.cleanup = None
        # Just make sure to break any refcycles, etc
        del self.value
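
# Design note on _LRUNode: each node keeps a direct reference to its previous
# node ("prev") but records its successor only by key ("next_key"), which is
# resolved through the owning cache's dict when the list is walked. The
# module-level sentinel _null_key marks a node with no successor, i.e. the
# tail of the list (the least recently used entry).
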

class LRUCache(Generic[K, V]):
    """A class which manages a cache of entries, removing unused ones."""

    _least_recently_used: _LRUNode[K, V] | None
    _most_recently_used: _LRUNode[K, V] | None

    def __init__(
        self, max_cache: int = 100, after_cleanup_count: int | None = None
    ) -> None:
        """Initialize LRUCache.

        Args:
          max_cache: Maximum number of entries to cache
          after_cleanup_count: Number of entries to keep after cleanup
        """
        self._cache: dict[K, _LRUNode[K, V]] = {}
        # The "HEAD" of the lru linked list
        self._most_recently_used = None
        # The "TAIL" of the lru linked list
        self._least_recently_used = None
        self._update_max_cache(max_cache, after_cleanup_count)

    def __contains__(self, key: K) -> bool:
        """Check if key is in cache."""
        return key in self._cache

    def __getitem__(self, key: K) -> V:
        """Get item from cache and mark as recently used."""
        cache = self._cache
        node = cache[key]
        # Inlined from _record_access to decrease the overhead of __getitem__
        # We also have more knowledge about structure if __getitem__ is
        # succeeding, then we know that self._most_recently_used must not be
        # None, etc.
        mru = self._most_recently_used
        if node is mru:
            # Nothing to do, this node is already at the head of the queue
            return node.value
        # Remove this node from the old location
        node_prev = node.prev
        next_key = node.next_key
        # benchmarking shows that the lookup of _null_key in globals is faster
        # than the attribute lookup for (node is self._least_recently_used)
        if next_key is _null_key:
            # 'node' is the _least_recently_used, because it doesn't have a
            # 'next' item. So move the current lru to the previous node.
            self._least_recently_used = node_prev
        else:
            node_next = cache[cast(K, next_key)]
            node_next.prev = node_prev
        assert node_prev
        assert mru
        node_prev.next_key = next_key
        # Insert this node at the front of the list
        node.next_key = mru.key
        mru.prev = node
        self._most_recently_used = node
        node.prev = None
        return node.value
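
    # Note: __getitem__ above (and get() further down) move the accessed entry
    # to the front of the LRU order; __contains__ is a plain membership test
    # and does not count as a use of the entry.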

    def __len__(self) -> int:
        """Return number of items in cache."""
        return len(self._cache)

    def _walk_lru(self) -> Iterator[_LRUNode[K, V]]:
        """Walk the LRU list, only meant to be used in tests."""
        node = self._most_recently_used
        if node is not None:
            if node.prev is not None:
                raise AssertionError(
                    "the _most_recently_used entry is not"
                    " supposed to have a previous entry"
                    f" {node}"
                )
        while node is not None:
            if node.next_key is _null_key:
                if node is not self._least_recently_used:
                    raise AssertionError(
                        f"only the last node should have no next value: {node}"
                    )
                node_next = None
            else:
                node_next = self._cache[cast(K, node.next_key)]
                if node_next.prev is not node:
                    raise AssertionError(
                        f"inconsistency found, node.next.prev != node: {node}"
                    )
            if node.prev is None:
                if node is not self._most_recently_used:
                    raise AssertionError(
                        "only the _most_recently_used should"
                        f" not have a previous node: {node}"
                    )
            else:
                if node.prev.next_key != node.key:
                    raise AssertionError(
                        f"inconsistency found, node.prev.next != node: {node}"
                    )
            yield node
            node = node_next

    def add(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        if key in self._cache:
            node = self._cache[key]
            node.run_cleanup()
            node.value = value
            node.cleanup = cleanup
        else:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        self._record_access(node)

        if len(self._cache) > self._max_cache:
            # Trigger the cleanup
            self.cleanup()
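
    # A sketch of how add() and the cleanup callback interact, assuming a tiny
    # cache so eviction is easy to trigger (evicted entries added without a
    # callback are dropped silently):
    #
    #   >>> evicted = []
    #   >>> c = LRUCache(max_cache=2, after_cleanup_count=1)
    #   >>> c.add("a", 1, cleanup=lambda k, v: evicted.append((k, v)))
    #   >>> c.add("b", 2)
    #   >>> c.add("c", 3)   # third entry exceeds max_cache, triggers cleanup()
    #   >>> evicted         # "a" and "b" were evicted; only "a" had a callback
    #   [('a', 1)]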

    def cache_size(self) -> int:
        """Get the number of entries we will cache."""
        return self._max_cache

    def get(self, key: K, default: V | None = None) -> V | None:
        """Get value from cache with default if not found.

        Args:
          key: Key to look up
          default: Default value if key not found

        Returns:
          Value from cache or default
        """
        node = self._cache.get(key, None)
        if node is None:
            return default
        self._record_access(node)
        return node.value

    def keys(self) -> Iterable[K]:
        """Get the list of keys currently cached.

        Note that values returned here may not be available by the time you
        request them later. This is simply meant as a peek into the current
        state.

        Returns: An unordered list of keys that are currently cached.
        """
        return self._cache.keys()

    def items(self) -> dict[K, V]:
        """Get the key:value pairs as a dict."""
        return {k: n.value for k, n in self._cache.items()}

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_count.
        """
        # Make sure the cache is shrunk to the correct size
        while len(self._cache) > self._after_cleanup_count:
            self._remove_lru()

    def __setitem__(self, key: K, value: V) -> None:
        """Add a value to the cache; no cleanup function will be attached."""
        self.add(key, value, cleanup=None)

    def _record_access(self, node: _LRUNode[K, V]) -> None:
        """Record that key was accessed."""
        # Move 'node' to the front of the queue
        if self._most_recently_used is None:
            self._most_recently_used = node
            self._least_recently_used = node
            return
        elif node is self._most_recently_used:
            # Nothing to do, this node is already at the head of the queue
            return
        # We've taken care of the tail pointer, remove the node, and insert it
        # at the front
        # REMOVE
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # INSERT
        node.next_key = self._most_recently_used.key
        self._most_recently_used.prev = node
        self._most_recently_used = node
        node.prev = None

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        self._cache.pop(node.key)
        # If we have removed all entries, remove the head pointer as well
        if self._least_recently_used is None:
            self._most_recently_used = None
        node.run_cleanup()
        # Now remove this node from the linked list
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # And remove this node's pointers
        node.prev = None
        node.next_key = _null_key

    def _remove_lru(self) -> None:
        """Remove one entry from the lru, and handle consequences.

        If there are no more references to the lru, then this entry should be
        removed from the cache.
        """
        assert self._least_recently_used
        self._remove_node(self._least_recently_used)

    def clear(self) -> None:
        """Clear out all of the cache."""
        # Clean up in LRU order
        while self._cache:
            self._remove_lru()

    def resize(self, max_cache: int, after_cleanup_count: int | None = None) -> None:
        """Change the number of entries that will be cached."""
        self._update_max_cache(max_cache, after_cleanup_count=after_cleanup_count)

    def _update_max_cache(
        self, max_cache: int, after_cleanup_count: int | None = None
    ) -> None:
        self._max_cache = max_cache
        if after_cleanup_count is None:
            self._after_cleanup_count = self._max_cache * 8 // 10
        else:
            self._after_cleanup_count = min(after_cleanup_count, self._max_cache)
        self.cleanup()
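
# A sketch of typical LRUCache usage, based on the behaviour implemented
# above: once more than max_cache entries are stored, cleanup() evicts
# least-recently-used entries until only after_cleanup_count remain
# (by default max_cache * 8 // 10).
#
#   >>> cache = LRUCache(max_cache=5)   # after_cleanup_count defaults to 4
#   >>> for i in range(5):
#   ...     cache[i] = str(i)
#   >>> _ = cache[0]                    # touching 0 makes it most recently used
#   >>> cache[5] = "5"                  # sixth entry triggers cleanup()
#   >>> sorted(cache.keys())            # 1 and 2 were least recently used
#   [0, 3, 4, 5]
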

class LRUSizeCache(LRUCache[K, V]):
    """An LRUCache that removes things based on the size of the values.

    This differs in that it doesn't care how many actual items there are;
    it just restricts the cache to be cleaned up after so much data is stored.

    The size of items added will be computed using compute_size(value), which
    defaults to len() if not supplied.
    """

    _compute_size: Callable[[V], int]

    def __init__(
        self,
        max_size: int = 1024 * 1024,
        after_cleanup_size: int | None = None,
        compute_size: Callable[[V], int] | None = None,
    ) -> None:
        """Create a new LRUSizeCache.

        Args:
          max_size: The max number of bytes to store before we start
            clearing out entries.
          after_cleanup_size: After cleaning up, shrink everything to this
            size.
          compute_size: A function to compute the size of the values. We
            use a function here, so that you can pass 'len' if you are just
            using simple strings, or a more complex function if you are using
            something like a list of strings, or even a custom object.
            The function should take the form "compute_size(value) => integer".
            If not supplied, it defaults to 'len()'
        """
        self._value_size = 0
        if compute_size is None:
            self._compute_size = cast(Callable[[V], int], len)
        else:
            self._compute_size = compute_size
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        LRUCache.__init__(self, max_cache=max(int(max_size / 512), 1))
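
    # The max_cache passed to LRUCache.__init__ above is only a derived bound
    # (max_size / 512, presumably assuming values of roughly 512 bytes on
    # average); eviction in this class is actually driven by the running total
    # in self._value_size, compared against max_size and after_cleanup_size.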

    def add(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        node = self._cache.get(key, None)
        value_len = self._compute_size(value)
        if value_len >= self._after_cleanup_size:
            # The new value is 'too big to fit', as it would fill up/overflow
            # the cache all by itself
            if node is not None:
                # We won't be replacing the old node, so just remove it
                self._remove_node(node)
            if cleanup is not None:
                cleanup(key, value)
            return
        if node is None:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        else:
            assert node.size is not None
            self._value_size -= node.size
        node.size = value_len
        self._value_size += value_len
        self._record_access(node)

        if self._value_size > self._max_size:
            # Time to cleanup
            self.cleanup()
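
    # A sketch of the 'too big to fit' path above: a value whose computed size
    # is >= after_cleanup_size is never stored; it is handed straight to the
    # cleanup callback (if any), and any existing entry under that key is
    # removed.
    #
    #   >>> c = LRUSizeCache(max_size=10, after_cleanup_size=8)
    #   >>> c.add("small", b"xxx")       # size 3, fits
    #   >>> c.add("huge", b"x" * 20)     # size 20 >= 8, rejected outright
    #   >>> "huge" in c
    #   False
    #   >>> "small" in c
    #   True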

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_size.
        """
        # Make sure the cache is shrunk to the correct size
        while self._value_size > self._after_cleanup_size:
            self._remove_lru()

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        assert node.size is not None
        self._value_size -= node.size
        LRUCache._remove_node(self, node)

    def resize(self, max_size: int, after_cleanup_size: int | None = None) -> None:
        """Change the number of bytes that will be cached."""
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        max_cache = max(int(max_size / 512), 1)
        self._update_max_cache(max_cache)

    def _update_max_size(
        self, max_size: int, after_cleanup_size: int | None = None
    ) -> None:
        self._max_size = max_size
        if after_cleanup_size is None:
            self._after_cleanup_size = self._max_size * 8 // 10
        else:
            self._after_cleanup_size = min(after_cleanup_size, self._max_size)
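

# A sketch of typical LRUSizeCache usage, based on the behaviour implemented
# above: entries are evicted by total value size rather than by entry count.
# Once the sum of compute_size(value) exceeds max_size, cleanup() evicts
# least-recently-used entries until the total is at or below
# after_cleanup_size (by default max_size * 8 // 10).
#
#   >>> c = LRUSizeCache(max_size=20)    # after_cleanup_size defaults to 16
#   >>> c.add("a", b"12345678")          # total size 8
#   >>> c.add("b", b"12345678")          # total size 16, still <= max_size
#   >>> c.add("c", b"12345678")          # total 24 > 20, cleanup() runs
#   >>> sorted(c.keys())                 # "a" was evicted to get back to <= 16
#   ['b', 'c']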