# lru_cache.py -- Simple LRU cache for dulwich
# Copyright (C) 2006, 2008 Canonical Ltd
# Copyright (C) 2022 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""A simple least-recently-used (LRU) cache."""

__all__ = [
    "LRUCache",
    "LRUSizeCache",
]

from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar, cast

_null_key = object()


K = TypeVar("K")
V = TypeVar("V")


class _LRUNode(Generic[K, V]):
    """This maintains the linked-list which is the lru internals."""

    __slots__ = ("cleanup", "key", "next_key", "prev", "size", "value")

    prev: "_LRUNode[K, V] | None"
    next_key: K | object
    size: int | None

    def __init__(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        self.prev = None
        self.next_key = _null_key
        self.key = key
        self.value = value
        self.cleanup = cleanup
        # TODO: We could compute this 'on-the-fly' like we used to, and remove
        #       one pointer from this object; we just need to decide if it
        #       actually costs us much of anything in normal usage.
        self.size = None

    def __repr__(self) -> str:
        if self.prev is None:
            prev_key = None
        else:
            prev_key = self.prev.key
        return f"{self.__class__.__name__}({self.key!r} n:{self.next_key!r} p:{prev_key!r})"

    def run_cleanup(self) -> None:
        if self.cleanup is not None:
            self.cleanup(self.key, self.value)
        self.cleanup = None
        # Just make sure to break any refcycles, etc
        del self.value
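

# The LRU order is maintained as a hybrid doubly-linked list: backward links
# ("prev") are direct node references, while forward links ("next_key") hold
# the *key* of the next node and are resolved through the cache dict. A
# next_key of _null_key marks the tail, i.e. the least recently used entry.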


class LRUCache(Generic[K, V]):
    """A class which manages a cache of entries, removing unused ones."""

    _least_recently_used: _LRUNode[K, V] | None
    _most_recently_used: _LRUNode[K, V] | None

    def __init__(
        self, max_cache: int = 100, after_cleanup_count: int | None = None
    ) -> None:
        """Initialize LRUCache.

        Args:
          max_cache: Maximum number of entries to cache
          after_cleanup_count: Number of entries to keep after cleanup
        """
        self._cache: dict[K, _LRUNode[K, V]] = {}
        # The "HEAD" of the lru linked list
        self._most_recently_used = None
        # The "TAIL" of the lru linked list
        self._least_recently_used = None
        self._update_max_cache(max_cache, after_cleanup_count)

    def __contains__(self, key: K) -> bool:
        """Check if key is in cache."""
        return key in self._cache

    def __getitem__(self, key: K) -> V:
        """Get item from cache and mark as recently used."""
        cache = self._cache
        node = cache[key]
        # Inlined from _record_access to decrease the overhead of __getitem__
        # We also know more about the structure when __getitem__ succeeds:
        # self._most_recently_used must not be None, etc.
        mru = self._most_recently_used
        if node is mru:
            # Nothing to do, this node is already at the head of the queue
            return node.value
        # Remove this node from the old location
        node_prev = node.prev
        next_key = node.next_key
        # benchmarking shows that the lookup of _null_key in globals is faster
        # than the attribute lookup for (node is self._least_recently_used)
        if next_key is _null_key:
            # 'node' is the _least_recently_used, because it doesn't have a
            # 'next' item. So move the current lru to the previous node.
            self._least_recently_used = node_prev
        else:
            node_next = cache[cast(K, next_key)]
            node_next.prev = node_prev
        assert node_prev
        assert mru
        node_prev.next_key = next_key
        # Insert this node at the front of the list
        node.next_key = mru.key
        mru.prev = node
        self._most_recently_used = node
        node.prev = None
        return node.value

    def __len__(self) -> int:
        """Return number of items in cache."""
        return len(self._cache)

    def _walk_lru(self) -> Iterator[_LRUNode[K, V]]:
        """Walk the LRU list, only meant to be used in tests."""
        node = self._most_recently_used
        if node is not None:
            if node.prev is not None:
                raise AssertionError(
                    "the _most_recently_used entry is not"
                    " supposed to have a previous entry"
                    f" {node}"
                )
        while node is not None:
            if node.next_key is _null_key:
                if node is not self._least_recently_used:
                    raise AssertionError(
                        f"only the last node should have no next value: {node}"
                    )
                node_next = None
            else:
                node_next = self._cache[cast(K, node.next_key)]
                if node_next.prev is not node:
                    raise AssertionError(
                        f"inconsistency found, node.next.prev != node: {node}"
                    )
            if node.prev is None:
                if node is not self._most_recently_used:
                    raise AssertionError(
                        "only the _most_recently_used should"
                        f" not have a previous node: {node}"
                    )
            else:
                if node.prev.next_key != node.key:
                    raise AssertionError(
                        f"inconsistency found, node.prev.next != node: {node}"
                    )
            yield node
            node = node_next

    def add(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        if key in self._cache:
            node = self._cache[key]
            node.run_cleanup()
            node.value = value
            node.cleanup = cleanup
        else:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        self._record_access(node)

        if len(self._cache) > self._max_cache:
            # Trigger the cleanup
            self.cleanup()

    def cache_size(self) -> int:
        """Get the number of entries we will cache."""
        return self._max_cache

    def get(self, key: K, default: V | None = None) -> V | None:
        """Get value from cache with default if not found.

        Args:
          key: Key to look up
          default: Default value if key not found

        Returns:
          Value from cache or default
        """
        node = self._cache.get(key, None)
        if node is None:
            return default
        self._record_access(node)
        return node.value

    def keys(self) -> Iterable[K]:
        """Get the list of keys currently cached.

        Note that values returned here may not be available by the time you
        request them later. This is simply meant as a peek into the current
        state.

        Returns: An unordered list of keys that are currently cached.
        """
        return self._cache.keys()

    def items(self) -> dict[K, V]:
        """Get the key:value pairs as a dict."""
        return {k: n.value for k, n in self._cache.items()}

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_count.
        """
        # Make sure the cache is shrunk to the correct size
        while len(self._cache) > self._after_cleanup_count:
            self._remove_lru()

    def __setitem__(self, key: K, value: V) -> None:
        """Add a value to the cache; there will be no cleanup function."""
        self.add(key, value, cleanup=None)

    def _record_access(self, node: _LRUNode[K, V]) -> None:
        """Record that key was accessed."""
        # Move 'node' to the front of the queue
        if self._most_recently_used is None:
            self._most_recently_used = node
            self._least_recently_used = node
            return
        elif node is self._most_recently_used:
            # Nothing to do, this node is already at the head of the queue
            return
        # We've taken care of the tail pointer, remove the node, and insert it
        # at the front
        # REMOVE
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # INSERT
        node.next_key = self._most_recently_used.key
        self._most_recently_used.prev = node
        self._most_recently_used = node
        node.prev = None

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        if node is self._least_recently_used:
            self._least_recently_used = node.prev
        self._cache.pop(node.key)
        # If we have removed all entries, remove the head pointer as well
        if self._least_recently_used is None:
            self._most_recently_used = None
        node.run_cleanup()
        # Now remove this node from the linked list
        if node.prev is not None:
            node.prev.next_key = node.next_key
        if node.next_key is not _null_key:
            node_next = self._cache[cast(K, node.next_key)]
            node_next.prev = node.prev
        # And remove this node's pointers
        node.prev = None
        node.next_key = _null_key

    def _remove_lru(self) -> None:
        """Remove one entry from the lru, and handle consequences.

        If there are no more references to the lru, then this entry should be
        removed from the cache.
        """
        assert self._least_recently_used
        self._remove_node(self._least_recently_used)

    def clear(self) -> None:
        """Clear out all of the cache."""
        # Clean up in LRU order
        while self._cache:
            self._remove_lru()

    def resize(self, max_cache: int, after_cleanup_count: int | None = None) -> None:
        """Change the number of entries that will be cached."""
        self._update_max_cache(max_cache, after_cleanup_count=after_cleanup_count)

    def _update_max_cache(
        self, max_cache: int, after_cleanup_count: int | None = None
    ) -> None:
        self._max_cache = max_cache
        if after_cleanup_count is None:
            self._after_cleanup_count = self._max_cache * 8 // 10
        else:
            self._after_cleanup_count = min(after_cleanup_count, self._max_cache)
        self.cleanup()
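

# A minimal usage sketch (illustrative only; `_demo_lru_cache` is not part of
# the dulwich API). It shows the count-based eviction above: once more than
# max_cache entries are present, cleanup() evicts least-recently-used entries
# until only after_cleanup_count remain, running any cleanup callbacks.
def _demo_lru_cache() -> None:
    evicted: list[str] = []
    cache: LRUCache[str, int] = LRUCache(max_cache=3, after_cleanup_count=2)
    for i, key in enumerate(["a", "b", "c"]):
        cache.add(key, i, cleanup=lambda k, v: evicted.append(k))
    _ = cache["a"]  # touch "a" so it becomes the most recently used
    cache.add("d", 3, cleanup=lambda k, v: evicted.append(k))  # overflows
    # "b" was the least recently used, and the cache shrank to two entries,
    # so both "b" and "c" were evicted and their cleanup callbacks ran.
    assert sorted(cache.keys()) == ["a", "d"]
    assert evicted == ["b", "c"]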


class LRUSizeCache(LRUCache[K, V]):
    """An LRUCache that removes things based on the size of the values.

    This differs in that it doesn't care how many actual items there are;
    it just restricts the cache to be cleaned up after so much data is stored.

    The size of items added will be computed using compute_size(value), which
    defaults to len() if not supplied.
    """

    _compute_size: Callable[[V], int]

    def __init__(
        self,
        max_size: int = 1024 * 1024,
        after_cleanup_size: int | None = None,
        compute_size: Callable[[V], int] | None = None,
    ) -> None:
        """Create a new LRUSizeCache.

        Args:
          max_size: The max number of bytes to store before we start
            clearing out entries.
          after_cleanup_size: After cleaning up, shrink everything to this
            size.
          compute_size: A function to compute the size of the values. We
            use a function here, so that you can pass 'len' if you are just
            using simple strings, or a more complex function if you are using
            something like a list of strings, or even a custom object.
            The function should take the form "compute_size(value) => integer".
            If not supplied, it defaults to 'len()'.
        """
        self._value_size = 0
        if compute_size is None:
            self._compute_size = cast(Callable[[V], int], len)
        else:
            self._compute_size = compute_size
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        LRUCache.__init__(self, max_cache=max(int(max_size / 512), 1))
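
    # Note: the count-based limits set via LRUCache.__init__ above are not
    # what triggers eviction here; add() and cleanup() below are overridden
    # to track the total computed size of the values instead.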

    def add(
        self, key: K, value: V, cleanup: Callable[[K, V], None] | None = None
    ) -> None:
        """Add a new value to the cache.

        Also, if the entry is ever removed from the cache, call
        cleanup(key, value).

        Args:
          key: The key to store it under
          value: The object to store
          cleanup: None or a function taking (key, value) to indicate
            'value' should be cleaned up.
        """
        if key is _null_key:
            raise ValueError("cannot use _null_key as a key")
        node = self._cache.get(key, None)
        value_len = self._compute_size(value)
        if value_len >= self._after_cleanup_size:
            # The new value is 'too big to fit', as it would fill up/overflow
            # the cache all by itself
            if node is not None:
                # We won't be replacing the old node, so just remove it
                self._remove_node(node)
            if cleanup is not None:
                cleanup(key, value)
            return
        if node is None:
            node = _LRUNode(key, value, cleanup=cleanup)
            self._cache[key] = node
        else:
            assert node.size is not None
            self._value_size -= node.size
        node.size = value_len
        self._value_size += value_len
        self._record_access(node)

        if self._value_size > self._max_size:
            # Time to cleanup
            self.cleanup()

    def cleanup(self) -> None:
        """Clear the cache until it shrinks to the requested size.

        This does not completely wipe the cache, just makes sure it is under
        the after_cleanup_size.
        """
        # Make sure the cache is shrunk to the correct size
        while self._value_size > self._after_cleanup_size:
            self._remove_lru()

    def _remove_node(self, node: _LRUNode[K, V]) -> None:
        assert node.size is not None
        self._value_size -= node.size
        LRUCache._remove_node(self, node)

    def resize(self, max_size: int, after_cleanup_size: int | None = None) -> None:
        """Change the number of bytes that will be cached."""
        self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)
        max_cache = max(int(max_size / 512), 1)
        self._update_max_cache(max_cache)

    def _update_max_size(
        self, max_size: int, after_cleanup_size: int | None = None
    ) -> None:
        self._max_size = max_size
        if after_cleanup_size is None:
            self._after_cleanup_size = self._max_size * 8 // 10
        else:
            self._after_cleanup_size = min(after_cleanup_size, self._max_size)
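

# A minimal usage sketch (illustrative only; `_demo_lru_size_cache` is not
# part of the dulwich API). LRUSizeCache evicts by total value size rather
# than entry count: with max_size=20, after_cleanup_size defaults to 80% of
# that (16), so exceeding 20 bytes of values evicts least-recently-used
# entries until at most 16 bytes remain.
def _demo_lru_size_cache() -> None:
    cache: LRUSizeCache[str, bytes] = LRUSizeCache(max_size=20, compute_size=len)
    cache["a"] = b"0123456789"  # 10 bytes cached
    cache["b"] = b"0123456789"  # 20 bytes cached, not yet over max_size
    cache["c"] = b"01234"  # 25 bytes > max_size: evict LRU "a", leaving 15
    assert sorted(cache.keys()) == ["b", "c"]
    # A single value of at least after_cleanup_size (16) bytes is "too big
    # to fit" and is refused outright rather than cached.
    cache["d"] = b"x" * 16
    assert "d" not in cache


if __name__ == "__main__":
    _demo_lru_cache()
    _demo_lru_size_cache()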